Multithreading Python FS Crawler
I've written a Python function that scours a filesystem using a provided directory pattern, with optional 'actions' to take at each level. I tried multithreading it, since some of the volumes are on network shares and I want to minimize I/O blocking. I started with the multiprocessing Pool class, since it was the most convenient... (seriously, no Pool class for threading?). The function unravels the provided FS pattern as far as possible and submits the newly returned paths to the pool until no new paths are returned. I got this to work great when using the function and class directly, but when I try to use the function from another class, the program seems to hang. To simplify, I rewrote the function using threads instead of processes and wrote a simple ThreadPool class... same problem. Here's a simplified version of the code that still exhibits the same problems:
File test1.py:

    import os
    import glob
    from multiprocessing import Pool

    def mapglob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for result in pool.map(glob.glob, paths):
            results += result
        return results

    def findallmypaths():
        pool = Pool(10)
        paths = ['/volumes']
        follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
        for pattern in follow:
            paths = mapglob(pool, paths, pattern)
        return paths

File test2.py:

    from test1 import findallmypaths
    allmypaths = findallmypaths()
Now if I call:

    >>> from test1 import findallmypaths
    >>> findallmypaths()
    ...long list of paths...

this works fine, but if I try:

    >>> from test2 import allmypaths

Python hangs forever. The action functions are called (in this example, glob), but they never seem to return... I need help, please... The parallelized version runs much faster when it works (6-20x faster, depending on which 'actions' are being mapped at each point in the FS tree), so I'd like to be able to use it.
Also, if I change the mapping function to a non-parallel version:

    def mapglob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for path in paths:
            results += glob.glob(path)
        return results

everything runs fine.
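The thread-based rewrite mentioned in the question, with its hand-written ThreadPool class, isn't shown. For comparison, here is a minimal sketch of the same level-by-level crawl on a thread pool from the standard library, concurrent.futures.ThreadPoolExecutor (Python 3.2+; on Python 2, multiprocessing.dummy.Pool provides the same pool.map call). The root, follow and workers parameters are hypothetical additions for illustration, and this is not claimed to cure the hang described here, since the thread version reportedly hung the same way.

```python
import glob
import os
from concurrent.futures import ThreadPoolExecutor


def mapglob(pool, paths, pattern):
    """Glob `pattern` under every path in `paths`, one glob.glob call per path."""
    patterns = [os.path.join(p, pattern) for p in paths]
    results = []
    # pool.map yields each path's matches in the same order as `patterns`.
    for matches in pool.map(glob.glob, patterns):
        results.extend(matches)
    return results


def findallmypaths(root, follow, workers=10):
    """Expand the pattern chain level by level, globbing each level on worker threads.

    `root`, `follow` and `workers` are hypothetical parameters for this sketch.
    """
    paths = [root]
    # Leaving the with-block shuts the executor down and waits for its threads.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for pattern in follow:
            paths = mapglob(pool, paths, pattern)
    return paths
```

Because ThreadPoolExecutor.map, like Pool.map, returns results in input order, mapglob works unchanged with either kind of pool; a call such as findallmypaths('/volumes', ['**', 'ptid_*', 'expid_*', 'slkid_*']) mirrors the original one.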
Edit:

I turned on debugging in multiprocessing to see if it could help me further. In the case where it works, I get:
    [DEBUG/MainProcess] created semlock with handle 5
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 9
    [DEBUG/MainProcess] created semlock with handle 10
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
    [DEBUG/MainProcess] closing pool
    [SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
    [DEBUG/MainProcess] finalizing pool
    [DEBUG/MainProcess] helping task handler/workers to finish
    [DEBUG/MainProcess] removing tasks from inqueue until task handler finished
    [DEBUG/MainProcess] task handler got sentinel
    [DEBUG/MainProcess] task handler sending sentinel to result handler
    [DEBUG/MainProcess] task handler sending sentinel to workers
    [DEBUG/MainProcess] task handler exiting
    [DEBUG/MainProcess] result handler got sentinel
    [DEBUG/MainProcess] ensuring that outqueue is not full
    [DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
    [DEBUG/PoolWorker-2] worker got sentinel -- exiting
    [DEBUG/PoolWorker-1] worker got sentinel -- exiting
    [INFO/PoolWorker-2] process shutting down
    [DEBUG/PoolWorker-7] worker got sentinel -- exiting
    [INFO/PoolWorker-1] process shutting down
    [INFO/PoolWorker-7] process shutting down
    [DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
    [INFO/PoolWorker-7] process exiting with exitcode 0
    [DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
    [INFO/PoolWorker-1] process exiting with exitcode 0
    [DEBUG/PoolWorker-5] worker got sentinel -- exiting
    [DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
    [INFO/PoolWorker-5] process shutting down
    [DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
    [DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
    [INFO/PoolWorker-2] process exiting with exitcode 0
    [INFO/PoolWorker-5] process exiting with exitcode 0
    [DEBUG/PoolWorker-6] worker got sentinel -- exiting
    [INFO/PoolWorker-6] process shutting down
    [DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
    [INFO/PoolWorker-6] process exiting with exitcode 0
    [DEBUG/PoolWorker-4] worker got sentinel -- exiting
    [DEBUG/PoolWorker-9] worker got sentinel -- exiting
    [INFO/PoolWorker-9] process shutting down
    [DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
    [INFO/PoolWorker-9] process exiting with exitcode 0
    [INFO/PoolWorker-4] process shutting down
    [DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
    [INFO/PoolWorker-4] process exiting with exitcode 0
    [DEBUG/PoolWorker-10] worker got sentinel -- exiting
    [INFO/PoolWorker-10] process shutting down
    [DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
    [INFO/PoolWorker-10] process exiting with exitcode 0
    [DEBUG/PoolWorker-8] worker got sentinel -- exiting
    [INFO/PoolWorker-8] process shutting down
    [DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
    [INFO/PoolWorker-8] process exiting with exitcode 0
    [DEBUG/PoolWorker-3] worker got sentinel -- exiting
    [INFO/PoolWorker-3] process shutting down
    [DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
    [INFO/PoolWorker-3] process exiting with exitcode 0
    [DEBUG/MainProcess] terminating workers
    [DEBUG/MainProcess] joining task handler
    [DEBUG/MainProcess] joining result handler
    [DEBUG/MainProcess] joining pool workers
And when it doesn't work, the output is:
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 7
    [DEBUG/MainProcess] created semlock with handle 10
    [DEBUG/MainProcess] created semlock with handle 11
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
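The post doesn't say how debugging was switched on. One standard way to get output in this format is multiprocessing.log_to_stderr(), which attaches a stderr handler to the package's logger; lowering the level to multiprocessing.util.SUBDEBUG also lets the subdebug lines through. A minimal sketch, in which the helper name enable_multiprocessing_debug is hypothetical:

```python
import logging
import multiprocessing
import multiprocessing.util


def enable_multiprocessing_debug(level=multiprocessing.util.SUBDEBUG):
    """Send multiprocessing's internal log messages at `level` or above to stderr.

    Hypothetical helper name; it only wraps the standard log_to_stderr() call.
    """
    # log_to_stderr() adds a stderr handler, formatted as
    # [LEVEL/ProcessName] message, to the package logger and returns it.
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(level)
    return logger
```

Calling it once near the top of the script, before the Pool is created, should produce the main-process lines shown above.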
It's not a complete solution, but I found a way to make the code work in either guise: from the interpreter or from code in a running script. I think the problem has to do with the following note in the multiprocessing docs:
Functionality within this package requires that the __main__ method be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples will not work in the interactive interpreter.
I'm not sure why this limitation exists, or why I can still use the pool from the interactive interpreter in one case and not the other, but oh well....

To get around it, I do the following in any module that might use multiprocessing:
    import __main__

    __should_multithread__ = False
    if hasattr(__main__, '__file__'):
        __should_multithread__ = True
The rest of the code within the module can check this flag to decide whether to use the pool or to execute without parallelization. By doing this, I can still use and test the parallelized functions in my modules from the interactive interpreter; they just run more slowly.