Contents
hide
Simple process example
#!/usr/bin/python
"""Start a single child process and pass one argument to it."""

from multiprocessing import Process


def fun(name):
    """Print a greeting for *name*."""
    print(f'hello {name}')


def main():
    """Create a process that runs ``fun('Peter')`` and start it."""
    # args must be a tuple, hence the trailing comma.
    p = Process(target=fun, args=('Peter',))
    p.start()


if __name__ == '__main__':
    main()
We create a new process and pass a value to it.
def fun(name):
    """Print a greeting for *name*."""
    print(f'hello {name}')
The function prints the parameter passed to it.
def main():
    """Create a process that runs ``fun('Peter')`` and start it."""
    # args must be a tuple, hence the trailing comma.
    p = Process(target=fun, args=('Peter',))
    p.start()
A new process is created. The args tuple provides the data to pass to the target function. The multiprocessing code is placed inside the main guard.
# Run main() only when the file is executed as a script, not when it is
# imported.
if __name__ == '__main__':
    main()
Python Multiprocessing Join
#!/usr/bin/python
"""Wait for a child process to finish with ``join``."""

from multiprocessing import Process
import time


def fun():
    """Print a start message, sleep for two seconds, print an end message."""
    print('starting fun')
    time.sleep(2)
    print('finishing fun')


def main():
    """Start one process running ``fun`` and block until it terminates."""
    p = Process(target=fun)
    p.start()
    # join() blocks here, so main() returns only after the child has
    # finished.
    p.join()


if __name__ == '__main__':
    print('starting main')
    main()
    print('finishing main')
OUTPUT
starting main
starting fun
finishing fun
finishing main
It is important to call the join methods only after all the processes have been started with the start method; joining a process right after starting it would make the processes run one after another.
#!/usr/bin/python
"""Start several processes first, then join them all."""

from multiprocessing import Process
import time


def fun(val):
    """Sleep for *val* seconds, printing a message before and after."""
    print(f'starting fun with {val} s')
    time.sleep(val)
    print(f'finishing fun with {val} s')


def main():
    """Run three processes concurrently and wait for all of them."""
    # All three processes are started before any join() call. Calling
    # p1.join() directly after p1.start() would block until p1 had
    # finished and the processes would then run one after another.
    p1 = Process(target=fun, args=(3, ))
    p1.start()

    p2 = Process(target=fun, args=(2, ))
    p2.start()

    p3 = Process(target=fun, args=(1, ))
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    print('finished main')


if __name__ == '__main__':
    main()
Python Multiprocessing is_alive
#!/usr/bin/python
"""Check whether a process is still running with ``is_alive``."""

from multiprocessing import Process
import time


def fun():
    """Print a message and sleep for two seconds."""
    print('calling fun')
    time.sleep(2)


def main():
    """Run ``fun`` in a process, wait for it, then report ``is_alive()``."""
    print('main fun')

    p = Process(target=fun)
    p.start()
    p.join()

    # is_alive() is queried after join() has returned.
    print(f'Process p is alive: {p.is_alive()}')


if __name__ == '__main__':
    main()
Python Multiprocessing Process Id
#!/usr/bin/python
"""Print the process id and the parent process id of child processes."""

from multiprocessing import Process
import os


def fun():
    """Print a separator followed by the parent and own process ids."""
    print('--------------------------')
    print('calling fun')
    print('parent process id:', os.getppid())
    print('process id:', os.getpid())


def main():
    """Print the main process id, then run ``fun`` in two processes in turn."""
    print('main fun')
    print('process id:', os.getpid())

    # Each process is joined before the next one is started.
    p1 = Process(target=fun)
    p1.start()
    p1.join()

    p2 = Process(target=fun)
    p2.start()
    p2.join()


if __name__ == '__main__':
    main()
OUTPUT
main fun process id: 7605 -------------------------- calling fun parent process id: 7605 process id: 7606 -------------------------- calling fun parent process id: 7605 process id: 7607
Naming processes
#!/usr/bin/python
"""Give processes explicit names or rely on the default ``Process-N``."""

from multiprocessing import Process, current_process
import time


def worker():
    """Print the current process name, sleep two seconds, print it again."""
    name = current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def service():
    """Print the current process name, sleep three seconds, print it again."""
    name = current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


