postgresql - Python SQLAlchemy Update Postgres Record


I'm trying to update a row in the database asynchronously, using the multiprocessing module. My code has a simple function, create_member, that inserts data into a table and then creates a process that may change that data. The problem is that the session passed to async_create_member is closing the database connection, and the next request gets psycopg's error:

(InterfaceError) connection closed

here's code:

def create_member(self, data):
    member = self.entity(**data)
    self.session.add(member)
    for name in data:
        setattr(member, name, data[name])
    self.session.commit()
    self.session.close()
    if self.index.is_indexable:
        Process(target=self.async_create_member,
                args=(data, self.session)).start()
    return member

def async_create_member(self, data, session):
    ok, data = self.index.create(data)
    if ok:
        datacopy = data.copy()
        data.clear()
        data['document'] = datacopy['document']
        data['dt_idx'] = datacopy['dt_idx']
        stmt = update(self.entity.__table__).where(
            self.entity.__table__.c.id_doc == datacopy['id_doc'])\
            .values(**data)

        session.begin()
        session.execute(stmt)
        session.commit()
        session.close()

I could possibly solve this by creating a new connection in async_create_member, but that leaves idle transactions on postgres:

engine = create_new_engine()
conn = engine.connect()
conn.execute(stmt)
conn.close()

What should I do now? Is there a way to fix the first version of the code? Or should I keep creating new connections with the create_new_engine function? Should I use threads or processes?

You can't reuse sessions across threads or processes. Sessions aren't thread-safe, and the connectivity that underlies a session isn't inherited cleanly across processes. The error message you are getting is accurate, if uninformative: the DB connection is indeed closed if you try to use it after inheriting it across a process boundary.

In most cases, yes, you should create a new session for each process in a multiprocessing setting.

However, if your problem meets the following conditions:

  • you are doing a lot of CPU-intensive processing for each object
  • database writes are relatively lightweight in comparison
  • you want to use a lot of processes (I do this on 8+ core machines)

it might be worthwhile to create a single writer process that owns the session, and pass objects to that process. Here's how it works for me (note: this is not meant to be runnable code):

import multiprocessing
from your_database_layer import create_new_session, WhateverType

work = multiprocessing.JoinableQueue()

def writer(commit_every=50):
    global work
    session = create_new_session()
    counter = 0

    while True:
        item = work.get()
        if item is None:
            break

        session.add(item)
        counter += 1
        if counter % commit_every == 0:
            session.commit()

        work.task_done()

    # last db writes
    session.commit()

    # mark the final None in the queue as complete
    work.task_done()
    return


def very_expensive_object_creation(data):
    global work
    very_expensive_object = WhateverType(**data)
    # perform lots of computation
    work.put(very_expensive_object)
    return


def main():
    writer_process = multiprocessing.Process(target=writer)
    writer_process.start()

    # create a pool to feed the queue here, i.e.
    workers = multiprocessing.Pool()
    # dispatch lots of work to very_expensive_object_creation in parallel here
    workers.map(very_expensive_object_creation, some_iterable_source_here)
    # --or-- in whatever other way floats your boat, such as
    workers.apply_async(very_expensive_object_creation, args=(some_data_1,))
    workers.apply_async(very_expensive_object_creation, args=(some_data_2,))
    # etc.

    # signal that we won't dispatch any more work
    workers.close()

    # wait for the creation work to be done
    workers.join()

    # trigger the exit condition for the writer
    work.put(None)

    # wait for the queue to be emptied
    work.join()

    return
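To see the queue-plus-sentinel mechanics in isolation, here is a stripped-down, runnable version of the same pattern with no database at all: a writer process consumes items from a JoinableQueue until it sees the None sentinel, then reports a result back. The summing writer is a hypothetical stand-in for the session-owning writer above, and the explicit "fork" start method assumes a Unix-like platform:

```python
import multiprocessing

def writer(work, results):
    # Consume items until the None sentinel arrives, then report a total.
    total = 0
    while True:
        item = work.get()
        if item is None:
            work.task_done()
            break
        total += item
        work.task_done()
    results.put(total)

def run_pipeline(items):
    ctx = multiprocessing.get_context("fork")  # assumption: Unix-like platform
    work = ctx.JoinableQueue()
    results = ctx.Queue()

    writer_process = ctx.Process(target=writer, args=(work, results))
    writer_process.start()

    for item in items:
        work.put(item)
    work.put(None)   # exit sentinel, exactly as in the sketch above

    work.join()      # blocks until every put() has been task_done()'d
    total = results.get()
    writer_process.join()
    return total
```

Calling run_pipeline([1, 2, 3, 4]) returns 10. Note that the result is fetched from the queue before joining the writer process; joining a process that still has unflushed items on a queue can deadlock.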
