Python - end daemon processes with the multiprocessing module
I include example usage of multiprocessing below, using the process pool model. It is not as simple as it might be, but it is relatively close in structure to the code I'm actually using. It also uses SQLAlchemy, sorry.
My question: I have a situation where a relatively long-running Python script executes a number of functions, each of which looks like the code below, with the parent process the same in all cases. In other words, multiple pools are created by one Python script. (I don't have to do it this way, I suppose; an alternative would be to use os.system or subprocess.) The problem is that these processes hang around and hold on to memory. The docs say these daemon processes are supposed to stick around till the parent process exits, but what if the parent process goes on to generate another pool of processes and doesn't exit immediately?
Calling terminate() works, but it doesn't seem terribly polite. Is there a way to ask the processes to terminate nicely? I.e. clean up after yourself and go away now, I need to start the next pool?
I also tried calling join() on the processes. According to the documentation this means "wait for the processes to terminate". What if they don't plan to terminate? What actually happens is that the process hangs.
Thanks in advance.

Regards, Faheem.
import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s" % (func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()   # <--- terminate used here
            worker.join()         # <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s' % i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s" % i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    session = scoped_session(sessionmaker())
    session.configure(bind=db)
    session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );" % i)
    session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );" % i)
    session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()
You know that multiprocessing already has classes for worker pools, right?
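For reference, a minimal sketch of the built-in multiprocessing.Pool; the worker function square and the pool size of five are placeholders, not taken from the question:

from multiprocessing import Pool

def square(i):
    return i * i

if __name__ == "__main__":
    pool = Pool(processes=5)                # five worker processes
    results = pool.map(square, range(10))   # blocks until every task finishes
    pool.close()                            # no further tasks will be submitted
    pool.join()                             # workers exit cleanly, no terminate() needed
    print(results)

close() followed by join() is the documented way to shut a Pool down gracefully, which is exactly the "polite" termination being asked about.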
The standard way to send the workers a quit signal is to put a sentinel on the queue:
queue.put(("quit", none, none))
and then check for it in the worker's run loop:
if func == "quit": return