多任务实现之线程-threads

多线程:共享内存空间,提高CPU使用效率

进程是资源分配的单位,线程是CPU调度的单位:启动一个进程之后,会有一个主线程去依次执行

进程号0:用来进程之间的切换,而进程号1:用来生成新进程(所有进程的父进程)

进程与线程创建多任务的区别

创建了5个线程,分别各个执行,不会相互干扰
同时即使主线程完成任务了,也不会退出,而是要等到子线程完成之后才退出,往往是为了收回子线程的资源

线程例子1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
from time import ctime,sleep
def music(func):
for i in range(2):
print( "I was listening to %s. %s" %(func,ctime()))
sleep(1)
def move(func):
for i in range(2):
print( "I was at the %s! %s" %(func,ctime()))
sleep(5)
# 主线程中 加两个子线程
threads = [] # 装载多个线程的数组 看着像是同时进行的
t1 = threading.Thread(target=music,args=(u'爱情买卖',))
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'阿凡达',))
threads.append(t2)
if __name__ == '__main__':
for t in threads:# 子线程
t.setDaemon(True)
""" 线程声明为守护线程,必须在start() 方法调用之前设置
如果不设置为守护线程程序会被无限挂起.
子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print "all over %s" %ctime()后
没有等待子线程,直接就退出了,同时子线程也一同结束"""
t.start()#开始线程活动
t.join()
""" 在子线程完成运行之前,这个子线程的父线程将一直被阻塞.join()实际上意味着等到队列为空,再执行别的操作
join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程 """
print( "all over %s" %ctime()) #主线程

