Python--线程&进程

Python--线程&进程

2020/4/20 更新,根据给社团新生的讲课内容适当进行了补充
2020/10/25 更新,整合内容

多线程和多进程概念

  • 当计算机运行程序时,就会创建包含代码和状态的进程。这些进程会通过计算机的一个或多个CPU执行。不过,同一时刻一个CPU只会执行一个进程,然后在不同进程之间快速切换,这样就给人以多个程序同时进行的感觉(所有进程都使用一个CPU,占用一定时间后切换给另一个进程)。同理,在一个进程中,程序的执行也是在不同的线程间进行切换的,每个线程执行程序的不同部分。
  • 例子

    • 多线程使得程序内部可以分出多个线程来做多件事情,而不会造成程序界面卡死。比如迅雷等多线程下载工具就是典型的多线程。一个下载任务进来,迅雷把文件平分成10份,然后开10个线程分别下载。这时主界面是一个单独的线程,并不会因为下载文件而卡死。而且主线程可以控制下属线程,比如某个线程下载缓慢甚至停止,主线程可以把它强行关掉并重启另外一个线程。

    • 另外就是一些程序的打印功能,比如记事本、Adobe Reader,打印的时候就只能打印,无法在主界面进行操作,而Word就有“后台打印”的功能,点了打印命令之后,还可以回到主界面进行修改、保存等操作。

    • 进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。显然,程序是死的(静态的),进程是活的(动态的)。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身;用户进程就不必我多讲了吧,所有由你启动的进程都是用户进程。进程是操作系统进行资源分配的单位。

  • 当电脑如果是一个多核的 CPU 的时候,情况可能会有些不同:

多核CPU即多个CPU组成,这些CPU集成在一个芯片里,可以通过内部总线来交互数据,共享数据,这些CPU中分配出一个独立的核执行操作系统,这些CPU通过总线来交互数据,并且工作是并行的,资源分配是由操作系统来完成的,操作系统来决定程序CPU的控制权分配,所以一个多核CPU的工作效率大多体现在操作系统的分配上,因为一个CPU基本上可以执行很多个程序,然后来回跳转,所以当你的CPU核过多时,操作系统在分配时可能会导致部分CPU闲置。

看戏.png

Python 进程池

-当要启动大量子进程时,使用进程池批量创建子进程的方法更常见。这时用Process动态生成多进程时过于麻烦,进程池Pool发挥作用的机会到了
-multiprocessing模块提供了一个Pool类来代表进程池对象
-Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中,如果池还没满,就会创建新的进程,否则就会等待直到池中有进程结束。

注意:Pool对象调用join()方法会等待所有子进程执行完毕,调用join()前必须先调用close(),调用close()后就不能添加新的Process了

实例方法

类的方法

apply(func[, args[, kwds]]):同步进程池

apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池

close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

terminate() : 结束工作进程,不在处理未完成的任务

join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在

创建进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

def run_task(name):
print("Task %s (pid = %s) is running ..." % (name, os.getpid()))
time.sleep(random.random() * 3)
print("Task %s end" % (name,))
if __name__ == '__main__':
print("Current process %s." % (os.getpid()))
p = Pool(processes=3)
for i in range(5):
p.apply_async(run_task, args=(i,))
print("Waiting for all subprocess")
p.close()
p.join()
print("All subprocess done")


#运行结果
#Current process 11268.
#Waiting for all subprocess
#Task 0 (pid = 15428) is running ...
#Task 1 (pid = 17956) is running ...
#Task 2 (pid = 1324) is running ...
#Task 2 end
#Task 3 (pid = 1324) is running ...
#Task 1 end
#Task 4 (pid = 17956) is running ...
#Task 3 end
#Task 0 end
#Task 4 end
#All subprocess done

同步进程池&异步进程池

仍以上例:

apply_async为异步执行,即不堵塞,当碰到子进程后,主进程说:让我先运行个够,等到操作系统进行进程切换的时候,再交给子进程运行。若没有p.join(),则会出现由于我们的程序太短,还没等到操作系统进行进程切换,主进程就运行完毕了,子进程自然没法运行。

