Использование функций asyncio to_thread и create_task

Python представил свою новую парадигму параллельной обработки — asyncio — в версии 3.5 языка (см. PEP 492). Это было в 2016 году, и за прошедший период asyncio продолжал расширяться и улучшаться.

Одно из самых интересных событий произошло в Python 3.9, где в asyncio была введена новая функция под названием to_thread. Эта функция позволяет выполнять обычные синхронные, блокирующие вызовы функций асинхронно.

В частности, она запустит вашу функцию блокировки в отдельном потоке и в сочетании с функцией asyncio create_task позволит остальной части вашей программы продолжать выполнять полезную работу, пока функция блокировки работает в фоновом режиме.

Примером того, почему вы можете захотеть использовать функции to_thread и create_task, может быть, когда вам нужно вызвать API от третьей стороны, которая не реализует асинхронный интерфейс.

Хорошо, давайте начнем. Сначала мы напишем обычную синхронную программу на Python, а затем добавим в нее asyncio. Мы снова добавим в него функцию блокировки, чтобы увидеть ее эффект, и, наконец, посмотрим, как использование функций to_thread() и create_task() решит нашу проблему.

Обычный синхронный код

В нашем примере кода у нас будет функция, которая обращается к определенным популярным веб-страницам, подсчитывает количество байтов, содержащихся на домашней странице, и сохраняет это значение и связанный с ним веб-адрес в словаре. В конечном итоге мы распечатываем словарь, содержащий каждую веб-страницу и количество ее байтов.

import time 
from urllib.request import urlopen  

url_list=[ 
"https://nasa.gov", 
"https://nytimes.com", 
"https://forbes.com", 
"https://groupon.com",  
"https://huffpost.com",
"https://bbc.co.uk" 
]  

# Store our retrieved values in this dictionary
url_data={} 

def get_web_data(url): 
# added this sleep to make the time differences between the 
# the sync and async runs more obvious 
    time.sleep(5) 
    page=urlopen(url) 
    nr_bytes = len(page.read()) 
    url_data[url] = nr_bytes 
 
def main():   
    for url in url_list: 
        get_web_data(url) 

    (print(url_data) for url in url_list)   

if __name__ == "__main__":   
    s = time.perf_counter()   

    main() 

    print(url_data)  
    elapsed = time.perf_counter() - s   
    print(f"Finished in {elapsed:0.2f} seconds.") 

Вот мой результат:

{'https://nasa.gov': 12471, 'https://nytimes.com': 1333088, 
'https://forbes.com': 551301, 'https://groupon.com': 251859, 
'https://huffpost.com': 741283, 'https://bbc.co.uk': 538493}

Закончил за 36,10 секунды.

Асинхронная версия нашего кода

Чтобы создать асинхронную версию нашего кода, нам нужно поставить async перед частью def определения каждой функции, мы собираем вызовы наших функций в цикл асинхронных событий, а затем вызываем наши функции, ожидая их.

import time 
import asyncio 
from urllib.request import urlopen  

url_list=[ 
"https://nasa.gov", 
"https://nytimes.com" 
"https://forbes.com", 
"https://groupon.com",  
"https://huffpost.com", 
"https:bbc.co.uk" 
] 

url_data={} 

async def get_web_data(url): 
# added this to make the time differences between the 
# the sync and async runs more obviuos 
  await asyncio.sleep(5) 
  page=urlopen(url) 
  nr_bytes = len(page.read()) 
  url_data[url] = nr_bytes 

async def main():  
    await asyncio.gather(*[get_web_data(url) for url in url_list])  

if __name__ == "__main__":  
    s = time.perf_counter()  

    await(main())  
 
    print(sum(results))  
    elapsed = time.perf_counter() - s  
    print(f"Finished in {elapsed:0.2f} seconds.")  

Мой вывод на этот раз:

{'https://nasa.gov': 12471, 'https://forbes.com': 551312, 
'https://bbc.co.uk': 533739, 
'https://huffpost.com': 741285, 
'https://nytimes.com': 1333058, 
'https://groupon.com': 251904}

Закончили за 11.50 секунд.

По прошедшему времени вы можете сказать, что функции для получения веб-данных вызывались асинхронно.

Войдите в блокировщик

Хорошо, давайте включим гаечный ключ в работу, смешав вызов нашей «сторонней» блокирующей функции в середине нашего асинхронного кода.

Чтобы имитировать длительный процесс, все, что будет делать наша блокирующая функция, — это спать в течение 20 секунд.

def blocking_func():  
    time.sleep(20) 
    print("Blocking_func finished") 
 
# Let's call it before our async code. 

async def main():   

    blocking_func() 
    await asyncio.gather(*[get_web_data(url) for url in url_list])   

Остальная часть нашего кода остается неизменной. Это мой новый вывод:

Blocking_func готово

{'https://nasa.gov': 12471, 'https://forbes.com': 546439, 
'https://bbc.co.uk': 544406, 
'https://huffpost.com': 741758, 
'https://nytimes.com': 2144874, 
'https://groupon.com': 251807}

Финишировал за 30,64 секунды.

Часть кода async не запускалась до тех пор, пока не завершился наш блокирующий код. Как мы можем это исправить? Вот тут-то и появляются функции to_thread() и create_task(). Все, что нам нужно сделать, это поместить вызов нашей блокирующей функции внутрь вызова to_thread, затем создать асинхронную задачу на основе этого, вызвать задачу, и это должно сработать.

Функция asyncio create_task принимает сопрограмму в качестве аргумента (это то, что возвращает to_thread) и запускает ее независимо в своем собственном цикле обработки событий. Вот наш полный, окончательный асинхронный код.

import time 
import asyncio 
from urllib.request import urlopen  

url_list=[ 
"https://nasa.gov", 
"https://nytimes.com", 
"https://forbes.com", 
"https://groupon.com",  
"https://huffpost.com", 
"https://bbc.co.uk" 
]  

url_data={} 
  
async def get_web_data(url):    
    await asyncio.sleep(5) 
    page=urlopen(url) 
    nr_bytes = len(page.read()) 
    url_data[url] = nr_bytes 


def blocking_func():        
    time.sleep(20) 
    print("Blocking_func finished") 

async def main():   
    # create a coroutine for the blocking function 
    # using the asyncio to_thread() function
    blocker = asyncio.to_thread(blocking_func) 

    # Use the asyncio create_task function to execute the 
    # blocking function independently 
    task = asyncio.create_task(blocker) 
    await asyncio.gather(*[get_web_data(url) for url in url_list])   

if __name__ == "__main__":   
    s = time.perf_counter()   

    asyncio.run(main()) 
    print(url_data)  
    elapsed = time.perf_counter() - s   
    print(f"Finished in {elapsed:0.2f} seconds.") 

Это сработало? Вот мой окончательный вывод:

Blocking_func готово

{'https://nasa.gov': 12471, 'https://forbes.com': 551605, 
'https://bbc.co.uk': 548625, 'https://huffpost.com': 741758, 
'https://nytimes.com': 2144843, 'https://groupon.com': 250329}

Финишировал за 20,02 секунды.

Время выполнения выходных данных ясно показывает, что во время выполнения кода блокировки также выполнялся код для получения данных URL, поскольку общее время выполнения было всего лишь тенью по сравнению с 20 секундами, указанными в команде сна в коде блокировки. .

Это все на данный момент.

Если вам понравилась эта статья, пожалуйста, следите за новостями.

Если вы еще не являетесь средним участником и цените подобный контент, рассмотрите возможность присоединения по этой ссылке. Если вы это сделаете, я заработаю (очень) небольшую комиссию, которая поможет мне как писателю.