postgresql - Python SQLAlchemy Update Postgres Record
I'm trying to update a row in the database (the asynchronous way) using the multiprocessing module. My code has a simple function create_member that inserts some data into a table and then starts a process that may change that data. The problem is that the session passed to async_create_member is closing the database connection, and the next request fails with psycopg's error:

(InterfaceError) connection already closed

Here's the code:
def create_member(self, data):
    member = self.entity(**data)
    self.session.add(member)
    for name in data:
        setattr(member, name, data[name])
    self.session.commit()
    self.session.close()
    if self.index.is_indexable:
        Process(target=self.async_create_member,
                args=(data, self.session)).start()
    return member

def async_create_member(self, data, session):
    ok, data = self.index.create(data)
    if ok:
        datacopy = data.copy()
        data.clear()
        data['document'] = datacopy['document']
        data['dt_idx'] = datacopy['dt_idx']
        stmt = update(self.entity.__table__).where(
            self.entity.__table__.c.id_doc == datacopy['id_doc'])\
            .values(**data)
        session.begin()
        session.execute(stmt)
        session.commit()
        session.close()
I could possibly solve this by creating a new connection in async_create_member, but that leaves idle transactions on Postgres:
engine = create_new_engine()
conn = engine.connect()
conn.execute(stmt)
conn.close()
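A fuller sketch of what I mean by that workaround (create_new_engine is assumed to be just a wrapper around sqlalchemy.create_engine, and run_stmt_in_child is only an illustrative name): engine.begin() commits or rolls back and releases the connection, and engine.dispose() closes the pool so nothing is left idle.

from sqlalchemy import create_engine

def run_stmt_in_child(stmt, dsn='postgresql+psycopg2://user:pass@localhost/mydb'):
    # Build a throwaway engine just for this process.
    engine = create_engine(dsn)
    try:
        # engine.begin() opens a transaction, commits on success (or rolls
        # back on error), and returns the connection to the pool afterwards.
        with engine.begin() as conn:
            conn.execute(stmt)
    finally:
        # Close the pool so no idle connection/transaction is left behind.
        engine.dispose()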
What should I do now? Is there a way to fix the first snippet? Or should I keep creating new connections with the create_new_engine function? Should I use threads or processes?
You can't reuse sessions across threads or processes. Sessions aren't thread-safe, and the connection that underlies a session isn't inherited cleanly across processes. The error message you are getting is accurate, if uninformative: the DB connection is indeed closed if you try to use it after inheriting it across a process boundary.
In most cases, yes, you should create a new session for each process in a multiprocessing setting.
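A minimal sketch of that per-process pattern, under some assumptions: your_database_layer and WhateverType are the same stand-in names used in the writer example further down, the DSN string is an example, and update_in_child is an illustrative name, not your function.

import multiprocessing
from sqlalchemy import create_engine, update
from sqlalchemy.orm import sessionmaker

from your_database_layer import WhateverType

DSN = 'postgresql+psycopg2://user:password@localhost/mydb'

def update_in_child(data, id_doc):
    # Everything DB-related is created inside the child process, so no
    # connection is inherited from the parent across the process boundary.
    engine = create_engine(DSN)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        stmt = (update(WhateverType.__table__)
                .where(WhateverType.__table__.c.id_doc == id_doc)
                .values(**data))
        session.execute(stmt)
        session.commit()
    finally:
        session.close()
        engine.dispose()

# Parent side: pass plain, picklable data -- never the parent's session.
# multiprocessing.Process(target=update_in_child, args=(data, id_doc)).start()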
If your problem meets the following conditions:

- you are doing a lot of CPU-intensive processing for each object
- database writes are relatively lightweight in comparison
- you want to use a lot of processes (I do this on 8+ core machines)

then it might be worthwhile to create a single writer process that owns the session, and to pass objects to that process. Here's how it works for me (note: this is not meant to be runnable code):
import multiprocessing
from your_database_layer import create_new_session, WhateverType

work = multiprocessing.JoinableQueue()

def writer(commit_every=50):
    global work
    session = create_new_session()
    counter = 0
    while True:
        item = work.get()
        if item is None:
            break
        session.add(item)
        counter += 1
        if counter % commit_every == 0:
            session.commit()
        work.task_done()
    # Flush the last DB writes
    session.commit()
    # Mark the final None in the queue as complete
    work.task_done()
    return

def very_expensive_object_creation(data):
    global work
    very_expensive_object = WhateverType(**data)
    # Perform lots of computation here
    work.put(very_expensive_object)
    return

def main():
    writer_process = multiprocessing.Process(target=writer)
    writer_process.start()

    # Create the pool that feeds the queue here, i.e.
    workers = multiprocessing.Pool()

    # Dispatch lots of work to very_expensive_object_creation in parallel here
    workers.map(very_expensive_object_creation, some_iterable_source_here)
    # --or-- in whatever other way floats your boat, such as
    workers.apply_async(very_expensive_object_creation, args=(some_data_1,))
    workers.apply_async(very_expensive_object_creation, args=(some_data_2,))
    # etc.

    # Signal that we won't dispatch any more work
    workers.close()

    # Wait for the creation work to be done
    workers.join()

    # Trigger the exit condition for the writer
    work.put(None)

    # Wait for the queue to be emptied
    work.join()

    return