博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python 线程
阅读量:7081 次
发布时间:2019-06-28

本文共 9497 字,大约阅读时间需要 31 分钟。

线程

  • 什么是线程:

    一条流水线的工作过程,cpu的最小执行单位

    线程的创建与进程相同

  • 线程和进程的效率对比:

    线程的效率非常高,并且线程开启不需要消耗什么资源

  • 创建进程的两种方式

    from threading import Threadimport timedef func(n):    print(n)if __name__ == '__main__':    for i in range(10000):        t = Thread(target=func, args=(i,))        t.start()    print("主进程运行完毕")
    '''线程类 继承 Theard '''class MyThread(Thread):    def __init__(self,name):        super().__init__()        self.name = name    # 线程中 必须重写 run 方法    def run(self):        time.sleep(1)        print(self.name)if __name__ == '__main__':    thread = MyThread("张无忌")    # 开启线程    thread.start()    # 等待 线程运行完毕才 向后运行    thread.join()    time.sleep(1)    print("主进程运行")

线程之间数据共享:

  • import timefrom threading import Threadfrom multiprocessing import Processnum = 100def func():    global num    num = 0if __name__ == '__main__':    t = Thread(target=func,)    t.start()    t.join()    print(num)  # 0

锁(同步锁\互斥锁): Lock

  • 保证数据安全,但是牺牲了效率,同步执行锁内的代码

  • 死锁现象 :

    互相抢到了对方的需要的锁,导致双方相互等待,程序没法进行

    import timefrom threading import Thread, Lockclass MyThread(Thread):   def __init__(self, lockA, lockB):       super().__init__()       self.lockA = lockA       self.lockB = lockB   def run(self):       self.f1()       self.f2()   def f1(self):       self.lockA.acquire()       print('我拿了A锁')       self.lockB.acquire()       print('我是一个很好的客户!')       self.lockB.release()       self.lockA.release()   def f2(self):       self.lockB.acquire()       time.sleep(0.1)       print('我拿到了B锁')       self.lockA.acquire()       print('我是一名合格的技师')       self.lockA.release()       self.lockB.release()if __name__ == '__main__':   lockA = Lock()   lockB = Lock()   t1 = MyThread(lockA, lockB)   t1.start()   t2 = MyThread(lockA, lockB)   t2.start()   print('我是经理')

    解决死锁:

递归锁 RLock

  • 可以多次acquire,通过一个计数器来记录被锁了多少次,只有计数器为0的时候,大家才能继续抢锁

    from threading import Thread, RLockimport timeclass MyThread(Thread):    def __init__(self, lookA, lookB):        super().__init__()        self.lockA = lookA        self.lockB = lookB    def run(self):        self.f1()        self.f2()    def f1(self):        self.lockA.acquire()        print("我拿到了锁A")        self.lockB.acquire()        print("我是f1")        self.lockB.release()        self.lockA.release()    def f2(self):        self.lockB.acquire()        print("我拿到了锁B")        time.sleep(1)        self.lockA.acquire()        print("我是f2")        self.lockA.release()        self.lockB.release()if __name__ == '__main__':    lockA = lockB = RLock()    t1 = MyThread(lockA, lockB)    t1.start()    t2 = MyThread(lockA, lockB)    t2.start()    print("我是劳保")

守护进程 :

  • 主进程代码结束程序并没有结束,并且主进程还存在,进程等待其他的子进程执行结束以后,为子进程收尸,注意一个问题:主进程的代码运行结束守护进程跟着结束,

守护线程: t.daemon = True

  • 主线程等待所有非守护线程的结束才结束,主线程的代码运行结束,还要等待非守护线程的执行完毕.这个过程中守护线程还存在

  • 所有非守护线程结束才结束,主线程的代码结束,只要还有非守护线程,那么守护线程也不会结束

    from threading import Threadimport timedef func1():    time.sleep(3)    print(11111111)def func2(aa):    time.sleep(1)    print(2222222222)if __name__ == '__main__':    t = Thread(target=func1,)    t.start()    t1 = Thread(target=func2,)    t.daemon = True    t1.start()    print("主进程")