想要子进程执行,就告诉主进程:你等着所有子进程执行完毕后,在运行剩余部分,就是p.join()。

而若改apply_async为apply,则阻塞主进程。主进程开始运行,碰到子进程,操作系统切换到子进程,等待子进程运行结束后,再切换到另外一个子进程,直到所有子进程运行完毕再切换到主进程,运行剩余的部分。

这样的效率明显不高,而且这样和单进程就几乎没啥两样了,所以建议使用apply_async,而不是apply。

Python 多线程

多线程类似于同时执行多个不同的程序,多线程有如下有优点:

  • 可以把运行时间长的程序放到后台去处理
  • 用户界面可以更加吸引人,比如用进度条去显示处理的进度。
  • 可以加速程序的运行
  • 在一些需要等待的任务实现上,如用户输入,文件读写,网络收发数据,线程就会很有用,这种情况下我们可以释放一些珍贵的资源,如内存占用

使用threading模块创建多线程

  • threading模块一般通过两种方式创建多线程:第一种方式是把一个函数传入并创建一个Thread实例,然后调用start方法开始执行;第二种方式是直接从threading.Thread 继承并创建线程类,然后重写init方法和run方法。

例子:创建多线程流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MyThread(threading.Thread):
def __init__(self, name, urls):
threading.Thread.__init__(self, name=name)
self.urls = urls

def run(self):
print("Current %s is running..." % threading.current_thread().name)
for url in self.urls:
print("%s ---->>> %s" % (threading.current_thread().name, url))
time.sleep(random.randint(1, 3))
print("%s ended." % threading.current_thread().name)
print("%s is running..." % threading.current_thread().name)
t1 = MyThread(name="Thread_1", urls=["url_1", "url_2", "url_3"])
t2 = MyThread(name="Thread_2", urls=["url_4", "url_5", "url_6"])
t1.start()
t2.start()
t1.join()
t2.join()
print("%s ended." % threading.current_thread().name)


#结果
#MainThread is running...
#Current Thread_1 is running...
#Thread_1 ---->>> url_1
#Current Thread_2 is running...
#Thread_2 ---->>> url_4
#Thread_2 ---->>> url_5
#Thread_1 ---->>> url_2
#Thread_2 ---->>> url_6
#Thread_1 ---->>> url_3
#Thread_1 ended.
#Thread_2 ended.
#MainThread ended.

线程同步

  • 如果多个线程共同对某个数据修改,则由于修改的先后可能会导致某次修改被“吞”,出现不可预料的结果,为了保证数据的正确性,需要对多线程进行同步。使用Thread对象的Lock和Rlock可以实现简单的线程同步(线程锁,访问数据时锁死数据防止别的线程修改),这两个对象都有acquire方法(获取锁)和release方法(释放锁),对于每次只允许一个线程操作的数据,可以将其操作放在acquire和release之间

  • 对于Lock对象而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起该线程(此时该线程还在等待获取锁),这会导致Lock对象永远不会release,使得线程死锁。

  • Rlock对象允许一个线程多次对其进行acquire操作,因为在其内部有一个counter变量记录acquire的次数,而且每一次acquire操作后必须有个release操作与之对应,在所有的release操作完成后,别的线程才可以申请Rlock对象。

  • 获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try…finally来确保锁一定会被释放。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
mylock = threading.RLock()
num = 0

class MyThread(threading.Thread):

def __init__(self, name):
threading.Thread.__init__(self, name=name)

# 一个线程多次访问锁
def run(self):
global num
while True:
# 获取锁
mylock.acquire()
print("%s locked, number: %d" % (threading.current_thread().name, num))
if num >= 4:
mylock.release()
print("%s released, number: %d" % (threading.current_thread().name, num))
break
num += 1
print("%s released, number: %d" % (threading.current_thread().name, num))
mylock.release()


if __name__ == '__main__':
thread1 = MyThread("thread_1")
thread2 = MyThread("thread_2")
thread2.start()
thread1.start()

