find.py 4.11 KB
Newer Older
1
# A parallelized "find(1)" using the thread module.
Guido van Rossum's avatar
Guido van Rossum committed
2 3 4 5

# This demonstrates the use of a work queue and worker threads.
# It really does do more stats/sec when using multiple threads,
# although the improvement is only about 20-30 percent.
6 7
# (That was 8 years ago.  In 2002, on Linux, I can't measure
# a speedup. :-( )
Guido van Rossum's avatar
Guido van Rossum committed
8 9 10 11

# I'm too lazy to write a command line parser for the full find(1)
# command line syntax, so the predicate it searches for is wired-in,
# see function selector() below.  (It currently searches for files with
12
# world write permission.)
Guido van Rossum's avatar
Guido van Rossum committed
13 14

# Usage: parfind.py [-w nworkers] [directory] ...
15
# Default nworkers is 4
Guido van Rossum's avatar
Guido van Rossum committed
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36


import sys
import getopt
import string
import time
import os
from stat import *
import thread


# Work queue class.  Usage:
#   wq = WorkQ()
#   wq.addwork(func, (arg1, arg2, ...)) # one or more calls
#   wq.run(nworkers)
# The work is done when wq.run() completes.
# The function calls executed by the workers may add more work.
# Don't use keyboard interrupts!

class WorkQ:

Tim Peters's avatar
Tim Peters committed
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
    # Invariants:

    # - busy and work are only modified when mutex is locked
    # - len(work) is the number of jobs ready to be taken
    # - busy is the number of jobs being done
    # - todo is locked iff there is no work and somebody is busy

    def __init__(self):
        self.mutex = thread.allocate()
        self.todo = thread.allocate()
        self.todo.acquire()
        self.work = []
        self.busy = 0

    def addwork(self, func, args):
        job = (func, args)
        self.mutex.acquire()
        self.work.append(job)
        self.mutex.release()
        if len(self.work) == 1:
            self.todo.release()

    def _getwork(self):
        self.todo.acquire()
        self.mutex.acquire()
        if self.busy == 0 and len(self.work) == 0:
            self.mutex.release()
            self.todo.release()
            return None
        job = self.work[0]
        del self.work[0]
        self.busy = self.busy + 1
        self.mutex.release()
        if len(self.work) > 0:
            self.todo.release()
        return job

    def _donework(self):
        self.mutex.acquire()
        self.busy = self.busy - 1
        if self.busy == 0 and len(self.work) == 0:
            self.todo.release()
        self.mutex.release()

    def _worker(self):
        time.sleep(0.00001)     # Let other threads run
        while 1:
            job = self._getwork()
            if not job:
                break
            func, args = job
            apply(func, args)
            self._donework()

    def run(self, nworkers):
        if not self.work:
            return # Nothing to do
        for i in range(nworkers-1):
            thread.start_new(self._worker, ())
        self._worker()
        self.todo.acquire()
Guido van Rossum's avatar
Guido van Rossum committed
98 99 100 101 102


# Main program

def main():
Tim Peters's avatar
Tim Peters committed
103 104 105 106 107 108 109
    nworkers = 4
    opts, args = getopt.getopt(sys.argv[1:], '-w:')
    for opt, arg in opts:
        if opt == '-w':
            nworkers = string.atoi(arg)
    if not args:
        args = [os.curdir]
Guido van Rossum's avatar
Guido van Rossum committed
110

Tim Peters's avatar
Tim Peters committed
111 112 113
    wq = WorkQ()
    for dir in args:
        wq.addwork(find, (dir, selector, wq))
Guido van Rossum's avatar
Guido van Rossum committed
114

Tim Peters's avatar
Tim Peters committed
115 116 117
    t1 = time.time()
    wq.run(nworkers)
    t2 = time.time()
Guido van Rossum's avatar
Guido van Rossum committed
118

Tim Peters's avatar
Tim Peters committed
119
    sys.stderr.write('Total time ' + `t2-t1` + ' sec.\n')
Guido van Rossum's avatar
Guido van Rossum committed
120 121 122 123 124 125


# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose

def selector(dir, name, fullname, stat):
126 127
    # Look for world writable files that are not symlinks
    return (stat[ST_MODE] & 0002) != 0 and not S_ISLNK(stat[ST_MODE])
Guido van Rossum's avatar
Guido van Rossum committed
128 129 130 131 132


# The find procedure -- calls wq.addwork() for subdirectories

def find(dir, pred, wq):
Tim Peters's avatar
Tim Peters committed
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    try:
        names = os.listdir(dir)
    except os.error, msg:
        print `dir`, ':', msg
        return
    for name in names:
        if name not in (os.curdir, os.pardir):
            fullname = os.path.join(dir, name)
            try:
                stat = os.lstat(fullname)
            except os.error, msg:
                print `fullname`, ':', msg
                continue
            if pred(dir, name, fullname, stat):
                print fullname
            if S_ISDIR(stat[ST_MODE]):
                if not os.path.ismount(fullname):
                    wq.addwork(find, (fullname, pred, wq))
Guido van Rossum's avatar
Guido van Rossum committed
151 152 153 154 155


# Call the main program

main()