programing

여러 프로세스에서 단일 파일 처리

nasanasas 2020. 11. 21. 11:19
반응형

여러 프로세스에서 단일 파일 처리


각 줄을 처리하고 (일부 작업을 수행) 데이터베이스에 저장하려는 하나의 큰 텍스트 파일이 있습니다. 하나의 간단한 프로그램이 너무 오래 걸리기 때문에 여러 프로세스 또는 스레드를 통해 수행되기를 바랍니다. 각 스레드 / 프로세스는 해당 단일 파일에서 다른 데이터 (다른 줄)를 읽고 데이터 조각 (줄)에 대해 몇 가지 작업을 수행하고 데이터베이스에 넣어 결국 처리 된 데이터 전체와 내 데이터베이스는 필요한 데이터로 덤프됩니다.

그러나 나는 이것에 접근하는 방법을 알 수 없습니다.


찾고있는 것은 생산자 / 소비자 패턴입니다.

기본 스레딩 예

다음은 스레딩 모듈을 사용하는 기본 예 입니다 (다중 처리 대신).

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

스레드와 파일 객체를 공유하지 않습니다. 데이터 줄을 에 제공하여 작업을 생성 합니다. 그런 다음 각 스레드는 라인을 선택하고 처리 한 다음 대기열에 반환합니다.

목록 및 특별한 종류의 Queue 와 같은 데이터를 공유하기 위해 다중 처리 모듈내장 된 고급 기능이 있습니다 . 멀티 프로세싱과 스레드를 사용하는 데는 장단점이 있으며 작업이 CPU 바인딩인지 IO 바인딩인지에 따라 다릅니다.

기본 멀티 프로세싱 풀 예제

다음은 다중 처리 풀의 기본 예입니다.

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

은 자체 프로세스를 관리하는 편리한 개체입니다. 열린 파일은 해당 줄을 반복 할 수 있으므로 파일을으로 전달할 수 있습니다. 그러면 파일을 반복 pool.map()하고 작업자 함수에 줄을 전달합니다. 블록을 매핑 하고 완료되면 전체 결과를 반환합니다. 이것은 지나치게 단순화 된 예이며 pool.map()는 작업을 수행하기 전에 한 번에 전체 파일을 메모리로 읽어 들일 것입니다. 대용량 파일이 예상되는 경우이를 염두에 두십시오. 생산자 / 소비자 설정을 설계하는 더 고급 방법이 있습니다.

제한 및 라인 재 분류가있는 수동 "풀"

이것은 Pool.map 의 수동 예제 이지만 한 번에 전체 이터 러블 을 소비하는 대신 대기열 크기를 설정하여 처리 할 수있는 한 빨리 하나씩 만 공급하도록 할 수 있습니다. 또한 나중에 원하는 경우 추적하고 참조 할 수 있도록 줄 번호를 추가했습니다.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)

여기 내가 만든 정말 멍청한 예가 있습니다.

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

The tricky part here is to make sure that we split the file on newline characters so that you don't miss any lines (or only read partial lines). Then, each process reads it's part of the file and returns an object which can be put into the database by the main thread. Of course, you may even need to do this part in chunks so that you don't have to keep all of the information in memory at once. (this is quite easily accomplished -- just split the "args" list into X chunks and call pool.map(wrapper,chunk) -- See here)


well break the single big file into multiple smaller files and have each of them processed in separate threads.

참고URL : https://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes

반응형