Python平行化程式設計小筆記

平行化程式設計常見的做法分兩種:multi-threading跟multi-processing。前者是一個process裡面有多個thread在跑,thread之間共用code section、data section、heap等資源,但各自擁有獨立的register和stack;多個processes則各自擁有獨立的記憶體空間(address space),資料預設完全不共用,需要透過IPC機制交換。

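在CPython中,multiprocessing的每個process都有自己的直譯器與GIL,可以被作業系統分派到不同的CPU核心上真正同時執行,所以適合效能瓶頸在CPU運算(CPU-bound)的情況;multi-threading雖然也可能被排到不同核心,但受GIL限制,同一時間只有一個thread能執行Python bytecode,不過thread在等待IO時會釋放GIL,所以適合效能瓶頸在IO(IO-bound)的情況。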

平行化設計有兩個重點:

  • 如何處理race condition
  • process或thread之間如何共用資料

處理race condition的方式,就是在可能產生競爭的地方(critical section)加鎖,常用的鎖有以下幾種:

  • semaphore: 對資源計數,計數為N時最多允許N個process或thread同時使用資源
  • mutex lock: sleep waiting 有人在用資源就睡覺等待
  • spin lock: busy waiting,有人在用資源就不斷重複嘗試,直到取得鎖為止

其他還有很好用的同步工具,像是Event,可以當作條件旗標:process或thread呼叫wait()等待,直到其他人呼叫set()後才繼續執行

另外一個重點是如何共用資料,
在Python的multiprocessing模組裡,可以透過Manager、Pipe、Queue、Value、Array這幾種機制

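  • Manager:由一個獨立的manager process統一管理共享物件(例如dict、list),其他process透過proxy存取
  • Pipe:建立一對連線端點,兩端之間用send()/recv()收發資料
  • Value、Array:在共享記憶體中放置單一的共享值或陣列,是比較單純的共享結構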

但程式一下子分出很多個平行化的工作可能會很難管理,彼此之間也容易互相干擾(例如搶奪資源)。
這時候可以採用Fork-join模型:在需要平行化的地方fork出多個process平行處理,處理完再join回一個process繼續執行。

範例:

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def f(x):
    """Return *x* multiplied by itself; this is the per-item job the pool runs."""
    squared = x * x
    return squared

if __name__ == '__main__':
    # Spread three inputs over a pool of five worker processes. Leaving the
    # with-block shuts the pool down after map() has gathered the results.
    inputs = [1, 2, 3]
    with Pool(processes=5) as pool:
        squares = pool.map(f, inputs)
        print(squares)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
import os

def info(title):
    """Print *title*, then this module's name and the parent and current PIDs."""
    print(title)
    details = (
        ("module name", __name__),
        ("parent process", os.getppid()),
        ("process id", os.getpid()),
    )
    for label, value in details:
        print(f"{label}: {value}")

def f(name):
    """Child-process entry point: report process details, then greet *name*."""
    info('function f')
    greeting = ('hello', name)
    print(*greeting)

if __name__ == '__main__':
    # Print the parent's details first so they can be compared with the child's.
    info('main line')
    child = Process(target=f, args=("GG", ))
    child.start()
    child.join()  # wait for the child before the parent exits
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Lock
import time
def f(l, i):
    """Print ``hello <i>`` while holding lock *l*.

    Worker 5 sleeps 2 seconds before taking the lock, so its line typically
    appears after the others. The lock keeps each print from interleaving
    with output from the other workers.
    """
    if i == 5:
        time.sleep(2)
    # The context manager acquires here and always releases on exit.
    with l:
        print("hello", i)

if __name__ == "__main__":
    lock = Lock()

    # Fork: start ten workers that all share the same lock.
    processes = [Process(target=f, args=(lock, num)) for num in range(10)]
    for proc in processes:
        proc.start()

    # Join: wait for every child explicitly instead of relying on exit-time
    # cleanup. The multiprocessing programming guidelines recommend joining
    # every process you start so finished children are reaped.
    for proc in processes:
        proc.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process, Value, Array

def f(n, a):
    """Set ``n.value`` to 3.1415927 and negate every element of *a* in place."""
    n.value = 3.1415927
    for idx, item in enumerate(a):
        a[idx] = -item

if __name__ == "__main__":
    # "d" (C double) and "i" (C signed int) are array-module typecodes.
    num = Value("d", 0.0)
    # Ten elements 0..9 so the child's in-place negation is visible. With
    # range(1) the array was just [0], and negating 0 shows no change.
    arr = Array("i", range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    # Both objects live in shared memory, so the child's writes are visible here.
    print(num.value)
    print(arr[:])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process, Pipe
import os

def func(conn):
    """Child side of the Pipe demo.

    Send a greeting that includes this process's PID, print the parent's
    reply received on *conn*, then close *conn*.
    """
    pid = os.getpid()
    conn.send(f"Hi I'm your subprocess. My ID is {pid}")
    reply = conn.recv()
    print(f"ID {pid} receive main_process message: ", reply)
    conn.close()

if __name__ == "__main__":
    process_list = []
    main_conns = []

    for _ in range(2):
        # Give every child its own Pipe. The multiprocessing docs warn that
        # data can be corrupted when two processes use the same end of a pipe
        # at once. A shared end would also let one child receive the reply
        # meant for the other.
        main_conn, sub_conn = Pipe()
        proc = Process(target=func, args=(sub_conn,))
        process_list.append(proc)
        main_conns.append(main_conn)
        proc.start()
        print("I'm main process, receive sub process message: ", main_conn.recv())
        main_conn.send("Remember I'm your Master")

    for each_process in process_list:
        each_process.join()

    # Every child has exited, so the parent-side ends are no longer needed.
    for main_conn in main_conns:
        main_conn.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process, Manager

def func(dic, my_list, n):
    """Record worker *n* in the shared mapping *dic* and the sequence *my_list*.

    Each call adds a per-worker key ``Process_<n>``. It also rewrites the same
    ``"2"`` and ``0.25`` entries every time, so those keys occur only once no
    matter how many workers run. Finally *n* is appended to *my_list*.
    """
    dic.update({"Process_{}".format(n): "1", "2": 2, 0.25: None})
    my_list.append(n)

if __name__ == "__main__":
    with Manager() as manager:
        # Proxies to a dict and a list held by the manager; the workers
        # mutate them through these proxies.
        dic = manager.dict()
        my_list = manager.list(range(5))

        workers = [
            Process(target=func, args=(dic, my_list, idx)) for idx in range(10)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Printed while the manager is still running, so the proxies can fetch their contents.
        print(dic)
        print(my_list)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from time import sleep
from random import random
from multiprocessing import Process, Event

def task(event, number):
    """Block until *event* is set, then sleep a random 0-1 s and report the delay.

    Both messages are flushed immediately so output from several processes
    appears as soon as it is printed.
    """
    print(f"Process {number} waiting...", flush=True)
    event.wait()
    delay = random()
    sleep(delay)
    print(f"Process {number} got {delay}", flush=True)

if __name__ == "__main__":

    start_signal = Event()
    workers = [Process(target=task, args=(start_signal, idx)) for idx in range(5)]

    for worker in workers:
        worker.start()

    # Workers block in event.wait() until set() is called below.
    print("main process blocking")
    sleep(2)

    # Release every waiting worker at once.
    start_signal.set()

    for worker in workers:
        worker.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
import multiprocessing, time, signal

if __name__ == "__main__":
    # Guard required: under the spawn start method (the default on Windows
    # and macOS) the child re-imports this module, and creating a Process at
    # import time without the guard fails.
    p = multiprocessing.Process(target=time.sleep, args=(1000, ))
    print(p, p.is_alive())

    p.start()
    print(p, p.is_alive())

    p.terminate()
    # join() waits until the child has actually exited, so the is_alive() and
    # exitcode checks below are deterministic. A fixed sleep(0.1) was a race.
    p.join()
    print(p, p.is_alive())

    # A negative exitcode -N (child killed by signal N) is POSIX-only; on
    # Windows this presumably prints False. Confirm on the target platform.
    print(p.exitcode == -signal.SIGTERM)
1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing as mp

def foo(q):
    """Put the string ``'hello'`` on queue *q* for the parent to read."""
    greeting = 'hello'
    q.put(greeting)

if __name__ == '__main__':
    # Choose the start method once, before any Process or Queue is created.
    mp.set_start_method('spawn')
    queue = mp.Queue()
    child = mp.Process(target=foo, args=(queue,))
    child.start()
    # Drain the queue before join(): joining a child that still has queued
    # data to flush can deadlock.
    print(queue.get())
    child.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    """Return *x* multiplied by itself; every pool call below maps over this."""
    squared = x * x
    return squared

if __name__ == "__main__":
    with Pool(processes=4) as pool:

        # Blocking map: results come back in input order.
        print(pool.map(f, range(10)))

        # imap_unordered yields results as workers finish, so order may vary.
        for square in pool.imap_unordered(f, range(10)):
            print(square)

        # The *_async variants return an AsyncResult; get() waits up to 1 s.
        async_map = pool.map_async(f, range(10))
        print(async_map.get(timeout=1))

        single = pool.apply_async(f, (20, ))
        print(single.get(timeout=1))

        pid_result = pool.apply_async(os.getpid, ())
        print(pid_result.get(timeout=1))

        # Separate tasks may run on different workers, so different PIDs may appear.
        pending = [pool.apply_async(os.getpid, ()) for _ in range(4)]
        print([r.get(timeout=1) for r in pending])

        # A 10 s task cannot finish inside the 1 s timeout, so get() raises.
        slow = pool.apply_async(time.sleep, (10, ))
        try:
            print(slow.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a Timeout Err")

        print("work")

    # Leaving the with-block called terminate() on the pool.
    print("pool end")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import abc
import threading
from time import sleep
from random import random
from typing import Final

class MagicAPI():
    """Stand-in trading API that returns a random trade decision."""

    def __init__(self):
        pass

    def buy_or_sell(self):
        """Return "buy" or "sell", each with roughly 50% probability."""
        return "buy" if random() > 0.5 else "sell"


class TradingSystem(abc.ABC):
    """Base class for a trading system whose ``system_loop`` runs on its own thread.

    The worker thread is started at the end of ``__init__``. A subclass must
    therefore set any attributes ``system_loop`` reads BEFORE calling
    ``super().__init__``. Call :meth:`stop` to ask the loop to finish.
    Subclass loops should poll ``self._stop_event`` and wait on it instead of
    sleeping.
    """

    def __init__(self, api: MagicAPI, symbol: str, time_frame: int,
                 system_id: int, system_label: str) -> None:
        """Store the configuration and start the ``system_loop`` thread.

        Args:
            api: source of buy/sell decisions.
            symbol: instrument being traded.
            time_frame: seconds to wait between decisions.
            system_id: identifier used in log output.
            system_label: free-form label for this system.
        """
        # init api...
        self.api = api
        self.symbol = symbol
        self.time_frame = time_frame
        self.system_id = system_id
        self.system_label = system_label
        # Created before the thread starts, so system_loop can always read it.
        self._stop_event = threading.Event()
        thread = threading.Thread(target=self.system_loop)
        self.thread = thread
        print(f"thread {self.system_id} start", flush=True)
        thread.start()

    def stop(self) -> None:
        """Signal ``system_loop`` to exit. This does not wait for the thread."""
        self._stop_event.set()

    @abc.abstractmethod
    def place_buy_order(self):
        pass

    @abc.abstractmethod
    def place_sell_order(self):
        pass

    @abc.abstractmethod
    def system_loop(self):
        pass


class GeniusTradingSystem(TradingSystem):
    """Demo system: trades "IBM" on a 5-second time frame using MagicAPI."""

    def __init__(self, system_id: int):
        super().__init__(MagicAPI(), "IBM", 5, system_id, "SMART_AI_TS")

    def place_buy_order(self):
        print(f"ID {self.system_id}: buy", flush=True)

    def place_sell_order(self):
        print(f"ID {self.system_id}: sell", flush=True)

    def system_loop(self):
        """Place a buy or sell order every ``time_frame`` seconds until stopped."""
        while not self._stop_event.is_set():

            buy_or_sell: str = self.api.buy_or_sell()
            if "buy" == buy_or_sell:
                self.place_buy_order()
            elif "sell" == buy_or_sell:
                self.place_sell_order()

            # Event.wait works as an interruptible sleep: it returns as soon
            # as stop() is called instead of always blocking for time_frame s.
            self._stop_event.wait(self.time_frame)


if __name__ == "__main__":

    NUMBER_OF_TRADING_SYSTEM: Final[int] = 3

    trading_systems = [GeniusTradingSystem(i) for i in range(NUMBER_OF_TRADING_SYSTEM)]

    try:
        for trading_system in trading_systems:
            trading_system.thread.join()
    except KeyboardInterrupt:
        # The loop threads are non-daemon, so the interpreter would wait for
        # them forever at exit; ask each one to stop, then wait for it.
        for trading_system in trading_systems:
            trading_system.stop()
        for trading_system in trading_systems:
            trading_system.thread.join()