def main():
    """Create two named processes and one with a default name; start them."""
    # The Process objects get their own names. Rebinding ``service`` to a
    # Process would hide the target function, and start methods that
    # pickle the target by its qualified name would then fail.
    service_proc = Process(name='Service 1', target=service)
    worker1 = Process(name='Worker 1', target=worker)
    worker2 = Process(target=worker)  # use default name

    worker1.start()
    worker2.start()
    service_proc.start()


if __name__ == '__main__':
    main()
OUTPUT
Worker 1 Starting Process-3 Starting Service 1 Starting Worker 1 Exiting Process-3 Exiting Service 1 Exiting
Subclassing Process
#!/usr/bin/python
"""Define the work of a process by subclassing ``Process``."""

import time
from multiprocessing import Process


class Worker(Process):
    """Process subclass whose ``run`` prints its name and sleeps."""

    def run(self):
        """Print the process name and sleep for two seconds."""
        print(f'In {self.name}')
        time.sleep(2)


def main():
    """Start two ``Worker`` processes and wait for both."""
    worker = Worker()
    worker.start()

    worker2 = Worker()
    worker2.start()

    worker.join()
    worker2.join()


if __name__ == '__main__':
    main()
Python multiprocessing Pool
#!/usr/bin/python
"""Map a function over several values with a pool of processes."""

import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count


def square(n):
    """Return ``n * n`` after sleeping two seconds to simulate work."""
    time.sleep(2)
    return n * n


def main():
    """Square four values in a process pool and print the elapsed time."""
    start = timer()

    print(f'starting computations on {cpu_count()} cores')

    values = (2, 4, 6, 8)

    # Pool() without an argument uses a default number of worker processes.
    with Pool() as pool:
        res = pool.map(square, values)
        print(res)

    end = timer()
    print(f'elapsed time: {end - start}')


if __name__ == '__main__':
    main()
OUTPUT
starting computations on 4 cores [4, 16, 36, 64] elapsed time: 2.0256662130013865
Multiple arguments
#!/usr/bin/python
"""Pass several arguments to a pooled function with ``starmap``."""

import time
from timeit import default_timer as timer
from multiprocessing import Pool, cpu_count


def power(x, n):
    """Return ``x ** n`` after sleeping one second to simulate work."""
    time.sleep(1)
    return x ** n


def main():
    """Compute three powers in a process pool and print the elapsed time."""
    start = timer()

    print(f'starting computations on {cpu_count()} cores')

    # Each inner tuple is unpacked by starmap into the (x, n) arguments.
    values = ((2, 2), (4, 3), (5, 5))

    with Pool() as pool:
        res = pool.starmap(power, values)
        print(res)

    end = timer()
    print(f'elapsed time: {end - start}')


if __name__ == '__main__':
    main()
OUTPUT
starting computations on 4 cores
[4, 64, 3125]
elapsed time: 1.0230950259974634
Multiple functions
#!/usr/bin/python
"""Run several different functions in one process pool."""

from multiprocessing import Pool
import functools


def inc(x):
    """Return *x* plus one."""
    return x + 1


def dec(x):
    """Return *x* minus one."""
    return x - 1


def add(x, y):
    """Return the sum of *x* and *y*."""
    return x + y


def smap(f):
    """Call *f* without arguments and return its result."""
    return f()


def main():
    """Evaluate three pre-bound function calls in a pool and print them."""
    # partial() binds the arguments up front, so the pool only has to map
    # the zero-argument caller ``smap`` over the resulting callables.
    f_inc = functools.partial(inc, 4)
    f_dec = functools.partial(dec, 2)
    f_add = functools.partial(add, 3, 4)

    with Pool() as pool:
        res = pool.map(smap, [f_inc, f_dec, f_add])
        print(res)


if __name__ == '__main__':
    main()
OUTPUT
[5, 1, 7]
Python Multiprocessing π calculation
#!/usr/bin/python
"""Compute three approximations of pi sequentially and time the run."""

from decimal import Decimal, getcontext
from timeit import default_timer as timer


def pi(precision):
    """Return the sum of the first *precision* terms of a series for pi.

    The decimal context precision is set to *precision* before summing.
    """
    getcontext().prec = precision

    return sum(1/Decimal(16)**k *
               (Decimal(4)/(8*k+1) -
                Decimal(2)/(8*k+4) -
                Decimal(1)/(8*k+5) -
                Decimal(1)/(8*k+6)) for k in range(precision))


def main():
    """Compute the approximations one after another and print the timing."""
    start = timer()
    values = (1000, 1500, 2000)
    data = list(map(pi, values))
    print(data)

    end = timer()
    print(f'sequentially: {end - start}')


