IT TIP

Django 다중 처리 및 데이터베이스 연결

itqueen 2020. 10. 12. 21:16
반응형

Django 다중 처리 및 데이터베이스 연결


배경:

Postgres 데이터베이스와 함께 Django를 사용하는 프로젝트를 진행 중입니다. 내 웹 검색 중 일부가 언급했기 때문에 중요한 경우 mod_wsgi를 사용하고 있습니다. 웹 양식 제출에서 Django 뷰는 상당한 시간 (사용자가 기다리는 것보다 더 많이)이 걸리는 작업을 시작하므로 백그라운드에서 시스템 호출을 통해 작업을 시작합니다. 현재 실행중인 작업은 데이터베이스를 읽고 쓸 수 있어야합니다. 이 작업은 너무 오래 걸리기 때문에 다중 처리를 사용하여 일부를 병렬로 실행합니다.

문제:

최상위 스크립트에는 데이터베이스 연결이 있으며 자식 프로세스를 생성 할 때 부모의 연결을 자식이 사용할 수있는 것처럼 보입니다. 그런 다음 쿼리 전에 SET TRANSACTION ISOLATION LEVEL을 호출해야하는 방법에 대한 예외가 있습니다. 연구에 따르면 이는 여러 프로세스에서 동일한 데이터베이스 연결을 사용하려고하기 때문입니다. 내가 찾은 한 스레드는 자식 프로세스의 시작 부분에서 connection.close ()를 호출하여 Django가 필요할 때 자동으로 새 연결을 생성하므로 각 자식 프로세스가 고유 한 연결을 갖도록 제안했습니다. 즉, 공유되지 않습니다. 자식 프로세스에서 connection.close ()를 호출하면 부모 프로세스가 연결이 끊어 졌다고 불평했기 때문에 이것은 작동하지 않았습니다.

기타 결과 :

내가 읽은 일부 내용은 실제로 이것을 할 수 없으며 다중 처리, mod_wsgi 및 Django가 함께 잘 작동하지 않는다는 것을 나타내는 것 같습니다. 믿을 수없는 것 같아요.

일부는 장기적인 해결책이 될 수있는 셀러리 사용을 제안했지만 현재로서는 셀러리를 설치할 수 없어 일부 승인 프로세스를 보류 중이 어서 지금은 옵션이 아닙니다.

SO 및 다른 곳에서 영구 데이터베이스 연결에 대한 몇 가지 참조를 찾았으며 이는 다른 문제라고 생각합니다.

또한 psycopg2.pool 및 pgpool에 대한 참조와 bouncer에 대한 내용을 찾았습니다. 사실, 나는 내가 읽고있는 내용을 대부분 이해하지 못했지만, 내가 찾던 것이 확실하게 튀어 나오지는 않았습니다.

현재 "해결 방법":

지금은 직렬로 실행하는 것으로 돌아가서 작동하지만 원하는 것보다 느립니다.

다중 처리를 사용하여 병렬로 실행하는 방법에 대한 제안이 있습니까? 내가 부모를 가질 수 있고 두 자녀가 모두 데이터베이스에 독립적 인 연결을 가질 수 있다면 모든 것이 괜찮을 것 같지만 그 행동을 얻을 수없는 것 같습니다.

감사합니다. 길이 죄송합니다!


다중 처리는 프로세스를 분기하기 때문에 프로세스간에 연결 개체를 복사하므로 상위 프로세스의 모든 파일 설명자를 복사합니다. 즉, SQL 서버에 대한 연결은 파일 일 뿐이며 Linux에서 / proc // fd / .... 아래에서 볼 수 있습니다. 열려있는 모든 파일은 분기 된 프로세스간에 공유됩니다. 여기에서 포크에 대해 자세히 알아볼 수 있습니다 .

내 솔루션은 프로세스를 시작하기 직전에 db 연결을 닫는 것입니다. 각 프로세스는 필요할 때 연결 자체를 다시 만듭니다 (django 1.4에서 테스트 됨).

from django import db
db.connections.close_all()
def db_worker():      
    some_paralell_code()
Process(target = db_worker,args = ())

Pgbouncer / pgpool은 다중 처리의 의미에서 스레드와 연결되지 않습니다. 각 요청에 대해 연결을 닫지 않는 대신 높은 부하를받는 동안 postgres에 대한 연결 속도를 높이는 것입니다.

최신 정보:

