14 - Concurrency

📌 What You'll Learn

  • Threading
  • Multiprocessing
  • Async/await (asyncio)
  • When to use each approach
  • GIL (Global Interpreter Lock)
  • Thread safety

⚡ Why Concurrency?

Concurrency allows your program to handle multiple tasks efficiently:

  • I/O-bound tasks: Network requests, file operations (use threading/asyncio)
  • CPU-bound tasks: Heavy calculations (use multiprocessing)

🔒 The GIL (Global Interpreter Lock)

CPython's GIL allows only one thread to execute Python bytecode at a time, so threads never run Python code in parallel; they can only overlap while blocked on I/O. That leads to a simple rule of thumb (see the timing sketch after this list):

  • Threading: Good for I/O-bound tasks (waiting for network, disk)
  • Multiprocessing: Good for CPU-bound tasks (calculations)
  • Asyncio: Good for many concurrent I/O-bound tasks
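
A minimal sketch of the difference, using a busy loop as a stand-in for real CPU work (the function names and task counts here are illustrative assumptions, not part of the tutorial):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn(n):
    """CPU-bound busy loop."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, n_tasks=4, n=2_000_000):
    start = time.perf_counter()
    with executor_cls(max_workers=n_tasks) as executor:
        list(executor.map(burn, [n] * n_tasks))
    return time.perf_counter() - start

if __name__ == "__main__":
    # Threads serialize on the GIL for CPU work; processes run in parallel,
    # so the second timing is typically much faster on a multi-core machine.
    print(f"threads:   {timed(ThreadPoolExecutor):.2f}s")
    print(f"processes: {timed(ProcessPoolExecutor):.2f}s")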

🧵 Threading

Basic Thread

import threading
import time

def worker(name):
    print(f"Worker {name} starting")
    time.sleep(2)
    print(f"Worker {name} done")

# Create threads
thread1 = threading.Thread(target=worker, args=("A",))
thread2 = threading.Thread(target=worker, args=("B",))

# Start threads
thread1.start()
thread2.start()

# Wait for completion
thread1.join()
thread2.join()

print("All done!")

Thread with Return Value

import threading
import time

results = {}

def fetch_data(url, result_key):
    # Simulate fetch
    time.sleep(1)
    results[result_key] = f"Data from {url}"

threads = []
urls = ["url1", "url2", "url3"]

for i, url in enumerate(urls):
    t = threading.Thread(target=fetch_data, args=(url, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(results)

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    time.sleep(1)  # Simulate network delay
    return f"Content from {url}"

urls = ["url1", "url2", "url3", "url4"]

# Using context manager
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks
    futures = [executor.submit(fetch_url, url) for url in urls]

    # Get results
    for future in futures:
        print(future.result())

# Or use map
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(fetch_url, urls))
    print(results)
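
If you want each result as soon as its task finishes rather than in submission order, concurrent.futures.as_completed yields futures in completion order. A short sketch that assumes fetch_url and urls from the example above:

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to its input so we can label the output
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        print(f"{futures[future]} -> {future.result()}")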

Thread Safety - Lock

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # Thread-safe
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter: {counter}")  # 500000

🔄 Multiprocessing

Basic Process

from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name}, PID: {os.getpid()}")

# Required when the start method is "spawn" (the Windows/macOS default):
# worker processes re-import this module on startup
if __name__ == "__main__":
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import math

def heavy_calculation(n):
    """CPU-intensive task"""
    return sum(math.factorial(i) for i in range(n))

if __name__ == "__main__":
    numbers = [100, 200, 300, 400]

    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(heavy_calculation, numbers))

    print(results)
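
For long input lists, map accepts a chunksize argument that sends items to workers in batches, cutting the per-item inter-process overhead. A one-line variation on the call above:

with ProcessPoolExecutor(max_workers=4) as executor:
    # Batches of 10 items per round trip instead of one at a time
    results = list(executor.map(heavy_calculation, numbers, chunksize=10))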

Sharing Data Between Processes

from multiprocessing import Process, Value, Array, Manager

# Shared value
def increment_counter(counter):
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # 'i' = integer

    processes = [Process(target=increment_counter, args=(counter,))
                 for _ in range(4)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f"Counter: {counter.value}")

# Using Manager for complex objects
def add_to_list(shared_list, item):
    shared_list.append(item)

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()

        processes = [Process(target=add_to_list, args=(shared_list, i))
                     for i in range(5)]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(list(shared_list))

🔀 Asyncio (async/await)

Basic Async

import asyncio

async def say_hello(name, delay):
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"Hello, {name}!")

async def main():
    # Run concurrently
    await asyncio.gather(
        say_hello("Alice", 2),
        say_hello("Bob", 1),
        say_hello("Charlie", 3),
    )

# Run the event loop
asyncio.run(main())

Async Task Creation

import asyncio

async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    # Create tasks
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    task3 = asyncio.create_task(fetch_data("url3"))

    # Wait for all
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())
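
On Python 3.11+, asyncio.TaskGroup offers structured concurrency: if one task fails, the remaining tasks are cancelled and the group re-raises. A sketch reusing fetch_data from the example above:

import asyncio

async def main():
    # The async with block waits for all tasks before exiting
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("url1"))
        task2 = tg.create_task(fetch_data("url2"))
    print([task1.result(), task2.result()])

asyncio.run(main())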

Async with Timeout

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
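
Python 3.11 also added asyncio.timeout, a context-manager form of the same idea (since 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError, so either name works in the except clause). A sketch reusing slow_operation from above:

import asyncio

async def main():
    try:
        async with asyncio.timeout(2):  # Cancels the body after 2 seconds
            await slow_operation()
    except TimeoutError:
        print("Operation timed out!")

asyncio.run(main())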

Async Context Manager

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(1)

    async def do_something(self):
        print("Doing something...")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        await resource.do_something()

asyncio.run(main())
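
The same acquire/release pattern can be written more compactly with contextlib.asynccontextmanager, which turns an async generator into an async context manager:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource():
    print("Acquiring resource...")
    await asyncio.sleep(1)
    try:
        yield "handle"  # Value bound by "async with ... as"
    finally:
        print("Releasing resource...")
        await asyncio.sleep(1)

async def main():
    async with resource() as handle:
        print(f"Using {handle}")

asyncio.run(main())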

Async Iterator

import asyncio

class AsyncCounter:
    def __init__(self, stop):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)
        self.current += 1
        return self.current

async def main():
    async for num in AsyncCounter(5):
        print(num)

asyncio.run(main())
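
An async generator gives the same behavior as the class above in a fraction of the code: any async def containing yield supports async for directly:

import asyncio

async def async_counter(stop):
    for i in range(1, stop + 1):
        await asyncio.sleep(0.5)
        yield i

async def main():
    async for num in async_counter(5):
        print(num)

asyncio.run(main())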

🔄 Queue-Based Concurrency

Threading Queue

import threading
import queue
import time

def producer(q):
    for i in range(5):
        q.put(f"item-{i}")
        time.sleep(0.5)
    q.put(None)  # Signal end

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # Sentinel: producer is done
            break
        print(f"Processing: {item}")
        q.task_done()  # Only meaningful if someone calls q.join()

q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()
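
Instead of a sentinel, the queue itself can signal completion: q.join() blocks until every item put on the queue has been matched by a task_done() call. A sketch using a daemon consumer so the program can exit without an explicit shutdown message:

import threading
import queue

q = queue.Queue()

def consumer(q):
    while True:
        item = q.get()
        try:
            print(f"Processing: {item}")
        finally:
            q.task_done()  # Pairs with q.join() below

threading.Thread(target=consumer, args=(q,), daemon=True).start()

for i in range(5):
    q.put(f"item-{i}")

q.join()  # Returns once all five items have been processed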

Asyncio Queue

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.5)
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Processing: {item}")

async def main():
    queue = asyncio.Queue()

    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

asyncio.run(main())

📊 Comparison

Feature          | Threading | Multiprocessing          | Asyncio
-----------------|-----------|--------------------------|-------------
Best for         | I/O-bound | CPU-bound                | Many I/O ops
Limited by GIL   | Yes       | No (separate processes)  | Yes
Memory           | Shared    | Separate                 | Shared
Overhead         | Low       | High                     | Very low
Complexity       | Medium    | Medium                   | Higher

🎯 When to Use What

Use Threading when:

  • Waiting for I/O (network, files)
  • Need shared memory
  • Tasks are I/O-bound

Use Multiprocessing when:

  • CPU-intensive calculations
  • Need true parallelism
  • Tasks are CPU-bound

Use Asyncio when:

  • Many concurrent I/O operations
  • Building network services
  • Need high concurrency

📋 Summary

# Threading
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    results = executor.map(io_bound_function, items)

# Multiprocessing
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    results = executor.map(cpu_bound_function, items)

# Asyncio
import asyncio
async def main():
    await asyncio.gather(*[async_function(x) for x in items])
asyncio.run(main())

🎯 Next Steps

After mastering concurrency, proceed to 15_database to learn about database operations with SQLite and SQLAlchemy!