信号量: Semaphore

  • 控制同时能够进入锁内去执行代码的线程数量(进程数量),维护了一个计数器,刚开始创建信号量的时候假如设置的是4个房间,进入一次acquire就减1 ,出来一个就+1,如果计数器为0,那么其他的任务等待,这样其他的任务和正在执行的任务是一个同步的状态,而进入acquire里面去执行的那4个任务是异步执行的.

    import timefrom threading import Thread, Semaphoredef func1(s):    s.acquire()    time.sleep(1)    print('大宝剑!!!')    s.release()if __name__ == '__main__':    # 设置要 同时 运行线程的 个数    s = Semaphore(4)    for i in range(10):        t = Thread(target=func1,args=(s,))        t.start()

主线程等待子线程的 原因

import timefrom threading import Threadfrom multiprocessing import Processdef func(n):    time.sleep(5)    print(n)if __name__ == '__main__':    # 主线程等待的是子线程的任务全部执行完毕    t = Thread(target=func, args=('我是子线程',))    t.start()  # 速度非常快    # 主进程等待的是给子进程收尸    # p = Process(target=func,args=('我是子进程',))    # p.start()    # print('主进程结束!!')    print('主线程结束')

线程的一些其他方法

from threading import Threadimport threadingdef func():    # 当前线程    print("子线程  id >>>>", threading.get_ident())    # 当前线程的对象    print("当前线程 >>>>", threading.current_thread())    # 子线程的名字    print("子线程 name>>>", threading.current_thread().getName())  # Thread-1    # 主线程的对象    print("主线程对象>>>", threading.main_thread())    print("当前线程id 号>>>>>>>>",threading.get_ident())if __name__ == '__main__':    t = Thread(target=func, )    t.start()    # 当前正在运行的所有线程    print("正在运行的所有线程>>>", threading.enumerate())    # 开启线程 数量    print("以开启线程 数量>>>", threading.active_count())    # 当前线程的对象    print("当前线程 name>>>>", threading.current_thread().getName())    # 主线程的对象    print("主线程的对象>>>", threading.main_thread())    # 当前线程id 号    print("当前线程id 号>>>>>>>>",threading.get_ident())    print(threading.stack_size())

线程事件

from threading import Thread, Evente = Event()  # e的状态有两种,False True,当事件对象的状态为False的时候,wait的地方会阻塞e.set()  # 将事件对象的状态改为Truee.clear()  # 将事件对象的状态改为Flaseprint('在这里等待')e.wait()  # 阻塞print('还没好!!')

线程队列

  • import queue

  • 先进先出

    import queue#先进先出 FIFOq=queue.Queue()q.put('first')q.put('second')q.put('third')# q.put_nowait() #没有数据就报错,可以通过try来搞print(q.get())print(q.get())print(q.get())# q.get_nowait() #没有数据就报错,可以通过try来搞'''firstsecondthird'''
  • 后进先出 类似于栈

    import queueq=queue.LifoQueue() #队列,类似于栈,栈我们提过吗,是不是先进后出的顺序啊q.put('first')q.put('second')q.put('third')# q.put_nowait()print(q.get())print(q.get())print(q.get())# q.get_nowait()'''thirdsecondfirst'''
  • 优先级队列

    import queue# 优先级 队列q = queue.PriorityQueue()# put放入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高# 不能是字典q1 = queue.LifoQueue()# q.put((3,"d"))# q.put((2,"c"))# q.put((1,"f"))# 如果两个值的优先级一样,那么按照后面的值的acsii码顺序来排序,如果字符串第一个数元素相同,比较第二个元素的acsii码顺序q.put((10,"f"))q.put((10,"c"))# 优先级相同的两个数据,他们后面的值必须是相同的数据类型才能比较,可以是元祖,也是通过元素的ascii码顺序来排序q.put((100,"f"))q.put((110,"c"))# print(q.get())print(q.get())print(q.get())

线程池

  • 切换 ProcessPoolExecutor, ThreadPoolExecutor 为 进程池和线程池

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport timedef func(n):    time.sleep(1)    print(n)    return n**2if __name__ == '__main__':    pp = ThreadPoolExecutor()    pp_list = []    res = pp.submit(func,1)    # print(res)    for i in range(10):        res = pp.submit(func,i) # 异步提交 10 个任务        pp_list.append(res)    pp.shutdown() # 等待任务全部执行完  =  close + join    print(pp_list)  # 
    > 将来的 at 0x15b33b4dc50 state=running> >>正在运行
    >> 挂起 # time.sleep(5) print([i.result() for i in pp_list]) for res in pp_list: print(res.result()) # 跟 res.get() 一样 print(pp_list) #
    # finished returned int 结束 返回 int
