版本一:
# -*- coding:utf-8 -*-
try:
    import queue  # Python 3 name
except ImportError:  # Python 2 fallback keeps the original import working
    import Queue as queue
import threading


class ThreadPool(object):
    """Token-based limiter for the number of concurrently running threads.

    The internal queue is pre-filled with ``max_num`` references to the
    ``threading.Thread`` class. Taking one (``get_thread``) grants permission
    to start a thread; putting one back (``add_thread``) releases that
    permission. When no token is left, ``get_thread`` blocks until a running
    task returns one.
    """

    def __init__(self, max_num=20):
        """Create the pool and fill it with ``max_num`` thread tokens.

        :param max_num: maximum number of threads allowed to run at once.
        """
        self.queue = queue.Queue(max_num)
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take one token, blocking while the pool is exhausted.

        :return: the ``threading.Thread`` class, ready to be instantiated.
        """
        return self.queue.get()

    def add_thread(self):
        """Return one token so that another thread may be started."""
        self.queue.put(threading.Thread)


# Usage example. The task itself must hand the token back; doing so in a
# ``finally`` clause keeps the pool from draining when the task raises.
#
#     pool = ThreadPool(10)
#
#     def func(arg, p):
#         try:
#             print(arg)
#             import time
#             time.sleep(2)
#         finally:
#             p.add_thread()
#
#     for i in range(30):
#         thread = pool.get_thread()
#         t = thread(target=func, args=(i, pool))
#         t.start()
版本二:
# -*- coding:utf-8 -*-

import contextlib
import logging
import queue
import threading
import time

logger = logging.getLogger(__name__)

# Sentinel put on the task queue to tell exactly one worker to exit.
StopEvent = object()

# How long terminate() sleeps between checks for workers that are still alive.
_TERMINATE_POLL_SECONDS = 0.01


class ThreadPool(object):
    """Thread pool that creates workers lazily, up to ``max_num`` of them.

    Tasks are ``(func, args, callback)`` tuples pushed onto ``self.q``.
    ``generate_list`` holds every live worker thread and ``free_list`` holds
    the workers currently waiting for their next task.
    """

    def __init__(self, max_num, max_task_num=None):
        """Set up the task queue and the bookkeeping lists.

        :param max_num: maximum number of worker threads.
        :param max_task_num: capacity of the task queue; a falsy value means
            the queue is unbounded.
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """Submit one task to the pool.

        :param func: the task function.
        :param args: positional arguments for ``func`` (an iterable).
        :param callback: optional function called after the task finishes
            with two arguments: the success flag and the task's return value
            (``None`` when the task raised).
        :return: ``True`` if the pool has been closed and the task was
            rejected, otherwise ``None``.
        """
        if self.cancel:
            return True
        # Only grow the pool when nobody is idle and there is still headroom.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """Create, register and start one worker thread.

        The worker is registered here, before it starts, so that the
        ``max_num`` check in ``run`` and the sentinel count in ``close`` never
        miss a thread that has been created but has not yet been scheduled.
        """
        worker = threading.Thread(target=self.call)
        self.generate_list.append(worker)
        try:
            worker.start()
        except Exception:
            # A thread that never started must not occupy a pool slot.
            self.generate_list.remove(worker)
            raise

    def call(self):
        """Worker loop: fetch tasks and execute them until told to stop."""
        current_thread = threading.current_thread()
        # Normally already registered by generate_thread(); this keeps a
        # direct call of this method working as it did before.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    success = False
                    result = None
                    if callback is None:
                        # Nobody would learn about the failure otherwise.
                        logger.exception("task %r raised an exception", func)

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # A broken callback must not kill the worker.
                        logger.exception("callback %r raised an exception",
                                         callback)

                # The worker counts as idle while it waits for the next task.
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
        finally:
            # Always deregister, even if the loop dies unexpectedly, so the
            # slot is freed and terminate() does not wait for a dead thread.
            self.generate_list.remove(current_thread)

    def close(self):
        """Stop accepting tasks; workers exit after the queued tasks finish."""
        self.cancel = True
        # One sentinel per live worker, queued behind the pending tasks.
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """Stop the workers whether or not tasks are still queued.

        Queued tasks are discarded. A task that is already executing is
        allowed to finish, after which its worker exits. The call returns
        once every worker (other than the calling thread, if it is a worker
        itself) has deregistered.
        """
        self.terminal = True
        self._discard_queued()

        current = threading.current_thread()
        while True:
            waiting = [w for w in list(self.generate_list) if w is not current]
            if not waiting:
                break
            # Keep at most one sentinel queued per remaining worker instead
            # of flooding the queue; blocked workers wake up on these.
            for _ in range(len(waiting) - self.q.qsize()):
                try:
                    self.q.put_nowait(StopEvent)
                except queue.Full:
                    break
            time.sleep(_TERMINATE_POLL_SECONDS)

        # Remove sentinels that no worker needed.
        self._discard_queued()

    def _discard_queued(self):
        """Empty the task queue through its public, lock-protected API."""
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """Keep ``worker_thread`` in ``state_list`` for the ``with`` body.

        Used to record which workers are currently waiting for a task.
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use


def callback(status, result):
    # status: whether the action ran without raising
    # result: the action's return value
    pass


def action(i):
    print(i)


if __name__ == "__main__":
    pool = ThreadPool(5)

    for i in range(30):
        ret = pool.run(action, (i,), callback)

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # Without this the non-daemon workers block forever on the queue and the
    # script never exits. Use pool.terminate() to drop queued tasks instead.
    pool.close()