prosource

다중 처리 : tqdm을 사용하여 진행률 표시줄 표시

probook 2023. 6. 7. 22:59
반응형

다중 처리 : tqdm을 사용하여 진행률 표시줄 표시

내 코드를 더 "파이토닉"하고 더 빠르게 만들기 위해, 나는 사용합니다.multiprocessing a)a) 반복 범위 b) 전송하기 위한 맵 입니다.

된 용액 이된솔즉션루(식, 출호))을 호출함)tqdm 거리에직접사정▁range▁on직 범위에.tqdm.tqdm(range(0, 30)))는 아래 코드에 공식화된 대로 멀티프로세싱에서 작동하지 않습니다.

진행률 표시줄은 0 ~ 100%(파이썬이 코드를 읽을 때?)로 표시되지만 지도 기능의 실제 진행률은 표시되지 않습니다.

'맵' 기능이 어느 단계에 있는지 알려주는 진행 표시줄을 어떻게 표시할 수 있습니까?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

어떤 도움이나 제안이라도 환영합니다...

사용하다imapmap처리된 값의 반복기를 반환합니다.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

, 는 이 을 늦서미안지만동것지면도, 다이기추니습했에 했습니다.tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

참조 자료: https://tqdm.github.io/docs/contrib.concurrent/ 및 https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

을 지원합니다.max_workers그리고.chunksize은 쉽전수할있다습니환에서 할 수 .process_mapthread_map.

솔루션을 찾았습니다.조심해!멀티프로세싱으로 인해 추정시간(루프당 반복시간, 총시간 등)이 불안정할 수 있으나 진행 표시줄이 완벽하게 작동합니다.

고참: 컨트관리의 :PoolPython 3.3+에서만 사용할 수 있습니다.

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()

사용할 수 있습니다.p_tqdm대신.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

사비 마르티네스의 대답을 바탕으로 나는 함수를 작성했습니다.imap_unordered_bar다음과 같은 방식으로 사용할 수 있습니다.imap_unordered처리 막대가 표시되는 유일한 차이입니다.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

apply_async가 있는 진행률 표시줄의 경우 다음 코드를 사용할 수 있습니다.

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
pbar = tqdm(total=100)

def update(*a):
    pbar.update()

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()
import multiprocessing as mp
import tqdm


iterable = ... 
num_cpu = mp.cpu_count() - 2 # dont use all cpus.


def func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))

병렬 실행 기능에서 결과를 반환해야 하는 경우에 대한 제 견해는 다음과 같습니다.이 함수는 몇 가지 작업을 수행하지만(자세한 내용을 설명하는 다른 게시물이 있음) 핵심은 대기열에 대기 중인 작업과 완료된 작업 대기열이 있다는 것입니다.작업자가 보류 대기열에서 각 작업을 완료하면 완료된 작업 대기열에 결과를 추가합니다.tqdm 진행률 표시줄을 사용하여 검사를 완료된 태스크 대기열로 래핑할 수 있습니다.여기에 do_work() 함수의 구현을 넣는 것이 아닙니다. 작업이 완료된 대기열을 모니터링하고 결과가 나올 때마다 진행률 표시줄을 업데이트하라는 메시지이기 때문에 관련이 없습니다.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

"user17242583" 답변을 바탕으로 다음과 같은 기능을 만들었습니다.Pool.map만큼 빨라야 하며 항상 결과가 정렬됩니다.또한 단일 반복 가능한 매개 변수가 아니라 원하는 만큼 함수에 전달할 수 있습니다.

from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is based on the chunk size chopped into chunks and submitted to the process pool as separate tasks.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result

이 접근법은 간단하고 효과적입니다.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()

언급URL : https://stackoverflow.com/questions/41920124/multiprocessing-use-tqdm-to-display-a-progress-bar

반응형