博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池
阅读量:5066 次
发布时间:2019-06-12

本文共 4220 字,大约阅读时间需要 14 分钟。

版本一:

1 # -*- coding:utf-8 -*- 2 import Queue 3 import threading 4  5  6 class ThreadPool(object): 7  8     def __init__(self, max_num=20): 9         self.queue = Queue.Queue(max_num)10         for i in xrange(max_num):11             self.queue.put(threading.Thread)12 13     def get_thread(self):14         return self.queue.get()15 16     def add_thread(self):17         self.queue.put(threading.Thread)18 19 """20 pool = ThreadPool(10)21 22 def func(arg, p):23     print arg24     import time25     time.sleep(2)26     p.add_thread()27 28 29 for i in xrange(30):30     thread = pool.get_thread()31     t = thread(target=func, args=(i, pool))32     t.start()33 """

版本二:

1 # -*- coding:utf-8 -*-  2   3 import queue  4 import threading  5 import contextlib  6 import time  7   8 StopEvent = object()  9  10  11 class ThreadPool(object): 12  13     def __init__(self, max_num, max_task_num = None): 14         if max_task_num: 15             self.q = queue.Queue(max_task_num) 16         else: 17             self.q = queue.Queue() 18         self.max_num = max_num 19         self.cancel = False 20         self.terminal = False 21         self.generate_list = [] 22         self.free_list = [] 23  24     def run(self, func, args, callback=None): 25         """ 26         线程池执行一个任务 27         :param func: 任务函数 28         :param args: 任务函数所需参数 29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 30         :return: 如果线程池已经终止,则返回True否则None 31         """ 32         if self.cancel: 33             return 34         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 35             self.generate_thread() 36         w = (func, args, callback,) 37         self.q.put(w) 38  39     def generate_thread(self): 40         """ 41         创建一个线程 42         """ 43         t = threading.Thread(target=self.call) 44         t.start() 45  46     def call(self): 47         """ 48         循环去获取任务函数并执行任务函数 49         """ 50         current_thread = threading.currentThread() 51         self.generate_list.append(current_thread) 52  53         event = self.q.get() 54         while event != StopEvent: 55  56             func, arguments, callback = event 57             try: 58                 result = func(*arguments) 59                 success = True 60             except Exception as e: 61                 success = False 62                 result = None 63  64             if callback is not None: 65                 try: 66                     callback(success, result) 67                 except Exception as e: 68                     pass 69  70             with self.worker_state(self.free_list, current_thread): 71                 if self.terminal: 72                     event = StopEvent 73                 else: 74                     event = self.q.get() 75         else: 76  77             self.generate_list.remove(current_thread) 78  79     def close(self): 80         """ 81         执行完所有的任务后,所有线程停止 82         """ 83         self.cancel = True 84         full_size = len(self.generate_list) 85         while full_size: 86             self.q.put(StopEvent) 87             full_size -= 1 88  89     def terminate(self): 90         """ 91         无论是否还有任务,终止线程 92         """ 93         self.terminal = True 94  95         while self.generate_list: 96             self.q.put(StopEvent) 97  98         self.q.queue.clear() 99 100     @contextlib.contextmanager101     def worker_state(self, state_list, worker_thread):102         """103         用于记录线程中正在等待的线程数104         """105         state_list.append(worker_thread)106         try:107             yield108         finally:109             state_list.remove(worker_thread)110 111 112 113 # How to use114 115 116 pool = ThreadPool(5)117 118 def callback(status, result):119     # status, execute action status120     # result, execute action return value121     pass122 123 124 def action(i):125     print(i)126 127 for i in range(30):128     ret = pool.run(action, (i,), callback)129 130 time.sleep(5)131 print(len(pool.generate_list), len(pool.free_list))132 print(len(pool.generate_list), len(pool.free_list))133 # pool.close()134 # pool.terminate()

 

转载于:https://www.cnblogs.com/yzls/p/9478277.html

你可能感兴趣的文章
Abstract Factory Pattern
查看>>
C# 实现Bresenham算法(vs2010)
查看>>
基于iSCSI的SQL Server 2012群集测试(一)--SQL群集安装
查看>>
list 容器 排序函数.xml
查看>>
Activity启动过程中获取组件宽高的五种方式
查看>>
java导出Excel表格简单的方法
查看>>
SQLite数据库简介
查看>>
利用堆实现堆排序&amp;优先队列
查看>>
Mono源码学习笔记:Console类(四)
查看>>
Android学习路线(十二)Activity生命周期——启动一个Activity
查看>>
《Genesis-3D开源游戏引擎完整实例教程-跑酷游戏篇03:暂停游戏》
查看>>
CPU,寄存器,一缓二缓.... RAM ROM 外部存储器等简介
查看>>
windows下编译FreeSwitch
查看>>
git .gitignore 文件不起作用
查看>>
Alan Turing的纪录片观后感
查看>>
c#自定义控件中的事件处理
查看>>
二十六、Android WebView缓存
查看>>
django Models 常用的字段和参数
查看>>
IOS--沙盒机制
查看>>
使用 JointCode.Shuttle 访问任意 AppDomain 的服务
查看>>