# 14 - Concurrency
## 📚 What You'll Learn

- Threading
- Multiprocessing
- Async/await (asyncio)
- When to use each approach
- GIL (Global Interpreter Lock)
- Thread safety
## ⚡ Why Concurrency?

Concurrency allows your program to handle multiple tasks efficiently:

- I/O-bound tasks: network requests, file operations (use threading or asyncio)
- CPU-bound tasks: heavy calculations (use multiprocessing)
## 🔒 The GIL (Global Interpreter Lock)

Python's GIL allows only one thread to execute Python bytecode at a time, so threads give you concurrency but not CPU parallelism (a quick benchmark after this list shows the effect):

- Threading: good for I/O-bound tasks (waiting for network, disk)
- Multiprocessing: good for CPU-bound tasks (calculations)
- Asyncio: good for juggling many I/O-bound tasks
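One way to see the GIL in action is to time the same CPU-bound work sequentially and with threads. This is a minimal sketch (timings vary by machine and CPython version); the threaded run takes about as long as the sequential one because only one thread runs bytecode at a time:

```python
import threading
import time

def count_down(n):
    # Pure CPU work: no I/O, so threads cannot overlap it
    while n > 0:
        n -= 1

N = 10_000_000

# Sequential: two calls back to back
start = time.perf_counter()
count_down(N)
count_down(N)
print(f"Sequential: {time.perf_counter() - start:.2f}s")

# Two threads: roughly the same total time under the GIL
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Threaded:   {time.perf_counter() - start:.2f}s")
```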
## 🧵 Threading

### Basic Thread

```python
import threading
import time

def worker(name):
    print(f"Worker {name} starting")
    time.sleep(2)
    print(f"Worker {name} done")

# Create threads
thread1 = threading.Thread(target=worker, args=("A",))
thread2 = threading.Thread(target=worker, args=("B",))

# Start threads
thread1.start()
thread2.start()

# Wait for completion
thread1.join()
thread2.join()

print("All done!")
```
### Thread with Return Value

```python
import threading
import time

results = {}

def fetch_data(url, result_key):
    # Simulate a network fetch
    time.sleep(1)
    results[result_key] = f"Data from {url}"

threads = []
urls = ["url1", "url2", "url3"]

for i, url in enumerate(urls):
    t = threading.Thread(target=fetch_data, args=(url, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(results)  # Each thread wrote its own key
```
### ThreadPoolExecutor

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    time.sleep(1)  # Simulate network delay
    return f"Content from {url}"

urls = ["url1", "url2", "url3", "url4"]

# Using a context manager
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks
    futures = [executor.submit(fetch_url, url) for url in urls]
    # Get results (in submission order)
    for future in futures:
        print(future.result())

# Or use map
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_url, urls))
print(results)
```
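If you would rather handle each result as soon as its task finishes, instead of in submission order, `concurrent.futures.as_completed` yields futures as they complete. A small variation on the example above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url):
    time.sleep(1)  # Simulate network delay
    return f"Content from {url}"

urls = ["url1", "url2", "url3", "url4"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its URL so we can label the output
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(future_to_url):
        print(f"{future_to_url[future]}: {future.result()}")
```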
### Thread Safety - Lock

```python
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # Thread-safe: one thread mutates at a time
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # Always 500000
```
## 🚀 Multiprocessing

### Basic Process

```python
from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name}, PID: {os.getpid()}")

if __name__ == "__main__":
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
```
### ProcessPoolExecutor

```python
from concurrent.futures import ProcessPoolExecutor
import math

def heavy_calculation(n):
    """CPU-intensive task."""
    return sum(math.factorial(i) for i in range(n))

if __name__ == "__main__":
    numbers = [100, 200, 300, 400]
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(heavy_calculation, numbers))
    print(results)
```
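`executor.map` also accepts a `chunksize` argument that sends items to workers in batches, cutting the per-item inter-process overhead when there are many small tasks. A sketch (the numbers here are arbitrary):

```python
from concurrent.futures import ProcessPoolExecutor
import math

def heavy_calculation(n):
    return sum(math.factorial(i) for i in range(n))

if __name__ == "__main__":
    numbers = list(range(50, 500, 10))  # Many smallish tasks
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Each worker receives 5 items per round trip
        results = list(executor.map(heavy_calculation, numbers, chunksize=5))
    print(f"Computed {len(results)} results")
```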
### Sharing Data Between Processes

```python
from multiprocessing import Process, Value

# Shared primitive value with a built-in lock
def increment_counter(counter):
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' = signed integer
    processes = [Process(target=increment_counter, args=(counter,))
                 for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 4000
```

```python
from multiprocessing import Process, Manager

# Using Manager for complex objects (lists, dicts, ...)
def add_to_list(shared_list, item):
    shared_list.append(item)

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()
        processes = [Process(target=add_to_list, args=(shared_list, i))
                     for i in range(5)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(list(shared_list))
```
## 🔄 Asyncio (async/await)

### Basic Async

```python
import asyncio

async def say_hello(name, delay):
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"Hello, {name}!")

async def main():
    # Run concurrently: total time is ~3s (the longest delay), not 6s
    await asyncio.gather(
        say_hello("Alice", 2),
        say_hello("Bob", 1),
        say_hello("Charlie", 3),
    )

# Run the event loop
asyncio.run(main())
```
### Async Task Creation

```python
import asyncio

async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # Create tasks (they start running right away)
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    task3 = asyncio.create_task(fetch_data("url3"))

    # Wait for all
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())
```
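On Python 3.11+, `asyncio.TaskGroup` offers a structured alternative to manual `create_task` plus `gather`: tasks created in the block are awaited automatically when it exits, and if one fails the others are cancelled. A minimal sketch reusing `fetch_data` from above:

```python
import asyncio

async def fetch_data(url):
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # Requires Python 3.11+
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("url1"))
        task2 = tg.create_task(fetch_data("url2"))
    # All tasks have finished once the block exits
    print(task1.result(), task2.result())

asyncio.run(main())
```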
### Async with Timeout

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
```
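Python 3.11 also added `asyncio.timeout`, a context-manager form of the same idea (since 3.11, `asyncio.TimeoutError` is an alias of the built-in `TimeoutError`):

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        # Requires Python 3.11+
        async with asyncio.timeout(2):
            result = await slow_operation()
        print(result)
    except TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
```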
### Async Context Manager

```python
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(1)

    async def do_something(self):
        print("Doing something...")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        await resource.do_something()

asyncio.run(main())
```
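For simple cases you can skip the class entirely: `contextlib.asynccontextmanager` turns an async generator function into an async context manager. A sketch equivalent to the class above:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource():
    print("Acquiring resource...")
    await asyncio.sleep(1)
    try:
        yield "resource"  # Value bound by `async with ... as`
    finally:
        print("Releasing resource...")
        await asyncio.sleep(1)

async def main():
    async with async_resource() as resource:
        print(f"Doing something with {resource}...")
        await asyncio.sleep(1)

asyncio.run(main())
```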
### Async Iterator

```python
import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(num)

asyncio.run(main())
```
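An async generator (an `async def` function containing `yield`) gives the same behavior with far less code:

```python
import asyncio

async def async_counter(stop):
    # Equivalent to the AsyncCounter class above
    current = 0
    while current < stop:
        await asyncio.sleep(0.5)
        current += 1
        yield current

async def main():
    async for num in async_counter(5):
        print(num)

asyncio.run(main())
```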
## 📬 Queue-Based Concurrency

### Threading Queue

```python
import threading
import queue
import time

def producer(q):
    for i in range(5):
        q.put(f"item-{i}")
        time.sleep(0.5)
    q.put(None)  # Sentinel: signal the end

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing: {item}")
        q.task_done()

q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
```
### Asyncio Queue

```python
import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.5)
    await queue.put(None)  # Sentinel

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Processing: {item}")

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

asyncio.run(main())
```
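The same producer/consumer pattern also works across processes with `multiprocessing.Queue`; a minimal sketch:

```python
from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(5):
        q.put(f"item-{i}")
        time.sleep(0.5)
    q.put(None)  # Sentinel: signal the end

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing: {item}")

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()
```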
## 📊 Comparison

| Feature | Threading | Multiprocessing | Asyncio |
|---|---|---|---|
| Best for | I/O-bound | CPU-bound | Many I/O ops |
| Bound by GIL | Yes | No (separate processes) | Yes |
| Memory | Shared | Separate | Shared |
| Overhead | Low | High | Very low |
| Complexity | Medium | Medium | Higher |
## 🎯 When to Use What

Use Threading when:

- Waiting for I/O (network, files)
- You need shared memory
- Tasks are I/O-bound

Use Multiprocessing when:

- Calculations are CPU-intensive
- You need true parallelism
- Tasks are CPU-bound

Use Asyncio when:

- You have many concurrent I/O operations
- You're building network services
- You need high concurrency
## 📝 Summary

```python
# Threading: I/O-bound work
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    results = executor.map(io_bound_function, items)

# Multiprocessing: CPU-bound work
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    results = executor.map(cpu_bound_function, items)

# Asyncio: many concurrent I/O operations
import asyncio

async def main():
    await asyncio.gather(*[async_function(x) for x in items])

asyncio.run(main())
```
## 🎯 Next Steps

After mastering concurrency, proceed to 15_database to learn about database operations with SQLite and SQLAlchemy!