from concurrent.futures import ThreadPoolExecutor, as_completeddef task2(n):    url = "https://www.baidu.com"    res = requests.get(url)    print(n)    if res.status_code == 200:        return "yes!"    else:        return "no!"if __name__ == '__main__':    # max_workers 最多使用的线程数量    with ThreadPoolExecutor(max_workers=5) as executor:        task_list = [executor.submit(task2, x) for x in range(0, 10)]        # 对结果的处理,as_completed 返回一个生成器,等待每一个任务完成的时候就会返回一个任务,参数是futures序列        # item 表示的是Future对象        # 使用 future.result 获取函数结果        for item in as_completed(task_list):            print(item.result())

线程池的 map

异步执行的,map自带join功能

from threading import current_threadimport timefrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutordef func(n):    time.sleep(1)    return n ** 2if __name__ == '__main__':    # 开启线程池  个数 4 个    t = ThreadPoolExecutor(max_workers=4)    gen = t.map(func, range(10))  # 异步执行的,map自带join功能    print(gen)  # 
.result_iterator at 0x0000020F83747308> for i in range(10): print(i) print([ii for ii in gen])

进程中的线程池:

  • Pool 的源码 返回一个 线程池
def Pool(processes=None, initializer=None, initargs=()):    f  rom ..pool import ThreadPool    return ThreadPool(processes, initializer, initargs)
  • 用法
def task2(n):    url = "https://www.baidu.com"    res = requests.get(url)    print(n)    if res.status_code == 200:        return "yes!"    else:        return "no!"if __name__ == '__main__':    p = Pool(5)    res = p.map(task2, range(10))    print(res)

回调函数 add_done_callback

#  Executor 线程池from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutorimport time#                   当前 线程from threading import current_threaddef func(n):    time.sleep(1)    # 获取当前进程的名字    # print(current_thread().getName())    print("current_thread>>>>", current_thread().name)    # print(n)    return n ** 2def func2(n):    # 回调函数    print(n.result())    # 获取当前进程的名字    print("current_thread>>>>", current_thread().name)  # ThreadPoolExecutor-0_0if __name__ == '__main__':    # 开启进程池    # pp = ProcessPoolExecutor(max_workers=4)    # 开启 线程池    pp = ThreadPoolExecutor(max_workers=4)    for i in range(10):        # 回调函数的使用        res = pp.submit(func, i).add_done_callback(func2)    # print(res.result())

线程池的一些方法

import timefrom concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef func(n):    time.sleep(1)    # print(n,current_thread().ident)    return n**2def func2(n):    print(n)if __name__ == '__main__':    t_p = ThreadPoolExecutor(max_workers = 4)    # t_p = ProcessPoolExecutor(max_workers = 4)    # t_p.submit(func,n = 1)    t_res_list = []    for i in range(10):        # res_obj = t_p.submit(func,i) #异步提交了这个10个任务,        res_obj = t_p.submit(func,i) #异步提交了这个10个任务,        # res_obj.result() #他和get一样        t_res_list.append(res_obj)    t_p.shutdown()  # close + join    print('t_res_list',t_res_list)    for e_res in t_res_list:        print(e_res.result())

GIL锁 重点

img

转载于:https://www.cnblogs.com/zhang-zi-yi/p/10755811.html

你可能感兴趣的文章
Git取消跟踪某个文件
查看>>
基于容器服务的持续集成与云端交付(四)- 多种发布方式
查看>>
奇偶校验码
查看>>
我的友情链接
查看>>
省赛热身赛之Ordinal Numbers
查看>>
我的友情链接
查看>>
TCP/IP概述
查看>>
java nio学习笔记(一)
查看>>
触摸屏驱动
查看>>
时间命令
查看>>
Android异步下载图片并且缓存图片到本地
查看>>
sh命令
查看>>
LAMP优化内核篇
查看>>
strut2 上传文件
查看>>
StrutsPrepareAndExecuteFilter过滤器和url-pattern设置详解
查看>>
Java或web中所有路径问题
查看>>
Spring Boot上传图片
查看>>
zabbix安装配置
查看>>
Java8 Lambda表达式
查看>>
C#基础知识整理:基础知识(11) 值类型,引用类型
查看>>