多任务实现之进程-process

单核CPU多任务实现的原理,是由操作系统中的调度算法:

  1. 时间片轮转
  2. 优先级调度
    并发:看着像一起执行

多核才有并行

fork()创建新进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os
ret = os.fork()# 创建一个新进程,父子进程都从这里开始
# 父进程的返回值会大于0(子进程ID),而子进程的返回值为0
print(ret)
# 父进程返回的是子进程ID,子进程中返回的是0
if ret>0:
print("---父进程--%d-"%os.getpid())
else:
print("---子进程---%d-%d-"%(os.getpid(),os.getppid()))
print("----over---")
# 13854
# ---父进程--13748-
# ----over---
# 0
# ---子进程---13846-13748-
# ----over---

多次fork的进程数 = 2的^fork次数的增长,所以要防止fork炸弹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
import time
#父进程
ret = os.fork()
if ret==0:
#子进程
print("--1--")
else:
#父进程
print("--2--")
#父子进程
ret = os.fork()
if ret==0:
#孙子
#儿子
print("--11--")
else:
#儿子
#父进程
print("--22--")
# --2--
# --22--
# --11--
# --1--
# --11--
# --22--

跨平台创建进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process
import time
# 用Process创建的子进程,会保证所有的子进程运行完成
def test():
for _ in range(5):
print("---test---")
time.sleep(1)
if __name__ == '__main__':
p = Process(target=test)
p.start() #让这个进程开始执行test函数里的代码
# p.join()# 堵塞住进程,保证先完成子进程的代码
p.join(1) # 表示主进程愿意等待子进程完成的时间,到了这个时间之后,
# 不管子进程有没有完成自己的代码,父进程都会去执行了
for _ in range(3):
print("---main---")
time.sleep(1)
# ---test---
# ---main---
# ---main------test---
# ---test---
#
# ---main------test---
#
# ---test---

进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from multiprocessing import Pool
import os
import time
def worker(num):
for i in range(2):
print("===pid=%d==num=%d="%(os.getpid(), num))
time.sleep(1)
if __name__ == '__main__':
# 进程池中对最多保存3个进程
pool = Pool(3)
for i in range(5):
print("---%d---"%i)
# 向进程池中添加任务
# 注意:如果添加的任务数量超过了 进程池中进程的个数的话,那么不会导致添加不进入
# 添加到进程中的任务 如果还没有被执行的话,那么此时 他们会等待进程池中的
# 进程完成一个任务之后,会自动的去用刚刚的那个进程 完成当前的新任务
pool.apply_async(worker, (i,))# 非阻塞子进程,使用进程池的所有进程
# pool.apply 主进程使用进程池中的一个进程,
# 要等到这个进程工作完成之后才运行另一个进程,也就失去了多进程的意义
pool.close()#关闭进程池,相当于不能够再次添加新任务了
pool.join()# 主进程 创建/添加任务后,主进程默认不会等待进程池中的任务执行完后才结束
# 而是当主进程的任务做完之后立马结束,如果这个地方没join,会导致
# 进程池中的任务不会执行
# ---0---
# ---1---
# ---2---
# ---3---
# ---4---
# ===pid=15225==num=1=
# ===pid=15224==num=2=
# ===pid=15223==num=0=
# ===pid=15225==num=1====pid=15224==num=2====pid=15223==num=0=
#
#
# ===pid=15224==num=3====pid=15223==num=4=
#
# ===pid=15223==num=4====pid=15224==num=3=

进程间通信Queue(队列:先进先出)

想要完成进程间的数据共享,需要一些方法:命名管道(pipe)/信号(sign)/无名管道/共享内存/消息队列(缓存)/网络(socket)等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#修改import中的Queue为Manager
import os
from multiprocessing import Manager,Pool
def writer(q):
print("writer启动(%s),⽗进程为(%s)"%(os.getpid(),os.getppid()))
for i in "dongGe":
q.put(i)
def reader(q):
print("reader启动(%s),⽗进程为(%s)"%(os.getpid(),os.getppid()))
for i in range(q.qsize()):
print("reader从Queue获取到消息:%s"%q.get(True))
if __name__=="__main__":
print("(%s) start"%os.getpid())
q=Manager().Queue() # 使⽤Manager中的Queue来初始化
po=Pool()
# 使⽤阻塞模式创建进程
# 这样就不需要在reader中使⽤死循环了,可以让writer完全执⾏完成后,再⽤reader去读取
po.apply(writer, (q,))
po.apply(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
# (13748)start
# (13748)End
# writer启动(15731),⽗进程为(13748)
# reader启动(15733),⽗进程为(13748)
# reader从Queue获取到消息:d
# reader从Queue获取到消息:o
# reader从Queue获取到消息:n
# reader从Queue获取到消息:g
# reader从Queue获取到消息:G
# reader从Queue获取到消息:e
Share Comments