Python全局解释锁

  • 在python的原始解释器CPython中存在GIL(Global Interpreter Lock),因此在解释执行Python的代码时,会产生互斥锁(在一个线程修改变量时加锁,则其他线程等待加锁的变量解锁后再执行)来限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。由于全局解释锁的存在,在进行多线程操作时不能调用多个CPU内核,只能用一个内核,所以在进行CPU密集操作时不推荐使用多线程,更倾向于使用多进程。

那么多线程适合干啥?

对于I/O密集操作,多线程可以明显提高效率,例如Python爬虫开发,绝大多数时间爬虫是在等待socket返回数据,网络I/O的操作延迟比CPU大很多

GIL 例子1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding:utf-8 -*-

from threading import Thread, RLock

lock = RLock()
value = 0


class MyThread(Thread):

def __init__(self):
super().__init__()

def run(self):
global value
for i in range(50000):
value += 1


if __name__ == '__main__':
thread_1 = MyThread()
thread_2 = MyThread()
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
# 结果会是你想象的 value != 100000 吗
print(value)

我们得到的结果是 100000,你可能会很好奇,难道不会因为多线程竞争导致有些加操作无效了吗,这里就是全局解释锁在起作用,确保当前语句在被解释的时候变量被上锁。

很神奇的是,当你修改累加次数更大如5000000时,这个结果每次运行就会不同,我个人觉得这是因为当某个操作达到一定次数的时候,全局解释锁被释放导致竞争中一些加操作被吞了。

另外的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# -*- coding:utf-8 -*-

import threading
import time
zero = 0
lock = threading.Lock()


def change_zero():
# lock.acquire()
global zero
for i in range(100000):
zero += 1
zero -= 1
# lock.release()


if __name__ == '__main__':
th1 = threading.Thread(target=change_zero)
th2 = threading.Thread(target=change_zero)
th1.start()
th2.start()
th1.join()
th2.join()
# zero == 0 ??? 是这样吗?
print(zero)

结果当然不为 0 而且每次都不一样,这是因为在解释完 zero += 1 后有可能全局解释锁给了别的线程,但是在累加次数比较小的时候结果是0。

python 的这一点和编译性语言完全不同。

GIL 例子2

我们来验证在 gil 的情况下多线程对于 CPU 密集操作和 I/O 密集操作造成了什么影响。

  • CPU 密集操作即需要大量的计算的操作
  • I/O 密集操作即需要频繁的进行输入输出的操作

I/O 密集操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# -*- coding:utf-8 -*-
from threading import Thread
import time


time_1 = 0
time_2 = 0


def my_counter():
for temp in range(5000):
with open("test.txt", 'w+') as fp:
fp.write("Hello, Hello, This is cyx\n")
print("Hello, Hello, This is cyx")
fp.close()


def test1():
thread_array = {}
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
t.join() # 模拟单线程
end_time = time.time()
return end_time - start_time


def test2():
thread_array = {}
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
thread_array[tid] = t
for i in range(2):
thread_array[i].join()
end_time = time.time()
return end_time - start_time


if __name__ == '__main__':
time_1 = test1()
time_2 = test2()
print("test1: Total time= {}".format(time_1))
print("test2: Total time= {}".format(time_2))

执行以下可以发现,2线程的执行时间基本达到了单线程的一半时间,可以看出相当的有效

而对于 CPU 密集操作就不是那么有效了,稍微修改一下例子

CPU 密集操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# -*- coding:utf-8 -*-
from threading import Thread
import time


def my_counter():
i = 0
for temp in range(1000000):
i = i + 1


def test1():
thread_array = {}
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
t.join() # 模拟单线程
end_time = time.time()
print("Total time: {}".format(end_time - start_time))


def test2():
thread_array = {}
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
thread_array[tid] = t

for i in range(2):
thread_array[i].join()
end_time = time.time()
print("Total time: {}".format(end_time - start_time))


if __name__ == '__main__':
test1()
test2()

运行可以看到,2线程相对于单线程的时间没有减少反而增多了,因为 GIL 上锁的机制导致并没有提高很大的速度(很有可能还减慢了速度)

所以,最好使用多线程的方式还是读写文件等一些 I/O 操作者,这样才可以充分发挥多线程的优势。

Author

Ctwo

Posted on

2020-10-25

Updated on

2020-10-25

Licensed under

Comments