线程
什么是线程:
一条流水线的工作过程,cpu的最小执行单位
线程的创建与进程相同
线程和进程的效率对比:
线程的效率非常高,并且线程开启不需要消耗什么资源
创建进程的两种方式
from threading import Threadimport timedef func(n): print(n)if __name__ == '__main__': for i in range(10000): t = Thread(target=func, args=(i,)) t.start() print("主进程运行完毕")
'''线程类 继承 Theard '''class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name # 线程中 必须重写 run 方法 def run(self): time.sleep(1) print(self.name)if __name__ == '__main__': thread = MyThread("张无忌") # 开启线程 thread.start() # 等待 线程运行完毕才 向后运行 thread.join() time.sleep(1) print("主进程运行")
线程之间数据共享:
import timefrom threading import Threadfrom multiprocessing import Processnum = 100def func(): global num num = 0if __name__ == '__main__': t = Thread(target=func,) t.start() t.join() print(num) # 0
锁(同步锁\互斥锁): Lock
保证数据安全,但是牺牲了效率,同步执行锁内的代码
死锁现象 :
互相抢到了对方的需要的锁,导致双方相互等待,程序没法进行
import timefrom threading import Thread, Lockclass MyThread(Thread): def __init__(self, lockA, lockB): super().__init__() self.lockA = lockA self.lockB = lockB def run(self): self.f1() self.f2() def f1(self): self.lockA.acquire() print('我拿了A锁') self.lockB.acquire() print('我是一个很好的客户!') self.lockB.release() self.lockA.release() def f2(self): self.lockB.acquire() time.sleep(0.1) print('我拿到了B锁') self.lockA.acquire() print('我是一名合格的技师') self.lockA.release() self.lockB.release()if __name__ == '__main__': lockA = Lock() lockB = Lock() t1 = MyThread(lockA, lockB) t1.start() t2 = MyThread(lockA, lockB) t2.start() print('我是经理')
解决死锁:
递归锁 RLock
可以多次acquire,通过一个计数器来记录被锁了多少次,只有计数器为0的时候,大家才能继续抢锁
from threading import Thread, RLockimport timeclass MyThread(Thread): def __init__(self, lookA, lookB): super().__init__() self.lockA = lookA self.lockB = lookB def run(self): self.f1() self.f2() def f1(self): self.lockA.acquire() print("我拿到了锁A") self.lockB.acquire() print("我是f1") self.lockB.release() self.lockA.release() def f2(self): self.lockB.acquire() print("我拿到了锁B") time.sleep(1) self.lockA.acquire() print("我是f2") self.lockA.release() self.lockB.release()if __name__ == '__main__': lockA = lockB = RLock() t1 = MyThread(lockA, lockB) t1.start() t2 = MyThread(lockA, lockB) t2.start() print("我是劳保")
守护进程 :
- 主进程代码结束程序并没有结束,并且主进程还存在,进程等待其他的子进程执行结束以后,为子进程收尸,注意一个问题:主进程的代码运行结束守护进程跟着结束,
守护线程: t.daemon = True
主线程等待所有非守护线程的结束才结束,主线程的代码运行结束,还要等待非守护线程的执行完毕.这个过程中守护线程还存在
所有非守护线程结束才结束,主线程的代码结束,只要还有非守护线程,那么守护线程也不会结束
from threading import Threadimport timedef func1(): time.sleep(3) print(11111111)def func2(aa): time.sleep(1) print(2222222222)if __name__ == '__main__': t = Thread(target=func1,) t.start() t1 = Thread(target=func2,) t.daemon = True t1.start() print("主进程")
信号量: Semaphore
控制同时能够进入锁内去执行代码的线程数量(进程数量),维护了一个计数器,刚开始创建信号量的时候假如设置的是4个房间,进入一次acquire就减1 ,出来一个就+1,如果计数器为0,那么其他的任务等待,这样其他的任务和正在执行的任务是一个同步的状态,而进入acquire里面去执行的那4个任务是异步执行的.
import timefrom threading import Thread, Semaphoredef func1(s): s.acquire() time.sleep(1) print('大宝剑!!!') s.release()if __name__ == '__main__': # 设置要 同时 运行线程的 个数 s = Semaphore(4) for i in range(10): t = Thread(target=func1,args=(s,)) t.start()
主线程等待子线程的 原因
import timefrom threading import Threadfrom multiprocessing import Processdef func(n): time.sleep(5) print(n)if __name__ == '__main__': # 主线程等待的是子线程的任务全部执行完毕 t = Thread(target=func, args=('我是子线程',)) t.start() # 速度非常快 # 主进程等待的是给子进程收尸 # p = Process(target=func,args=('我是子进程',)) # p.start() # print('主进程结束!!') print('主线程结束')
线程的一些其他方法
from threading import Threadimport threadingdef func(): # 当前线程 print("子线程 id >>>>", threading.get_ident()) # 当前线程的对象 print("当前线程 >>>>", threading.current_thread()) # 子线程的名字 print("子线程 name>>>", threading.current_thread().getName()) # Thread-1 # 主线程的对象 print("主线程对象>>>", threading.main_thread()) print("当前线程id 号>>>>>>>>",threading.get_ident())if __name__ == '__main__': t = Thread(target=func, ) t.start() # 当前正在运行的所有线程 print("正在运行的所有线程>>>", threading.enumerate()) # 开启线程 数量 print("以开启线程 数量>>>", threading.active_count()) # 当前线程的对象 print("当前线程 name>>>>", threading.current_thread().getName()) # 主线程的对象 print("主线程的对象>>>", threading.main_thread()) # 当前线程id 号 print("当前线程id 号>>>>>>>>",threading.get_ident()) print(threading.stack_size())
线程事件
from threading import Thread, Evente = Event() # e的状态有两种,False True,当事件对象的状态为False的时候,wait的地方会阻塞e.set() # 将事件对象的状态改为Truee.clear() # 将事件对象的状态改为Flaseprint('在这里等待')e.wait() # 阻塞print('还没好!!')
线程队列
import queue
先进先出
import queue#先进先出 FIFOq=queue.Queue()q.put('first')q.put('second')q.put('third')# q.put_nowait() #没有数据就报错,可以通过try来搞print(q.get())print(q.get())print(q.get())# q.get_nowait() #没有数据就报错,可以通过try来搞'''firstsecondthird'''
后进先出 类似于栈
import queueq=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊q.put('first')q.put('second')q.put('third')# q.put_nowait()print(q.get())print(q.get())print(q.get())# q.get_nowait()'''thirdsecondfirst'''
优先级队列
import queue# 优先级 队列q = queue.PriorityQueue()# put放入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高# 不能是字典q1 = queue.LifoQueue()# q.put((3,"d"))# q.put((2,"c"))# q.put((1,"f"))# 如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序q.put((10,"f"))q.put((10,"c"))# 优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序q.put((100,"f"))q.put((110,"c"))# print(q.get())print(q.get())print(q.get())
线程池
切换 ProcessPoolExecutor, ThreadPoolExecutor 为 进程池和线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport timedef func(n): time.sleep(1) print(n) return n**2if __name__ == '__main__': pp = ThreadPoolExecutor() pp_list = [] res = pp.submit(func,1) # print(res) for i in range(10): res = pp.submit(func,i) # 异步提交 10 个任务 pp_list.append(res) pp.shutdown() # 等待任务全部执行完 = close + join print(pp_list) #
> 将来的 at 0x15b33b4dc50 state=running> >>正在运行 >> 挂起 # time.sleep(5) print([i.result() for i in pp_list]) for res in pp_list: print(res.result()) # 跟 res.get() 一样 print(pp_list) # # finished returned int 结束 返回 int
from concurrent.futures import ThreadPoolExecutor, as_completeddef task2(n): url = "https://www.baidu.com" res = requests.get(url) print(n) if res.status_code == 200: return "yes!" else: return "no!"if __name__ == '__main__': # max_workers 最多使用的线程数量 with ThreadPoolExecutor(max_workers=5) as executor: task_list = [executor.submit(task2, x) for x in range(0, 10)] # 对结果的处理,as_completed 返回一个生成器,等待每一个任务完成的时候就会返回一个任务,参数是futures序列 # item 表示的是Future对象 # 使用 future.result 获取函数结果 for item in as_completed(task_list): print(item.result())
线程池的 map
异步执行的,map自带join功能
from threading import current_threadimport timefrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutordef func(n): time.sleep(1) return n ** 2if __name__ == '__main__': # 开启线程池 个数 4 个 t = ThreadPoolExecutor(max_workers=4) gen = t.map(func, range(10)) # 异步执行的,map自带join功能 print(gen) #.result_iterator at 0x0000020F83747308> for i in range(10): print(i) print([ii for ii in gen])
进程中的线程池:
- Pool 的源码 返回一个 线程池
def Pool(processes=None, initializer=None, initargs=()): f rom ..pool import ThreadPool return ThreadPool(processes, initializer, initargs)
- 用法
def task2(n): url = "https://www.baidu.com" res = requests.get(url) print(n) if res.status_code == 200: return "yes!" else: return "no!"if __name__ == '__main__': p = Pool(5) res = p.map(task2, range(10)) print(res)
回调函数 add_done_callback
# Executor 线程池from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutorimport time# 当前 线程from threading import current_threaddef func(n): time.sleep(1) # 获取当前进程的名字 # print(current_thread().getName()) print("current_thread>>>>", current_thread().name) # print(n) return n ** 2def func2(n): # 回调函数 print(n.result()) # 获取当前进程的名字 print("current_thread>>>>", current_thread().name) # ThreadPoolExecutor-0_0if __name__ == '__main__': # 开启进程池 # pp = ProcessPoolExecutor(max_workers=4) # 开启 线程池 pp = ThreadPoolExecutor(max_workers=4) for i in range(10): # 回调函数的使用 res = pp.submit(func, i).add_done_callback(func2) # print(res.result())
线程池的一些方法
import timefrom concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef func(n): time.sleep(1) # print(n,current_thread().ident) return n**2def func2(n): print(n)if __name__ == '__main__': t_p = ThreadPoolExecutor(max_workers = 4) # t_p = ProcessPoolExecutor(max_workers = 4) # t_p.submit(func,n = 1) t_res_list = [] for i in range(10): # res_obj = t_p.submit(func,i) #异步提交了这个10个任务, res_obj = t_p.submit(func,i) #异步提交了这个10个任务, # res_obj.result() #他和get一样 t_res_list.append(res_obj) t_p.shutdown() # close + join print('t_res_list',t_res_list) for e_res in t_res_list: print(e_res.result())