
Solving embarrassingly parallel problems using Python multiprocessing

itqueen 2020. 10. 12. 21:19


How do I use multiprocessing to tackle embarrassingly parallel problems?

Embarrassingly parallel problems typically consist of three basic parts:

  1. Read input data (from a file, database, TCP connection, etc.).
  2. Run calculations on the input data, where each calculation is independent of any other calculation.
  3. Write the results of the calculations (to a file, database, TCP connection, etc.).

We can parallelize the program in two dimensions:

  • Part 2 can run on multiple cores, since each calculation is independent; the order of processing does not matter.
  • Each part can run independently: part 1 can place data on an input queue, part 2 can pull data off the input queue and put results onto an output queue, and part 3 can pull results off the output queue and write them out.
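The second bullet, the three-stage queue pipeline, can be sketched roughly as follows. This is a minimal illustration, not the full program developed later in the post; the helper names `reader`, `worker`, and `run_pipeline` are mine:

```python
import multiprocessing as mp

def reader(inq, data, nworkers):
    """Part 1: feed (index, row) pairs to the input queue, then one
    None sentinel per worker so every worker knows when to stop."""
    for item in enumerate(data):
        inq.put(item)
    for _ in range(nworkers):
        inq.put(None)

def worker(inq, outq):
    """Part 2: each sum is independent, so workers can run on any core."""
    for i, row in iter(inq.get, None):
        outq.put((i, sum(row)))
    outq.put(None)  # tell the drain loop this worker is done

def run_pipeline(data, nworkers=2):
    """Part 3 runs here in the parent: drain outq until all workers finish."""
    inq, outq = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(inq, outq)) for _ in range(nworkers)]
    for p in procs:
        p.start()
    feeder = mp.Process(target=reader, args=(inq, data, nworkers))
    feeder.start()
    results, done = {}, 0
    while done < nworkers:
        item = outq.get()
        if item is None:
            done += 1
        else:
            i, total = item
            results[i] = total
    feeder.join()
    for p in procs:
        p.join()
    return [results[i] for i in range(len(data))]

if __name__ == '__main__':
    print(run_pipeline([[1, 2, 3], [10, 20], [5]]))  # [6, 30, 5]
```

Draining the output queue in the parent keeps the sketch short; the full solution below instead dedicates a child process to each of the three parts.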

Although this seems like the most basic pattern in concurrent programming, I am still lost in trying to solve it, so let's write a canonical example to illustrate how this is done using multiprocessing.

Here is the example problem: given a CSV file with rows of integers as input, compute their sums. Separate the problem into three parts, which can all run in parallel:

  1. Process the input file into raw data (a list/iterable of integers)
  2. Calculate the sums of the data, in parallel
  3. Output the sums

Below is a traditional, single-process-bound Python program that solves these three tasks:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Let's rewrite this program to use multiprocessing to parallelize the three parts outlined above. Below is a skeleton of the new, parallelized program that needs to be fleshed out to address the parts in the comments:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

This code, along with other code that can generate example CSV files for testing purposes, can be found on github.

I would appreciate any insight into how concurrency gurus would approach this problem.


Here are some questions I had when thinking about this problem. Bonus points for addressing any/all of them:

  • Should I have child processes for reading in the data and placing it into the queue, or can the main process do this without blocking until all input is read?
  • Likewise, should I have a child process for writing the results out from the processed queue, or can the main process do this without having to wait for all the results?
  • Should I use a process pool for the sum operations?
    • If yes, what method do I call on the pool to get it to start processing the results coming into the input queue, without blocking the input and output processes as well? apply_async()? map_async()? imap()? imap_unordered()?
  • Suppose we didn't need to siphon off the input and output queues as data entered them, but could wait until all input was parsed and all results were calculated (e.g., because we know all the input and output will fit in system memory). Should we change the algorithm in any way (e.g., to avoid running any processes concurrently with I/O)?
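On the pool question in particular, imap() is a plausible fit: it consumes the input iterable lazily and yields results in input order, so the writer can drain them as they arrive. A rough sketch of that option (my illustration, not the asker's code; the chunksize value is arbitrary):

```python
import multiprocessing

def sum_row(row):
    # Part 2: each row's sum is independent of every other row's.
    return sum(row)

if __name__ == '__main__':
    rows = [[1, 2, 3], [4, 5], [10]]
    with multiprocessing.Pool(processes=2) as pool:
        # imap() yields results lazily and in input order; imap_unordered()
        # would yield them as soon as each worker finishes instead.
        sums = list(pool.imap(sum_row, rows, chunksize=1))
    print(sums)  # [6, 9, 10]
```

If output order does not matter, imap_unordered() avoids the head-of-line blocking that imap() can incur when an early row is slow.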

My solution has extra bells and whistles to make sure the order of the output matches the order of the input. I use multiprocessing.Queue to send data between processes, sending stop messages so each process knows to quit checking the queues. I think the comments in the source should make it clear what's going on, but if not, let me know.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print("Done", i)
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

Coming late to the party ...

joblib has a layer on top of multiprocessing to help make parallel for loops. It gives you facilities like lazy dispatching of jobs and better error reporting, in addition to its very simple syntax.

As a disclaimer, I am the original author of joblib.
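For the row-summing task above, a joblib version might look like this (a sketch, assuming joblib is installed; n_jobs sets the number of worker processes):

```python
from joblib import Parallel, delayed

rows = [[1, 2, 3], [4, 5], [10]]
# delayed(sum) wraps the call so it can be dispatched to a worker;
# Parallel runs the calls and returns the results in input order.
sums = Parallel(n_jobs=2)(delayed(sum)(row) for row in rows)
print(sums)  # [6, 9, 10]
```

The generator argument is consumed lazily, so very large inputs do not have to be materialized up front.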


I realize that I'm a bit late for the party, but I've recently discovered GNU parallel, and want to show how easy it is to accomplish this typical task with it.

cat input.csv | parallel ./sum.py --pipe > sums

Something like this will do for sum.py:

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print(row, ':', sum(values))

Parallel will run sum.py for every line in input.csv (in parallel, of course), then output the results to sums. Arguably simpler than the multiprocessing hassle.


Old School.

p1.py

import csv
import pickle
import sys

with open( "someFile", "r", newline="" ) as source:
    rdr = csv.reader( source )
    for i, row in enumerate( rdr ):
        # Convert the CSV strings to ints before pickling, and write the
        # binary pickle stream to stdout's byte buffer.
        pickle.dump( (i, [int(v) for v in row]), sys.stdout.buffer )

p2.py

import pickle
import sys

while True:
    try:
        # Pickle streams are binary, so read from stdin's byte buffer.
        i, row = pickle.load( sys.stdin.buffer )
    except EOFError:
        break
    pickle.dump( (i, sum(row)), sys.stdout.buffer )

p3.py

import pickle
import sys
while True:
    try:
        i, total = pickle.load( sys.stdin.buffer )
    except EOFError:
        break
    print( i, total )

Here's the multi-processing final structure.

python p1.py | python p2.py | python p3.py

Yes, the shell has knit these together at the OS level. It seems simpler to me and it works very nicely.

Yes, there's slightly more overhead in using pickle (or cPickle). The simplification, however, seems worth the effort.

If you want the filename to be an argument to p1.py, that's an easy change.

More importantly, a function like the following is very handy.

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin.buffer )
        except EOFError:
            return

That allows you to do this:

for item in get_stdin():
    process(item)  # whatever per-item processing you need

This is very simple, but it does not easily allow you to have multiple copies of P2.py running.

You have two problems: fan-out and fan-in. The P1.py must somehow fan out to multiple P2.py's. And the P2.py's must somehow merge their results into a single P3.py.

The old-school approach to fan-out is a "Push" architecture, which is very effective.

Theoretically, multiple P2.py's pulling from a common queue is the optimal allocation of resources. This is often ideal, but it's also a fair amount of programming. Is the programming really necessary? Or will round-robin processing be good enough?

Practically, you'll find that making P1.py do a simple "round robin" dealing among multiple P2.py's may be quite good. You'd have P1.py configured to deal to n copies of P2.py via named pipes. The P2.py's would each read from their appropriate pipe.

What if one P2.py gets all the "worst case" data and runs way behind? Yes, round-robin isn't perfect. But it's better than only one P2.py and you can address this bias with simple randomization.

Fan-in from multiple P2.py's to one P3.py is a bit more complex, still. At this point, the old-school approach stops being advantageous. P3.py needs to read from multiple named pipes using the select library to interleave the reads.


It's probably possible to introduce a bit of parallelism into part 1 as well. Probably not an issue with a format that's as simple as CSV, but if the processing of the input data is noticeably slower than the reading of the data, you could read larger chunks, then continue to read until you find a "row separator" (newline in the CSV case, but again that depends on the format read; doesn't work if the format is sufficiently complex).

These chunks, each probably containing multiple entries, can then be farmed off to a crowd of parallel processes reading jobs off a queue, where they're parsed and split, then placed on the in-queue for stage 2.
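That chunking idea might look like the following sketch (the chunk size of 2 and the helper names are mine): group rows before handing them to the pool, so each inter-process message carries several rows instead of one:

```python
import multiprocessing

def sum_chunk(chunk):
    # Sum a whole chunk of (index, row) pairs in one task, so each
    # queue message amortizes the communication overhead over many rows.
    return [(i, sum(row)) for i, row in chunk]

def chunked(items, size):
    """Yield lists of at most `size` items from an iterable."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

if __name__ == '__main__':
    rows = list(enumerate([[1, 2], [3], [4, 5, 6], [7, 8]]))
    with multiprocessing.Pool(2) as pool:
        # Flatten the per-chunk result lists back into one result stream.
        results = [pair for part in pool.imap(sum_chunk, chunked(rows, 2))
                   for pair in part]
    print(results)  # [(0, 3), (1, 3), (2, 15), (3, 15)]
```

The right chunk size is a trade-off: larger chunks mean less queue traffic but coarser load balancing across workers.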

Reference URL: https://stackoverflow.com/questions/2359253/solving-embarassingly-parallel-problems-using-python-multiprocessing
