Multithreading Python FS Crawler


I've written a Python function that scours a filesystem using a provided directory pattern, with optional 'actions' to take at each level. I then tried multi-threading it, since some of the volumes are on network shares and I want to minimize IO blocking. I started by using the multiprocessing Pool class, since that was most convenient... (seriously, no Pool class for threading?) The function unravels the provided FS pattern as far as possible and submits the newly returned paths to the pool until no new paths are returned. I got this to work great when using the function and class directly, but when trying to use the function from another class the program seems to hang. To simplify things I rewrote the function using threads instead of processes, and even wrote a simple thread pool class... same problem. Here's a very simplified version of the code that still exhibits the same problems:

file test1.py:
------------------------------------------------

import os
import glob
from multiprocessing import Pool

def mapglob(pool, paths, pattern):
    results = []
    paths = [os.path.join(p, pattern) for p in paths]
    for result in pool.map(glob.glob, paths):
        results += result
    return results

def findallmypaths():
    pool = Pool(10)
    paths = ['/volumes']
    follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
    for pattern in follow:
        paths = mapglob(pool, paths, pattern)
    return paths

file test2.py:
----------------------------------------------------------------------------

from test1 import findallmypaths

allmypaths = findallmypaths()
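(On the aside about there being no Pool class for threading: multiprocessing does in fact ship a thread-backed pool with the same map() interface, multiprocessing.pool.ThreadPool, also importable as multiprocessing.dummy.Pool. It is only barely documented, but as a rough sketch it can drive mapglob unchanged; whether it avoids the hang described below I have not verified.)

import os
import glob
from multiprocessing.pool import ThreadPool   # thread-backed drop-in for Pool

def mapglob(pool, paths, pattern):
    # same logic as above; glob.glob now runs in threads of this process
    # instead of in child processes
    paths = [os.path.join(p, pattern) for p in paths]
    results = []
    for result in pool.map(glob.glob, paths):
        results += result
    return results

pool = ThreadPool(10)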

Now if I call:

>>> from test1 import findallmypaths
>>> findallmypaths()
...long list of paths

This works fine, but if I try:

>>> from test2 import allmypaths

Python hangs forever. The 'action' functions do get called (in this example, glob), but they never seem to return... I need some help please. The parallelized version runs much faster when it works (6-20x faster depending on the 'actions' being mapped at each point in the FS tree), so I'd like to be able to use it.

Also, if I change the mapping function to a non-parallel version:

def mapglob(pool, paths, pattern):
    results = []
    paths = [os.path.join(p, pattern) for p in paths]
    for path in paths:
        results += glob.glob(path)
    return results

Everything runs fine.

Edit:

I turned on debugging in multiprocessing to see if it would get me any further. In the case where it works I get:

[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers

And when it doesn't work, I get:

[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()

Not a complete solution, but I found a way to make the code work in either guise: from the interpreter or as code in a running script. I think the problem has to do with the following note in the multiprocessing docs:

Functionality within this package requires that the __main__ module be importable by the children. This is covered in the programming guidelines but is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples, will not work in the interactive interpreter.

I'm not sure why this limitation exists, or why I can sometimes still use a pool from the interactive interpreter and sometimes not, but oh well...
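For reference, the pattern the docs are pointing at for scripts is to create the pool only when the module is actually run as the main program, behind an if __name__ == '__main__': guard. A minimal sketch of test2.py rewritten that way (using the names from the example above) would be:

from test1 import findallmypaths

if __name__ == '__main__':
    # only the main script walks the tree and builds the pool;
    # child processes that import this module skip this block
    allmypaths = findallmypaths()
    print(len(allmypaths))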

To get around it, I do the following in any module that might use multiprocessing:

import __main__

__should_multithread__ = False
if hasattr(__main__, '__file__'):
    __should_multithread__ = True

The rest of the code within that module can then check this flag to see whether it should use a pool or execute without parallelization. By doing this, I can still use and test the parallelized functions in my modules from the interactive interpreter; they just run more slowly.
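For illustration, the branch inside the mapping function then looks roughly like this (same names as the earlier example; it assumes the __should_multithread__ flag above is defined in the same module):

import os
import glob

def mapglob(pool, paths, pattern):
    paths = [os.path.join(p, pattern) for p in paths]
    results = []
    if __should_multithread__:
        # started from a script, so the pool workers can import __main__
        for result in pool.map(glob.glob, paths):
            results += result
    else:
        # interactive interpreter: fall back to the serial version
        for path in paths:
            results += glob.glob(path)
    return results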