线程例子2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import _thread
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
#继承父类threading.Thread
def __init__(self, delay):
threading.Thread.__init__(self)
self.delay = delay
def run(self):
#把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
print( "Starting " + self.name)
# name属性中保存的是当前线程的名字
print_time(threadName=self.name,delay=self.delay,counter=5)
print( "Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
_thread.exit()
time.sleep(delay)
print( "%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
thread1 = myThread( delay=1)
thread2 = myThread( delay=2)
# 开启线程
thread1.start()
thread2.start()
print( "Exiting Main Thread")

多线程的复杂度

• 资源、数据的安全性:锁保护
• 原子性:数据操作是天然互斥的(不可分割,要不成功,要不失败)
• 同步等待:wait() notify() notifyAll()
• 死锁:多个线程对资源互锁,造成死锁
• 容灾:任何线程出现错误,整个进程都会停止

互斥问题

死锁:在线程间共享多个资源的时候,如果两个线程分别占有⼀部分资源并且同时等待对⽅的资源,就会造成死锁

线程同进程的执行顺利一样都是不确定的,是由调度算法决定的

线程之间共享全局变量,进程不共享,所以才有进程间的通信,所以才有线程的资源竞争,互斥,死锁等问题
同时创建线程时,传递的参数是列表也是共享的

互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from threading import Thread, Lock
import time
# 参数
l = [22,33]
#线程之间共享全局变量
g_num = 0
def test1(l):
global g_num
#这个线程和test2线程都在抢着对这个锁进行上锁,如果有1方成功的上锁,那么导致另外
#一方会堵塞(一直等待)到这个锁被解开为止
mutex.acquire()
for i in range(1000000):
g_num += 1
mutex.release()#用来对mutex指向的这个锁 进行解锁,,,只要开了锁,那么接下来会让所有因为
#这个锁 被上了锁 而堵塞的线程 进行抢着上锁
print("---test1---g_num=%d"%g_num)
l.append(11)
print(l)
def test2(l):
global g_num
mutex.acquire()
for i in range(1000000):
g_num += 1
mutex.release()
print("---test2---g_num=%d"%g_num)
l.append(44)
print(l)
#创建一把互斥锁,这个锁默认是没有上锁的
mutex = Lock()
p1 = Thread(target=test1,args=(l,))
p1.start()
#延时一会,保证p1线程中的事情做完
#time.sleep(3) #取消屏蔽之后 再次运行程序,结果会不一样
p2 = Thread(target=test2,args=(l,))
p2.start()
print("---g_num=%d---"%g_num)

全局变量在多个线程中共享,为了保证正确运行需要锁
非全局变量在每个线程中各有一份,不会共享,当然了不需要加锁

同步线程(继承类)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import threading
from datetime import datetime
import time
import _thread
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, delay):
threading.Thread.__init__(self)
# self.threadID = threadID
self.delay = delay
def run(self):
print( "Starting " + self.name)
# 如果多个线程共同对某个数据修改,则可能出现不可预料的结果,
# 为了保证数据的正确性,需要对多个线程进行同步.
# 线程同步(为了确保数据的完整没有被其它的线程修改,必须要得锁和解锁
# 所以也导致了要一个线程全部结束之后,才能执行下一个的线程)
# 这会导致同步阻塞,也就是只有一个线程工作完之后,才能进行下一个线程
# 获得锁,成功获得锁定后返回True
# 可选的timeout参数不填时将一直阻塞直到获得锁定
# 否则超时后将返回False
threadLock.acquire()
print_time(threadName=self.name, delay=self.delay, counter=5)
# 释放锁
threadLock.release()
print( "Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
_thread.exit()
time.sleep(delay)
print( "%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 线程同步
threadLock = threading.Lock()
threads = []
thread1 = myThread( delay=1)
thread2 = myThread(delay=2)
# 开启线程
n = datetime.now()
thread1.start()
thread1.join()#因为没有queue可继续执行下面的线程
thread2.start() #
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print("Exiting Main Thread")
print(datetime.now()-n)

线程quere用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import queue, time, threading, datetime
class Job:
def __init__(self, name):
self.name = name
def do(self):
time.sleep(2)
print("\t[Info] Job({0}) is done!".format(self.name))
que = queue.Queue()
#FIFO即First in First Out,先进先出 a[len(a):1]=[queue] 加在后面的
# lifoqueue = queue.LifoQueue()#LIFO即Last in First Out
# 后进先出 a[0:0]=[queue] 加在最前面的 入栈 出栈
for i in range(20):
# 保存要去工作的信息 在queue中
que.put(Job(str(i + 1)))
#调用队列对象的put()方法在队尾插入一个项目
# put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1
# 如果队列当前满且block为1,put()方法就使调用线程暂停,直到空出一个数据单元.
# 如果block为0,put方法将引发Queue.Full异常
print("\t[Info] Queue size={0}...".format(que.qsize()))#返回队列的大小
# # Start activity to digest queue.
st = datetime.datetime.now()
#需要40秒
while que.qsize() > 0:
job = que.get()
job.do()
td = datetime.datetime.now() - st
print("\t[Info] Spending time={0}!".format(td))
#q.qsize() 返回队列的大小
# q.empty() 如果队列为空,返回True,反之False
# q.full() 如果队列满了,返回True,反之False
# q.full 与 maxsize 大小对应
# q.get([block[, timeout]]) 获取队列,timeout等待时间
# q.get_nowait() 相当q.get(False)
# 非阻塞 q.put(item) 写入队列,timeout等待时间
# q.put_nowait(item) 相当q.put(item, False)
# q.task_done() 在完成一项工作之后,q.task_done()函数向任务已经完成的队列发送一个信号
# q.join() 实际上意味着等到队列为空,再执行别的操作

线程quere

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import queue, time, threading, datetime
class Job:
def __init__(self, name):
self.name = name
def do(self):
time.sleep(2)
print("\t[Info] Job({0}) is done!".format(self.name))
que = queue.Queue()
for i in range(20):
que.put(Job(str(i + 1)))
print("\t[Info] Queue size={0}...".format(que.qsize()))
def doJob(*args):
queue = args[0]
while queue.qsize() > 0:
job = queue.get()
#调用队列对象的get()方法从队头删除并返回一个项目.可选参数为block,默认为True.
# 如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用.
# 如果队列为空且block为False,队列将引发Queue.Empty异常
job.do()
# Open three threads
thd1 = threading.Thread(target=doJob, name='Thd1', args=(que,))
thd2 = threading.Thread(target=doJob, name='Thd2', args=(que,))
thd3 = threading.Thread(target=doJob, name='Thd3', args=(que,))
# thd4 = threading.Thread(target=doJob, name='Thd3', args=(que,))
# thd5 = threading.Thread(target=doJob, name='Thd3', args=(que,))
# # Start activity to digest queue.
st = datetime.datetime.now()
thd1.start()
thd2.start()
thd3.start()
# thd1.join()
#在子线程完成运行之前,这个子线程的父线程将一直被阻塞.
# join() 实际上意味着等到队列为空,再执行别的操作
#這會讓呼叫 join() 方法的線程被 blocked, 一直到被呼叫 join() 的線程結束為止
#不会再进行下面的线程了
# thd4.start()
# thd5.start()
#首先當 Thread 類別被實例化, 你可以呼叫物件上面的方法 start() 來啟動該線程,
# 一旦線程啟動, 它的狀態會變成 "alive" ;
# Wait for all threads to terminate.
while thd1.is_alive() or thd2.is_alive() or thd3.is_alive():
time.sleep(1)
# 當執行完畢 run() 後狀態便不在是 "alive".
td = datetime.datetime.now() - st
print("\t[Info] Spending time={0}!".format(td))

推荐使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import queue
import threading
from datetime import datetime
SHARE_Q = queue.Queue() # 构造一个不限制大小的的队列
_WORKER_THREAD_NUM = 4 # 设置线程的个数
def worker():
global SHARE_Q
while not SHARE_Q.empty():
one_dict = SHARE_Q.get() # 获得任务
# to worker ...
SHARE_Q.task_done()
def main():
a= datetime.now()
threads = []
global SHARE_Q
library_list =['9787550215184','9787550206267']
for i in library_list:
SHARE_Q.put(i)
for i in range(_WORKER_THREAD_NUM):
thr = threading.Thread(target=worker)
# 必须要有 target去指定函数
thr.start()
threads.append(thr)
for thread in threads:
thread.join()
SHARE_Q.join()
print(datetime.now() -a)#0:00:00.376745
print('success')
if __name__ == '__main__':
main()

Share Comments