데이터베이스 연결 문제를 완전히 제거하려면 데이터베이스와 연결된 모든 논리를 db_worker로 이동하기 만하면됩니다.-QueryDict를 인수로 전달하고 싶었습니다. 더 나은 아이디어는 단순히 ID 목록을 전달하는 것입니다 ... QueryDict 및 values_list ( 'id', flat =)를 참조하십시오. 사실), 목록으로 바꾸는 것을 잊지 마십시오! db_worker에 전달하기 전에 list (QueryDict). 덕분에 모델 데이터베이스 연결을 복사하지 않습니다.

def db_worker(models_ids):        
    obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids)
    obj.run()


model_ids = Model.objects.all().values_list('id', flat=True)
model_ids = list(model_ids) # cast to list
process_count = 5
delta = (len(model_ids) / process_count) + 1

# do all the db stuff here ...

# here you can close db connection
from django import db
db.connections.close_all()

for it in range(0:process_count):
    Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))   

여러 데이터베이스를 사용하는 경우 모든 연결을 닫아야합니다.

from django import db
for connection_name in db.connections.databases:
    db.connections[connection_name].close()

편집하다

모든 연결을 닫으려면 언급 된 @lechup과 동일하게 사용하십시오 (이 메서드가 추가 된 장고 버전 이후 확실하지 않음).

from django import db
db.connections.close_all()

For Python 3 and Django 1.9 this is what worked for me:

import multiprocessing
import django
django.setup() # Must call setup

def db_worker():
    for name, info in django.db.connections.databases.items(): # Close the DB connections
        django.db.connection.close()
    # Execute parallel code here

if __name__ == '__main__':
    multiprocessing.Process(target=db_worker)

Note that without the django.setup() I could not get this to work. I am guessing something needs to be initialized again for multiprocessing.


I had "closed connection" issues when running Django test cases sequentially. In addition to the tests, there is also another process intentionally modifying the database during test execution. This process is started in each test case setUp().

A simple fix was to inherit my test classes from TransactionTestCase instead of TestCase. This makes sure that the database was actually written, and the other process has an up-to-date view on the data.


(not a great solution, but a possible workaround)

if you can't use celery, maybe you could implement your own queueing system, basically adding tasks to some task table and having a regular cron that picks them off and processes? (via a management command)


Hey I ran into this issue and was able to resolve it by performing the following (we are implementing a limited task system)

task.py

from django.db import connection

def as_task(fn):
    """  this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ 
    connection.close()  #  this is where i kill the database connection VERY IMPORTANT
    # This will force django to open a new unique connection, since on linux at least
    # Connections do not fare well when forked 
    #...etc

ScheduledJob.py

from django.db import connection

def run_task(request, job_id):
    """ Just a simple view that when hit with a specific job id kicks of said job """ 
    # your logic goes here
    # ...
    processor = multiprocessing.Queue()
    multiprocessing.Process(
        target=call_command,  # all of our tasks are setup as management commands in django
        args=[
            job_info.management_command,
        ],
        kwargs= {
            'web_processor': processor,
        }.items() + vars(options).items()).start()

result = processor.get(timeout=10)  # wait to get a response on a successful init
# Result is a tuple of [TRUE|FALSE,<ErrorMessage>]
if not result[0]:
    raise Exception(result[1])
else:
   # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven't touched the db again, but now we absolutely have to call connection.close()
   connection.close()
   # we do some database accessing here to get the most recently updated job id in the database

Honestly, to prevent race conditions (with multiple simultaneous users) it would be best to call database.close() as quickly as possible after you fork the process. There may still be a chance that another user somewhere down the line totally makes a request to the db before you have a chance to flush the database though.

In all honesty it would likely be safer and smarter to have your fork not call the command directly, but instead call a script on the operating system so that the spawned task runs in its own django shell!


You could give more resources to Postgre, in Debian/Ubuntu you can edit :

nano /etc/postgresql/9.4/main/postgresql.conf

by replacing 9.4 by your postgre version .

Here are some useful lines that should be updated with example values to do so, names speak for themselves :

max_connections=100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 300
max_worker_processes = 80

Be careful not to boost too much these parameters as it might lead to errors with Postgre trying to take more ressources than available. Examples above are running fine on a Debian 8GB Ram machine equiped with 4 cores.


If all you need is I/O parallelism and not processing parallelism, you can avoid this problem by switch your processes to threads. Replace

from multiprocessing import Process

with

from threading import Thread

The Thread object has the same interface as Procsess

참고URL : https://stackoverflow.com/questions/8242837/django-multiprocessing-and-database-connections

반응형