Python asyncio와의 동시성을 제한하는 방법은 무엇입니까?
다운로드할 링크가 여러 개 있으며 각 링크를 다운로드하는 데 다른 시간이 걸릴 수 있다고 가정합니다.그리고 저는 최대 3개의 연결만 사용하여 다운로드할 수 있습니다.이제 비동기식을 사용하여 효율적으로 이 작업을 수행하고 싶습니다.
제가 달성하고자 하는 것은 다음과 같습니다.언제든지 최소 3개의 다운로드가 실행되고 있는지 확인합니다.
Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
숫자는 다운로드 링크를 나타내고 하이픈은 다운로드 대기를 나타냅니다.
여기 제가 지금 사용하고 있는 코드가 있습니다.
from random import randint
import asyncio
count = 0
async def download(code, permit_download, no_concurrent, downloading_event):
global count
downloading_event.set()
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
count -= 1
if count < no_concurrent and not permit_download.is_set():
permit_download.set()
async def main(loop):
global count
permit_download = asyncio.Event()
permit_download.set()
downloading_event = asyncio.Event()
no_concurrent = 3
i = 0
while i < 9:
if permit_download.is_set():
count += 1
if count >= no_concurrent:
permit_download.clear()
loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
await downloading_event.wait() # To force context to switch to download function
downloading_event.clear()
i += 1
else:
await permit_download.wait()
await asyncio.sleep(9)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
출력은 예상대로입니다.
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
하지만 제 질문은 다음과 같습니다.
현재, 저는 다운로드가 완료될 때까지 메인 기능을 계속 실행하기 위해 9초를 기다리는 중입니다.?
main
기능?(저도 알고 있습니다.asyncio.wait
하지만 작동하려면 모든 작업 참조를 저장해야 합니다.)이런 종류의 작업을 하는 좋은 도서관은 무엇입니까?자바스크립트에 비동기 라이브러리가 많이 있는 건 알지만, 파이썬은 어떻습니까?
편집: 2.일반적인 비동기 패턴을 처리하는 좋은 라이브러리는 무엇입니까? (비동기와 비슷한 것)
내가 틀리지 않았다면 당신은 비동기식을 찾고 있는 것입니다.세마포어.사용 예:
import asyncio
from random import randint
async def download(code):
wait_time = randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
sem = asyncio.Semaphore(3)
async def safe_download(i):
async with sem: # semaphore limits num of simultaneous downloads
return await download(i)
async def main():
tasks = [
asyncio.ensure_future(safe_download(i)) # creating task starts coroutine
for i
in range(9)
]
await asyncio.gather(*tasks) # await moment all downloads done
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
출력:
downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6
로 하는 예aiohttp
여기서 찾을 수 있습니다.참고:aiohttp
에는 세마포어와 동등한 기능이 내장되어 있으며, 여기서 예를 볼 수 있습니다.기본 연결 수 제한은 100개입니다.
미하일 게라시모프의 대답을 사용했고 결국 이 작은 보석으로 끝났습니다.
async def gather_with_concurrency(n, *coros):
semaphore = asyncio.Semaphore(n)
async def sem_coro(coro):
async with semaphore:
return await coro
return await asyncio.gather(*(sem_coro(c) for c in coros))
일반적인 수집 대신 실행할 수 있는 정보
await gather_with_concurrency(100, *my_coroutines)
이 답변의 나머지 부분을 읽기 전에 비동기식을 사용하여 병렬 작업의 수를 제한하는 관용적인 방법에 유의하십시오.asyncio.Semaphore
미하일의 대답에 나타나 있고 안드레이의 대답에 우아하게 추상화되어 있습니다.이 답변에는 작업 방법이 포함되어 있지만, 동일한 작업을 달성하기 위한 좀 더 복잡한 방법이 포함되어 있습니다.어떤 경우에는 이 접근 방식이 세마포보다 장점을 가질 수 있고, 특히 해야 할 작업이 매우 크거나 제한이 없을 때, 모든 코루틴을 미리 만들 수 없기 때문에 답변을 남깁니다.이 경우 두 번째(대기열 기반) 솔루션이 원하는 답입니다.그러나 iohttp를 통한 병렬 다운로드와 같은 대부분의 일반적인 상황에서는 대신 세마포를 사용해야 합니다.
기본적으로 다운로드 작업의 고정 크기 풀이 필요합니다. asyncio
미리 만들어진 작업 풀은 제공되지 않지만 쉽게 만들 수 있습니다. 작업 집합을 유지하고 제한을 초과하여 증가하지 않도록 하십시오.질문에 그 경로를 따라가는 것을 꺼리는 것으로 나타나지만, 코드는 훨씬 더 우아하게 끝납니다.
import asyncio, random
async def download(code):
wait_time = random.randint(1, 3)
print('downloading {} will take {} second(s)'.format(code, wait_time))
await asyncio.sleep(wait_time) # I/O, context will switch to main function
print('downloaded {}'.format(code))
async def main(loop):
no_concurrent = 3
dltasks = set()
i = 0
while i < 9:
if len(dltasks) >= no_concurrent:
# Wait for some download to finish before adding a new one
_done, dltasks = await asyncio.wait(
dltasks, return_when=asyncio.FIRST_COMPLETED)
dltasks.add(loop.create_task(download(i)))
i += 1
# Wait for the remaining downloads to finish
await asyncio.wait(dltasks)
의 스레드를 사용하여 작업을 하는 것입니다.asyncio.Queue
다운로드, 으로 호출되는 됩니다.download()
:
# download() defined as above
async def download_worker(q):
while True:
code = await q.get()
await download(code)
q.task_done()
async def main(loop):
q = asyncio.Queue()
workers = [loop.create_task(download_worker(q)) for _ in range(3)]
i = 0
while i < 9:
await q.put(i)
i += 1
await q.join() # wait for all tasks to be processed
for worker in workers:
worker.cancel()
await asyncio.gather(*workers, return_exceptions=True)
당신의 다른 질문에 대해서는, 명백한 선택은 다음과 같습니다.
비동기 풀 라이브러리는 사용자가 필요로 하는 작업을 정확히 수행합니다.
https://pypi.org/project/asyncio-pool/
from asyncio_pool import AioPool
LIST_OF_URLS = ("http://www.google.com", "......")
pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)
작업을 생성하는 제너레이터가 있는 경우 메모리에 동시에 넣을 수 있는 작업보다 더 많은 작업이 있을 수 있습니다.
인 클식래.asyncio.Semaphore
컨텍스트 관리자 패턴은 모든 작업을 동시에 메모리로 레이스합니다.
마음에 안 들어요asyncio.Queue
양식.설정을 통해 모든 작업을 메모리에 미리 로드하지 않도록 할 수 있습니다.maxsize=1
하지만 여전히 작업자 코루틴(큐에서 소비됨)을 정의, 시작 및 종료하려면 보일러 플레이트가 필요하며, 작업이 예외를 발생하더라도 작업자가 실패하지 않도록 보장해야 합니다.마치 자신의 것을 구현하는 것처럼 비피토닉적인 느낌이 듭니다.multiprocessing.pool
.
대신, 여기에 대안이 있습니다.
sem = asyncio.Semaphore(n := 5) # specify maximum concurrency
async def task_wrapper(args):
try:
await my_task(*args)
finally:
sem.release()
for args in my_generator: # may yield too many to list
await sem.acquire()
asyncio.create_task(task_wrapper(args))
# wait for all tasks to complete
for i in range(n):
await sem.acquire()
이렇게 하면 활성 태스크가 충분할 때 생성기가 일시 중지되고 이벤트 루프가 완료된 태스크를 정리할 수 있습니다.버전의 고참, 전 Python 대체경을 합니다.create_task
와 함께ensure_future
.
세마포어를 사용하여 함수를 감쌀 장식기를 만들 수도 있습니다.
import asyncio
from functools import wraps
def request_concurrency_limit_decorator(limit=3):
# Bind the default event loop
sem = asyncio.Semaphore(limit)
def executor(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with sem:
return await func(*args, **kwargs)
return wrapper
return executor
그런 다음 오리진 다운로드 기능에 데코레이터를 추가합니다.
@request_concurrency_limit_decorator(limit=...)
async def download(...):
...
이제 당신은 이전과 같이 다운로드 기능을 호출할 수 있지만, 동시성을 제한하기 위해 세마포어를 사용합니다.
await download(...)
데코레이터 함수가 실행되면 생성된 세마포어가 기본 이벤트 루프에 바인딩되므로 호출할 수 없습니다.asyncio.run
새 루프를 만듭니다.대신에, 전화하세요.asyncio.get_event_loop().run...
기본 이벤트 루프를 사용합니다.
비동기의세마포어 런타임 오류:작업이 다른 루프에 미래를 연결했습니다.
작은 업데이트:더 이상 루프를 만들 필요가 없습니다.아래 코드를 수정했습니다.조금만 치우시면 됩니다.
# download(code) is the same
async def main():
no_concurrent = 3
dltasks = set()
for i in range(9):
if len(dltasks) >= no_concurrent:
# Wait for some download to finish before adding a new one
_done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
dltasks.add(asyncio.create_task(download(i)))
# Wait for the remaining downloads to finish
await asyncio.wait(dltasks)
if __name__ == '__main__':
asyncio.run(main())
윈도우즈에서 FastAPI를 사용하는 경우 기본값이 64(var FD_SETSIZE로 정의됨)이므로 동시 연결 수에 따라 제한될 수 있습니다.
더 많은 정보는 https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select?redirectedfrom=MSDN 에서 확인할 수 있습니다.
ProactorEventLoop(IOCP를 사용하는 ProactorEventLoop)을 정의했음에도 불구하고 3.7 이전 Python 버전에서는 select() 루틴이 사용되어 예외가 발생합니다.
한 가지 대안은 Andrei의 답변을 사용하여 ML/DL 컨텍스트에서 동시 연결 수를 제한하는 것입니다.asyncio + 하이퍼콘 +API, 코드는 다음과 같습니다.
from hypercorn.config import Config
from hypercorn.asyncio import serve
from fastapi import FastAPI
import asyncio
import json
import time
import sys
app = FastAPI()
conn_limit = 10
async def gather_with_concurrency(n, *coros):
"""
From Andrei's answer
"""
semaphore = asyncio.Semaphore(n)
async def sem_coro(coro):
async with semaphore:
return await coro
return await asyncio.gather(*(sem_coro(c) for c in coros))
@app.get('/app/test')
def req_test():
time.sleep(1)
return {"test": "ok"}
if __name__ == "__main__":
# Start the loop
config = Config()
config.bind = [f"0.0.0.0:12000"]
config.workers = 1
if sys.platform == 'win32':
logger.info("Setting proactor event loop for Windows platform.")
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop = asyncio.get_event_loop()
loop.run_until_complete(gather_with_concurrency(conn_limit, serve(app, config)))
loop.close()
Obs: 이 스크립트는 다음에서 테스트되었습니다.Python 3.7.16
Apache JMeter의 1,000명의 근로자에 대해.
언급URL : https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncio
'prosource' 카테고리의 다른 글
"while :" vs "while true" (0) | 2023.05.28 |
---|---|
Swift는 설명서 생성을 지원합니까? (0) | 2023.05.28 |
C# 배열에서 중복 항목을 제거하려면 어떻게 해야 합니까? (0) | 2023.05.28 |
Node.js + Nginx - 이제 어떻게 됩니까? (0) | 2023.05.28 |
C#에서 VB의 As() 및 Chr() 함수에 해당하는 것은 무엇입니까? (0) | 2023.05.28 |