python - Ending daemon processes with the multiprocessing module


I include an example usage of multiprocessing below. This is a process pool model. It is not as simple as it might be, but it is relatively close in structure to the code I'm actually using. It also uses sqlalchemy, sorry.

My question: I have a situation where a relatively long-running Python script executes a number of functions, each of which looks like the code below, so the parent process is the same in all cases. In other words, multiple pools are created by one Python script. (I don't have to do it this way, I suppose, but the alternative is to use something like os.system and subprocess.) The problem is that these processes hang around and hold on to memory. The docs say these daemon processes are supposed to stick around till the parent process exits, but what about when the parent process then goes on to generate another pool of processes and doesn't exit immediately?

Calling terminate() works, but it doesn't seem terribly polite. Is there a way to ask the processes to terminate nicely? I.e. "clean up after yourself and go away now, I need to start the next pool"?

I also tried calling join() on the processes. According to the documentation this means to wait for the processes to terminate. What if they don't plan to terminate? What actually happens is that the process hangs.

Thanks in advance.

Regards, Faheem.

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "Trying %s with args %s" % (func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of processes consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        # <--- terminate was used here
            worker.join()              # <--- join was used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s' % i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s" % i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    session = scoped_session(sessionmaker())
    session.configure(bind=db)
    session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );" % i)
    session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );" % i)
    session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()

You know that multiprocessing already has classes for worker pools, right?
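For comparison, here is a minimal sketch of the same job structure using the built-in multiprocessing.Pool; the do function and connection string here are placeholders standing in for the ones in the question. close() tells the pool no more work is coming, and join() then waits for the workers to exit cleanly, which is exactly the polite shutdown being asked for:

import multiprocessing

def do(i, dbstring):
    # placeholder for the real per-table work in the question
    print "processing foo%s with %s" % (i, dbstring)

if __name__ == '__main__':
    dbstring = "postgres://user:password@localhost:5432/dbname"
    pool = multiprocessing.Pool(processes=5)
    for i in range(10):
        pool.apply_async(do, (i, dbstring))
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for all workers to finish and exit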

The standard way is to send the workers a quit signal through the queue:

queue.put(("quit", None, None))

Then check for it inside run():

if func == "quit":
    return
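Put together with the Worker/ProcessPool classes from the question, a sketch might look like the following (the stop() method is my own addition, not part of the original code). One sentinel is queued per worker, so every worker's loop returns and join() no longer hangs:

    # In Worker.run(): bail out when the sentinel arrives.
    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            if func == "quit":
                self.queue.task_done()   # keep JoinableQueue accounting balanced
                return
            try:
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

    # In ProcessPool: queue one sentinel per worker, then join them all.
    def stop(self):
        for _ in range(self.num):
            self.queue.put(("quit", None, None))
        for w in self.workerlist:
            w.join()

With this in place, wait_completion() can call self.queue.join() and then self.stop(), and the daemon workers exit on their own instead of needing terminate().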