# Guarded like the other examples so importing the module does not start
# the computation.
if __name__ == '__main__':
    main()
OUTPUT
sequentially: 0.5738053179993585
In the following example, we use a pool of processes to calculate the three approximations.
#!/usr/bin/python
"""Compute three approximations of pi in a process pool and time the run."""

from decimal import Decimal, getcontext
from timeit import default_timer as timer
from multiprocessing import Pool, current_process
import time


def pi(precision):
    """Return the sum of the first *precision* terms of a series for pi.

    The decimal context precision is set to *precision* before summing.
    """
    getcontext().prec = precision

    return sum(1/Decimal(16)**k *
               (Decimal(4)/(8*k+1) -
                Decimal(2)/(8*k+4) -
                Decimal(1)/(8*k+5) -
                Decimal(1)/(8*k+6)) for k in range(precision))


def main():
    """Map ``pi`` over three precisions with three workers; print timing."""
    start = timer()

    # One worker per value.
    with Pool(3) as pool:
        values = (1000, 1500, 2000)
        data = pool.map(pi, values)
        print(data)

    end = timer()
    print(f'paralelly: {end - start}')


if __name__ == '__main__':
    main()
OUTPUT
paralelly: 0.38216479000038817
Separate memory in a process
#!/usr/bin/python
"""Show that a child process modifies its own copy of module data."""

from multiprocessing import Process, current_process

data = [1, 2]


def fun():
    """Extend the module-level ``data`` list and print it with the process name."""
    global data

    data.extend((3, 4, 5))
    print(f'Result in {current_process().name}: {data}')


def main():
    """Run ``fun`` in a child process, then print ``data`` as seen by main."""
    worker = Process(target=fun)
    worker.start()
    worker.join()

    # Printed after the child has finished, for comparison with its output.
    print(f'Result in main: {data}')


if __name__ == '__main__':
    main()
OUTPUT
Result in Process-1: [1, 2, 3, 4, 5]
Result in main: [1, 2]
Sharing state between processes
#!/usr/bin/python
"""Share an integer counter between processes with ``Value``."""

from multiprocessing import Process, Value
from time import sleep


def f(counter):
    """Sleep one second, then increment *counter* and print its value.

    The increment and the print both happen while holding the counter's
    lock.
    """
    sleep(1)

    with counter.get_lock():
        counter.value += 1
        print(f'Counter: {counter.value}')


def main():
    """Start thirty processes that each increment one shared counter."""
    # 'i' is the type code passed to Value; the initial value is 0.
    counter = Value('i', 0)

    processes = [Process(target=f, args=(counter, )) for _ in range(30)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


if __name__ == '__main__':
    main()
Message passing with queues
#!/usr/bin/python from multiprocessing import Process, Queue import random def rand_val(queue): num = random.random() queue.put(num) def main(): queue = Queue() processes = [Process(target=rand_val, args=(queue,)) for _ in range(4)] for p in processes: p.start() for p in processes: p.join() results = [queue.get() for _ in processes] print(results) if __name__ == "__main__": main()
#!/usr/bin/python
"""Hand out queued items to several worker processes."""

from multiprocessing import Queue, Process, current_process


def worker(queue):
    """Take one item from *queue* and print it with the process name."""
    name = current_process().name
    print(f'{name} data received: {queue.get()}')


def main():
    """Queue four words and start four workers that each take one."""
    queue = Queue()
    queue.put("wood")
    queue.put("sky")
    queue.put("cloud")
    queue.put("ocean")

    # One worker per queued item, so every blocking get() finds a value.
    processes = [Process(target=worker, args=(queue,)) for _ in range(4)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


if __name__ == "__main__":
    main()
OUTPUT
Process-1 data received: wood
Process-2 data received: sky
Process-3 data received: cloud
Process-4 data received: ocean
Queue order
#!/usr/bin/python from multiprocessing import Process, Queue import time import random def square(idx, x, queue): time.sleep(random.randint(1, 3)) queue.put((idx, x * x)) def main(): data = [2, 4, 6, 3, 5, 8, 9, 7] queue = Queue() processes = [Process(target=square, args=(idx, val, queue)) for idx, val in enumerate(data)] for p in processes: p.start() for p in processes: p.join() unsorted_result = [queue.get() for _ in processes] result = [val[1] for val in sorted(unsorted_result)] print(result) if __name__ == '__main__': main()