平行化程式設計主要分兩種:multi-threading 跟 multi-processing。前者是在一個 process 裡面同時跑多個 thread,thread 之間共用 code section、data section、heap 等資料,但各自擁有獨立的 register 和 stack;後者是多個 process,每個 process 都有自己獨立的位址空間,連 code section 都不共用。
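multi-processing 的每個 process 都可以被分配到不同的 CPU 核心上真正平行執行,所以適合效能瓶頸在 CPU 的情況(CPU-bound);multi-threading 的所有 thread 都在同一個 process 裡,在 CPython 中還受到 GIL(Global Interpreter Lock)限制,同一時間只有一個 thread 能執行 Python bytecode,但 thread 等待 IO 時會釋放 GIL,所以適合效能瓶頸在 IO 的情況(IO-bound)。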
平行化設計有兩個重點:
如何處理race condition
process 或 thread 之間如何共用資料
處理 race condition 的方式,就是在可能產生競爭的地方(critical section)加鎖。常用的鎖有以下幾種:
semaphore:用計數器管理資源,資源有幾份就最多允許幾個 process/thread 同時使用,額滿時其他人必須等待。
mutex lock:sleep waiting,資源被佔用時就進入睡眠等待,直到鎖被釋放後才被喚醒。
spin lock:busy waiting,資源被佔用時就不停重複嘗試,直到取得鎖為止(等待期間會持續佔用 CPU)。
其他好用的同步方法還有 Event,可以當作條件旗標,用來控制 process 或 thread 何時繼續執行。
另一個重點是如何共用資料:在 Python 的 multiprocessing 模組中,可以透過 Manager、Pipe、Value、Array(以及 Queue)這幾種方式。
Manager:啟動一個獨立的 server process 統一管理共享物件(例如 dict、list),其他 process 透過 proxy 存取這些物件。
Pipe:在兩端之間建立連線,以 send()/recv() 收發訊息;其他的 Value 和 Array 則是把單一數值或陣列放在 shared memory 中共享,結構比較單純。
但如果程式一下子分出很多個平行執行的單元,會很難管理,它們也容易互相搶資源(互相打架)。這時候就可以採用 Fork-join 模型:在需要平行化的地方 fork 出多個 process 平行處理,處理完後再 join 回原本的單一 process 繼續執行。
範例:
from multiprocessing import Pool


def f(x):
    """Return the square of *x* (the unit of work each Pool worker runs)."""
    return x * x


if __name__ == '__main__':
    inputs = [1, 2, 3]
    # Five worker processes; map() splits `inputs` among them and
    # returns the results in input order.
    with Pool(processes=5) as workers:
        squares = workers.map(f, inputs)
        print(squares)
from multiprocessing import Process
import os


def info(title):
    """Print *title*, then this module's name, the parent PID and the current PID."""
    print(title)
    details = (
        ('module name:', __name__),
        ('parent process:', os.getppid()),
        ('process id:', os.getpid()),
    )
    for label, value in details:
        print(label, value)


def f(name):
    """Run inside the child: report process details, then greet *name*."""
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    child = Process(target=f, args=("GG",))
    child.start()
    child.join()
from multiprocessing import Process, Lock
import time


def f(l, i):
    """Print "hello <i>" while holding the shared lock *l*.

    Worker 5 sleeps 2 seconds before taking the lock, so its line usually
    shows up after the others.

    Args:
        l: a lock shared by all workers; serializes the print.
        i: the worker number to print.
    """
    if i == 5:
        time.sleep(2)
    # `with` releases the lock even if print() raises, exactly like the
    # manual acquire()/try/finally/release() pattern it replaces.
    with l:
        print("hello", i)


if __name__ == "__main__":
    lock = Lock()
    workers = [Process(target=f, args=(lock, num)) for num in range(10)]
    for worker in workers:
        worker.start()
    # Join explicitly instead of relying on interpreter-exit cleanup.
    for worker in workers:
        worker.join()
from multiprocessing import Process, Value, Array


def f(n, a):
    """Store 3.1415927 in shared value *n* and negate every element of *a* in place.

    Args:
        n: object with a writable ``.value`` (here a shared ``Value``).
        a: mutable indexable sequence (here a shared ``Array``).
    """
    n.value = 3.1415927
    for i, item in enumerate(a):
        a[i] = -item


if __name__ == "__main__":
    num = Value("d", 0.0)  # shared C double
    # Ten elements, not range(1): a one-element array holds only 0, and
    # -0 == 0, so the parent could never see that the child modified it.
    arr = Array("i", range(10))  # shared C int array
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])
from multiprocessing import Process, Pipe
import os


def func(conn):
    """Child side: greet the parent over *conn*, print its reply, then close *conn*."""
    conn.send("Hi I'm your subprocess. My ID is {}".format(os.getpid()))
    print("ID {} receive main_process message: ".format(os.getpid()), conn.recv())
    conn.close()


if __name__ == "__main__":
    # One Pipe per child. The previous version handed the SAME end to both
    # children but received/sent only once, so one child blocked in recv()
    # forever and join() hung. The docs also warn that two processes using
    # one end at the same time may corrupt the data.
    workers = []
    for _ in range(2):
        main_conn, sub_conn = Pipe()
        proc = Process(target=func, args=(sub_conn,))
        proc.start()
        workers.append((proc, main_conn))

    for proc, main_conn in workers:
        print("I'm main process, receive sub process message: ", main_conn.recv())
        main_conn.send("Remember I'm your Master")

    for proc, main_conn in workers:
        proc.join()
        main_conn.close()
from multiprocessing import Process, Manager


def func(dic, my_list, n):
    """Record worker *n* in the shared mapping *dic* and append *n* to *my_list*.

    Besides the per-worker "Process_<n>" key, every worker also writes the
    same "2" and 0.25 keys, showing that mixed key types work through the proxy.
    """
    worker_key = "Process_{}".format(n)
    dic[worker_key] = "1"
    dic["2"] = 2
    dic[0.25] = None
    my_list.append(n)


if __name__ == "__main__":
    with Manager() as manager:
        dic = manager.dict()
        my_list = manager.list(range(5))
        workers = []
        for idx in range(10):
            worker = Process(target=func, args=(dic, my_list, idx))
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()
        # Print while the manager is still running; its proxies stop
        # working once the with-block exits.
        print(dic)
        print(my_list)
from time import sleep
from random import random
from multiprocessing import Process, Event


def task(event, number):
    """Wait for *event*, then sleep a random 0-1 s delay and report it.

    Args:
        event: start signal shared with the parent; the worker blocks until it is set.
        number: worker index used in the printed messages.
    """
    print(f"Process {number} waiting...", flush=True)
    event.wait()
    value = random()
    sleep(value)
    print(f"Process {number} got {value} ", flush=True)


if __name__ == "__main__":
    gate = Event()
    workers = [Process(target=task, args=(gate, idx)) for idx in range(5)]
    for worker in workers:
        worker.start()
    print("main process blocking")
    sleep(2)
    # Setting the event releases every worker blocked in event.wait().
    gate.set()
    for worker in workers:
        worker.join()
import multiprocessing
import signal
import time


def main():
    """Start a child that sleeps 1000 s, terminate it, and print its state.

    Returns:
        True if the child's exit code shows it was killed by SIGTERM.
    """
    p = multiprocessing.Process(target=time.sleep, args=(1000,))
    print(p, p.is_alive())
    p.start()
    print(p, p.is_alive())
    p.terminate()
    # join() waits until the child has really exited and its exit code is
    # known; the old fixed sleep(0.1) could check too early. The timeout
    # guards against a child that ignores SIGTERM.
    p.join(timeout=5)
    print(p, p.is_alive())
    killed_by_sigterm = p.exitcode == -signal.SIGTERM
    print(killed_by_sigterm)
    return killed_by_sigterm


# The guard is required: under the "spawn"/"forkserver" start methods the
# child re-imports this module, and an unguarded start() at import time
# would try to start yet another process during bootstrapping.
if __name__ == "__main__":
    main()
import multiprocessing as mp


def foo(q):
    """Put the string 'hello' on queue *q* for the parent to read."""
    q.put('hello')


if __name__ == '__main__':
    # Choose the "spawn" start method before any process is created.
    mp.set_start_method('spawn')
    queue = mp.Queue()
    child = mp.Process(target=foo, args=(queue,))
    child.start()
    message = queue.get()
    print(message)
    child.join()
# multiprocessing's own TimeoutError: the one AsyncResult.get() raises.
from multiprocessing import Pool, TimeoutError
import time
import os


def f(x):
    """Return x * x (the unit of work handed to the pool)."""
    return x * x


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # map() blocks until all results are ready and keeps input order.
        print(pool.map(f, range(10)))

        # imap_unordered() yields results in arbitrary order.
        for value in pool.imap_unordered(f, range(10)):
            print(value)

        # The *_async variants return an AsyncResult; get(timeout=...)
        # waits at most that many seconds.
        squares = pool.map_async(f, range(10))
        print(squares.get(timeout=1))

        single = pool.apply_async(f, (20,))
        print(single.get(timeout=1))

        worker_pid = pool.apply_async(os.getpid, ())
        print(worker_pid.get(timeout=1))

        pending_pids = [pool.apply_async(os.getpid, ()) for _ in range(4)]
        print([pending.get(timeout=1) for pending in pending_pids])

        # One worker sleeps for 10 s; waiting only 1 s for it times out.
        sleeper = pool.apply_async(time.sleep, (10,))
        try:
            print(sleeper.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a Timeout Err")

        print("work")

    # Leaving the with-block calls terminate(), stopping the pool
    # (including the still-sleeping worker).
    print("pool end")
import abc
import threading
from random import random
from typing import Final


class MagicAPI:
    """Toy signal source that answers "buy" or "sell" at random."""

    def buy_or_sell(self):
        """Return "buy" when random() > 0.5, otherwise "sell"."""
        return "buy" if random() > 0.5 else "sell"


class TradingSystem(abc.ABC):
    """Base class for a trading system whose system_loop runs on its own thread.

    The worker thread is started at the end of __init__, so a subclass must
    set any attributes its system_loop reads *before* calling
    super().__init__(). Call stop() to ask the loop to finish, then join
    self.thread.
    """

    def __init__(self, api: MagicAPI, symbol: str, time_frame: int,
                 system_id: int, system_label: str) -> None:
        self.api = api
        self.symbol = symbol
        self.time_frame = time_frame  # seconds to wait between decisions
        self.system_id = system_id
        self.system_label = system_label
        # Created before the thread starts because system_loop polls it.
        self._stop_event = threading.Event()
        self.thread = threading.Thread(target=self.system_loop)
        print(f"thread {self.system_id} start", flush=True)
        self.thread.start()

    def stop(self) -> None:
        """Ask system_loop to exit; a loop that is waiting wakes up immediately."""
        self._stop_event.set()

    @abc.abstractmethod
    def place_buy_order(self):
        """Submit a buy order."""

    @abc.abstractmethod
    def place_sell_order(self):
        """Submit a sell order."""

    @abc.abstractmethod
    def system_loop(self):
        """Thread body; should return once stop() has been called."""


class GeniusTradingSystem(TradingSystem):
    """Concrete system trading "IBM" every 5 seconds on MagicAPI's signal."""

    def __init__(self, system_id: int):
        super().__init__(MagicAPI(), "IBM", 5, system_id, "SMART_AI_TS")

    def place_buy_order(self):
        print(f"ID {self.system_id} : buy", flush=True)

    def place_sell_order(self):
        print(f"ID {self.system_id} : sell", flush=True)

    def system_loop(self):
        """Place one order per time frame until stop() is called.

        The previous `while True` loop could never end, so the non-daemon
        threads kept the program alive forever.
        """
        while not self._stop_event.is_set():
            decision: str = self.api.buy_or_sell()
            if decision == "buy":
                self.place_buy_order()
            elif decision == "sell":
                self.place_sell_order()
            # Interruptible sleep: returns early as soon as stop() sets the event.
            self._stop_event.wait(self.time_frame)


if __name__ == "__main__":
    NUMBER_OF_TRADING_SYSTEM: Final[int] = 3
    trading_systems = [GeniusTradingSystem(i) for i in range(NUMBER_OF_TRADING_SYSTEM)]
    try:
        for system in trading_systems:
            system.thread.join()
    except KeyboardInterrupt:
        # Ctrl+C: ask every loop to finish, then wait for the threads so the
        # interpreter can exit (it waits for all non-daemon threads).
        for system in trading_systems:
            system.stop()
        for system in trading_systems:
            system.thread.join()