python

exercises

exercises.py🐍
"""
14 - Concurrency: Exercises
Practice threading, multiprocessing, and asyncio.
"""

print("=" * 60)
print("CONCURRENCY EXERCISES")
print("=" * 60)

import threading
import time
import queue
from concurrent.futures import ThreadPoolExecutor
import asyncio

# =============================================================================
# EXERCISE 1: Basic Threading
# Create 3 threads that print numbers 1-5 with their thread name.
# =============================================================================
print("\n--- Exercise 1: Basic Threading ---")

# Your code here:


# =============================================================================
# EXERCISE 2: Thread-Safe Counter
# Create a thread-safe counter class with increment/decrement/get methods.
# =============================================================================
print("\n--- Exercise 2: Thread-Safe Counter ---")

# Your code here:


# Test:
# counter = ThreadSafeCounter()
# # Multiple threads incrementing
# print(counter.get())


# =============================================================================
# EXERCISE 3: Producer-Consumer with Queue
# Implement a producer that adds items and consumers that process them.
# =============================================================================
print("\n--- Exercise 3: Producer-Consumer ---")

# Your code here:


# =============================================================================
# EXERCISE 4: ThreadPoolExecutor
# Use ThreadPoolExecutor to download multiple URLs concurrently.
# =============================================================================
print("\n--- Exercise 4: ThreadPoolExecutor ---")

# Your code here:


# Test:
# urls = ["url1", "url2", "url3"]
# results = download_all(urls)


# =============================================================================
# EXERCISE 5: Async Basic
# Create async functions that simulate API calls and run them concurrently.
# =============================================================================
print("\n--- Exercise 5: Async Basic ---")

# Your code here:


# =============================================================================
# EXERCISE 6: Async with Timeout
# Implement an async function that handles timeouts gracefully.
# =============================================================================
print("\n--- Exercise 6: Async Timeout ---")

# Your code here:


# =============================================================================
# EXERCISE 7: Async Rate Limiter
# Implement rate limiting using asyncio.Semaphore.
# =============================================================================
print("\n--- Exercise 7: Rate Limiter ---")

# Your code here:


# =============================================================================
# EXERCISE 8: Async Queue
# Implement async producer-consumer pattern.
# =============================================================================
print("\n--- Exercise 8: Async Queue ---")

# Your code here:


# =============================================================================
# SOLUTIONS
# =============================================================================
print("\n\n" + "=" * 60)
print("SOLUTIONS")
print("=" * 60)

# SOLUTION 1
print("\n--- Solution 1: Basic Threading ---")

def print_numbers(name):
    for i in range(1, 6):
        print(f"Thread {name}: {i}")
        time.sleep(0.1)

threads = [threading.Thread(target=print_numbers, args=(f"T{i}",)) 
           for i in range(3)]

for t in threads:
    t.start()
for t in threads:
    t.join()

# SOLUTION 2
print("\n--- Solution 2: Thread-Safe Counter ---")

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self._value += 1
    
    def decrement(self):
        with self._lock:
            self._value -= 1
    
    def get(self):
        with self._lock:
            return self._value

counter = ThreadSafeCounter()

def increment_many():
    for _ in range(1000):
        counter.increment()

threads = [threading.Thread(target=increment_many) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final counter: {counter.get()}")

# SOLUTION 3
print("\n--- Solution 3: Producer-Consumer ---")

def producer(q, items):
    for item in items:
        print(f"Producing: {item}")
        q.put(item)
        time.sleep(0.1)
    q.put(None)  # Poison pill

def consumer(q, name):
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # Pass poison pill
            break
        print(f"Consumer {name} processing: {item}")
        time.sleep(0.15)

q = queue.Queue()
items = list(range(1, 6))

prod = threading.Thread(target=producer, args=(q, items))
cons1 = threading.Thread(target=consumer, args=(q, "A"))
cons2 = threading.Thread(target=consumer, args=(q, "B"))

prod.start()
cons1.start()
cons2.start()

prod.join()
cons1.join()
cons2.join()

# SOLUTION 4
print("\n--- Solution 4: ThreadPoolExecutor ---")

def download_url(url):
    time.sleep(0.3)  # Simulate download
    return f"Content from {url}"

def download_all(urls):
    with ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(download_url, urls))

urls = ["site1.com", "site2.com", "site3.com", "site4.com"]
start = time.time()
results = download_all(urls)
print(f"Downloaded {len(results)} sites in {time.time() - start:.2f}s")
print(f"Results: {results}")

# SOLUTION 5
print("\n--- Solution 5: Async Basic ---")

async def fetch_api(endpoint, delay):
    print(f"Fetching {endpoint}...")
    await asyncio.sleep(delay)
    return {"endpoint": endpoint, "data": f"Data from {endpoint}"}

async def fetch_all_apis():
    apis = [
        ("users", 0.3),
        ("posts", 0.5),
        ("comments", 0.2),
    ]
    
    tasks = [fetch_api(name, delay) for name, delay in apis]
    results = await asyncio.gather(*tasks)
    return results

start = time.time()
results = asyncio.run(fetch_all_apis())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")

# SOLUTION 6
print("\n--- Solution 6: Async Timeout ---")

async def slow_api_call(delay):
    await asyncio.sleep(delay)
    return "Success"

async def fetch_with_timeout(delay, timeout):
    try:
        result = await asyncio.wait_for(slow_api_call(delay), timeout=timeout)
        return {"status": "success", "result": result}
    except asyncio.TimeoutError:
        return {"status": "timeout", "result": None}

async def demo_timeouts():
    # This should succeed
    result1 = await fetch_with_timeout(0.5, timeout=1.0)
    print(f"Fast call: {result1}")
    
    # This should timeout
    result2 = await fetch_with_timeout(2.0, timeout=0.5)
    print(f"Slow call: {result2}")

asyncio.run(demo_timeouts())

# SOLUTION 7
print("\n--- Solution 7: Rate Limiter ---")

class RateLimiter:
    def __init__(self, max_concurrent):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def execute(self, task_id):
        async with self.semaphore:
            print(f"Task {task_id} started")
            await asyncio.sleep(0.3)
            print(f"Task {task_id} completed")
            return task_id

async def run_rate_limited():
    limiter = RateLimiter(max_concurrent=2)
    
    tasks = [limiter.execute(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    return results

start = time.time()
results = asyncio.run(run_rate_limited())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")

# SOLUTION 8
print("\n--- Solution 8: Async Queue ---")

async def async_producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)

async def async_consumer(queue, name):
    results = []
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # Signal other consumers
            break
        print(f"Consumer {name}: {item}")
        results.append(item * 2)
        await asyncio.sleep(0.15)
    return results

async def run_async_queue():
    queue = asyncio.Queue()
    
    prod = asyncio.create_task(async_producer(queue, [1, 2, 3, 4, 5]))
    cons1 = asyncio.create_task(async_consumer(queue, "A"))
    cons2 = asyncio.create_task(async_consumer(queue, "B"))
    
    await prod
    results1 = await cons1
    results2 = await cons2
    
    return results1 + results2

results = asyncio.run(run_async_queue())
print(f"All results: {results}")

print("\n" + "=" * 60)
print("END OF EXERCISES")
print("=" * 60)
Exercises - Python Tutorial | DeepML