# python / exercises / exercises.py
"""
14 - Concurrency: Exercises
Practice threading, thread pools, and asyncio.
"""
# Opening banner: a 60-character rule, the title, and a closing rule.
print("=" * 60, "CONCURRENCY EXERCISES", "=" * 60, sep="\n")
import threading
import time
import queue
from concurrent.futures import ThreadPoolExecutor
import asyncio
# =============================================================================
# EXERCISE 1: Basic Threading
# Create 3 threads that each print the numbers 1-5 together with the
# thread's name.
# =============================================================================
print("\n--- Exercise 1: Basic Threading ---")
# Your code here:
# =============================================================================
# EXERCISE 2: Thread-Safe Counter
# Create a ThreadSafeCounter class with increment/decrement/get methods that
# can be shared safely between threads.
# =============================================================================
print("\n--- Exercise 2: Thread-Safe Counter ---")
# Your code here:
# Test:
# counter = ThreadSafeCounter()
# # Start several threads that each call counter.increment(), then join them
# print(counter.get())
# =============================================================================
# EXERCISE 3: Producer-Consumer with Queue
# Implement a producer thread that puts items on a queue.Queue and consumer
# threads that take the items off the queue and process them.
# =============================================================================
print("\n--- Exercise 3: Producer-Consumer ---")
# Your code here:
# =============================================================================
# EXERCISE 4: ThreadPoolExecutor
# Write download_all(urls), which uses ThreadPoolExecutor to download multiple
# URLs concurrently (the download itself may be simulated).
# =============================================================================
print("\n--- Exercise 4: ThreadPoolExecutor ---")
# Your code here:
# Test:
# urls = ["url1", "url2", "url3"]
# results = download_all(urls)
# =============================================================================
# EXERCISE 5: Async Basic
# Create async functions that simulate API calls and run them concurrently.
# =============================================================================
print("\n--- Exercise 5: Async Basic ---")
# Your code here:
# =============================================================================
# EXERCISE 6: Async with Timeout
# Implement an async function that gives up on a slow call after a timeout and
# reports the timeout instead of letting the exception escape.
# =============================================================================
print("\n--- Exercise 6: Async Timeout ---")
# Your code here:
# =============================================================================
# EXERCISE 7: Async Rate Limiter
# Limit how many tasks run at the same time using asyncio.Semaphore.
# (A semaphore caps concurrency; it does not cap calls per second.)
# =============================================================================
print("\n--- Exercise 7: Rate Limiter ---")
# Your code here:
# =============================================================================
# EXERCISE 8: Async Queue
# Implement the producer-consumer pattern with asyncio.Queue.
# =============================================================================
print("\n--- Exercise 8: Async Queue ---")
# Your code here:
# =============================================================================
# SOLUTIONS
# Reference solutions. They are plain top-level code, so they execute (and
# print their output) whenever this file is run.
# =============================================================================
print("\n\n" + "=" * 60)
print("SOLUTIONS")
print("=" * 60)
# SOLUTION 1
print("\n--- Solution 1: Basic Threading ---")
def print_numbers(name):
    """Print the numbers 1 through 5, each tagged with ``name``.

    Sleeps 0.1 seconds after every printed line.

    Args:
        name: Label inserted into each printed line.
    """
    number = 1
    while number <= 5:
        print(f"Thread {name}: {number}")
        time.sleep(0.1)
        number += 1
# Build all three worker threads first; none of them runs until start().
threads = [
    threading.Thread(target=print_numbers, kwargs={"name": f"T{i}"})
    for i in range(3)
]
# Launch every thread before waiting on any, so they run side by side.
for t in threads:
    t.start()
# Block until each thread has printed all of its numbers.
for t in threads:
    t.join()
# SOLUTION 2
print("\n--- Solution 2: Thread-Safe Counter ---")
class ThreadSafeCounter:
    """Counter whose reads and updates are serialized by a ``threading.Lock``.

    ``increment`` and ``decrement`` return the value the counter holds right
    after the update. The value is read while the lock is still held, so a
    caller gets the result of its own update; a separate ``get()`` call could
    already include updates made by other threads.

    Args:
        initial: Starting value of the counter. Defaults to 0.
    """

    def __init__(self, initial=0):
        self._value = initial
        self._lock = threading.Lock()

    def increment(self, amount=1):
        """Add ``amount`` (default 1) and return the updated value."""
        with self._lock:
            self._value += amount
            return self._value

    def decrement(self, amount=1):
        """Subtract ``amount`` (default 1) and return the updated value."""
        with self._lock:
            self._value -= amount
            return self._value

    def get(self):
        """Return the current value."""
        with self._lock:
            return self._value
# One counter instance shared by every worker thread below.
counter = ThreadSafeCounter()


def increment_many():
    """Call ``increment`` on the module-level ``counter`` 1000 times."""
    remaining = 1000
    while remaining > 0:
        counter.increment()
        remaining -= 1


# Five workers, each adding 1000, all hitting the same counter.
threads = [threading.Thread(target=increment_many) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Read the total only after every worker has been joined.
print(f"Final counter: {counter.get()}")
# SOLUTION 3
print("\n--- Solution 3: Producer-Consumer ---")
def producer(q, items):
    """Put every element of ``items`` on ``q``, then a ``None`` sentinel.

    Each element is announced on stdout before it is queued, and the function
    sleeps 0.1 seconds after queuing it.

    Args:
        q: Queue object providing ``put``.
        items: Iterable of elements to enqueue.
    """
    for payload in items:
        print(f"Producing: {payload}")
        q.put(payload)
        time.sleep(0.1)
    # Poison pill: the consumers treat None as "no more work".
    poison_pill = None
    q.put(poison_pill)
def consumer(q, name):
    """Take items from ``q`` and report them until a ``None`` sentinel shows up.

    Every non-sentinel item is printed with this consumer's ``name``, followed
    by a 0.15 second sleep. When the sentinel is taken it is put back on the
    queue before returning, so another consumer reading the same queue also
    sees it.

    Args:
        q: Queue object providing blocking ``get`` and ``put``.
        name: Label used in the printed lines.
    """
    job = q.get()
    while job is not None:
        print(f"Consumer {name} processing: {job}")
        time.sleep(0.15)
        job = q.get()
    # Pass the poison pill on to the next consumer.
    q.put(None)
# Unbounded queue shared by one producer and two consumers.
q = queue.Queue()
items = [1, 2, 3, 4, 5]
prod = threading.Thread(target=producer, kwargs={"q": q, "items": items})
cons1 = threading.Thread(target=consumer, kwargs={"q": q, "name": "A"})
cons2 = threading.Thread(target=consumer, kwargs={"q": q, "name": "B"})
# Start the producer first, then both consumers.
prod.start()
cons1.start()
cons2.start()
# Wait for the producer to finish and for both consumers to see the sentinel.
prod.join()
cons1.join()
cons2.join()
# SOLUTION 4
print("\n--- Solution 4: ThreadPoolExecutor ---")
def download_url(url, delay=0.3):
    """Simulate downloading ``url`` and return placeholder content.

    No network access happens; the function only sleeps to stand in for the
    transfer time.

    Args:
        url: Identifier embedded in the returned text.
        delay: Seconds to sleep to simulate the download. Defaults to 0.3,
            the previously hard-coded value. Pass 0 to skip the wait.

    Returns:
        The string ``"Content from <url>"``.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    time.sleep(delay)  # Simulate download
    return f"Content from {url}"
def download_all(urls, *, max_workers=3):
    """Download every URL in ``urls`` concurrently on a thread pool.

    Args:
        urls: Iterable of URLs, each passed to ``download_url``.
        max_workers: Size of the thread pool. Defaults to 3, the previously
            hard-coded value.

    Returns:
        A list with one ``download_url`` result per URL, in the same order as
        ``urls``.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list() drains the lazy map iterator while the pool is still open.
        return list(executor.map(download_url, urls))
urls = ["site1.com", "site2.com", "site3.com", "site4.com"]
# perf_counter() is monotonic, so the measured interval cannot be distorted
# by system clock adjustments the way a time.time() difference can.
start = time.perf_counter()
results = download_all(urls)
print(f"Downloaded {len(results)} sites in {time.perf_counter() - start:.2f}s")
print(f"Results: {results}")
# SOLUTION 5
print("\n--- Solution 5: Async Basic ---")
async def fetch_api(endpoint, delay):
    """Simulate one API request.

    Prints a "Fetching" line, waits ``delay`` seconds without blocking the
    event loop, then returns a canned response.

    Args:
        endpoint: Name of the endpoint being "called".
        delay: Seconds to wait before responding.

    Returns:
        A dict with the keys ``"endpoint"`` and ``"data"``.
    """
    print(f"Fetching {endpoint}...")
    await asyncio.sleep(delay)
    response = dict(endpoint=endpoint, data=f"Data from {endpoint}")
    return response
async def fetch_all_apis():
    """Call three simulated endpoints concurrently.

    Returns:
        The ``fetch_api`` results as a list ordered users, posts, comments,
        regardless of which call finishes first.
    """
    # Endpoint name -> simulated latency in seconds.
    latency_by_endpoint = {"users": 0.3, "posts": 0.5, "comments": 0.2}
    pending = (
        fetch_api(endpoint, latency)
        for endpoint, latency in latency_by_endpoint.items()
    )
    return await asyncio.gather(*pending)
# perf_counter() is monotonic, so the measured interval cannot be distorted
# by system clock adjustments the way a time.time() difference can.
start = time.perf_counter()
results = asyncio.run(fetch_all_apis())
print(f"Results: {results}")
print(f"Time: {time.perf_counter() - start:.2f}s")
# SOLUTION 6
print("\n--- Solution 6: Async Timeout ---")
async def slow_api_call(delay):
    """Wait ``delay`` seconds, then return the string ``"Success"``.

    Args:
        delay: Seconds to wait before returning.
    """
    # asyncio.sleep() hands back its ``result`` argument once the wait ends.
    return await asyncio.sleep(delay, result="Success")
async def fetch_with_timeout(delay, timeout):
    """Run ``slow_api_call(delay)`` with an upper bound on how long to wait.

    Args:
        delay: Seconds the simulated call takes.
        timeout: Maximum seconds to wait for it.

    Returns:
        ``{"status": "success", "result": <value>}`` when the call finishes
        in time, otherwise ``{"status": "timeout", "result": None}``. The
        timeout is reported through the return value, not raised.
    """
    try:
        outcome = await asyncio.wait_for(slow_api_call(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return {"status": "timeout", "result": None}
    return {"status": "success", "result": outcome}
async def demo_timeouts():
    """Run ``fetch_with_timeout`` once per scenario and print each outcome."""
    scenarios = (
        # (label, simulated delay, timeout)
        ("Fast call", 0.5, 1.0),  # This should succeed
        ("Slow call", 2.0, 0.5),  # This should timeout
    )
    for label, delay, limit in scenarios:
        outcome = await fetch_with_timeout(delay, timeout=limit)
        print(f"{label}: {outcome}")
# Run the timeout demo to completion. demo_timeouts() prints its results and
# returns nothing, so there is no value to capture here.
asyncio.run(demo_timeouts())
# SOLUTION 7
print("\n--- Solution 7: Rate Limiter ---")
class RateLimiter:
def __init__(self, max_concurrent):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute(self, task_id):
async with self.semaphore:
print(f"Task {task_id} started")
await asyncio.sleep(0.3)
print(f"Task {task_id} completed")
return task_id
async def run_rate_limited():
    """Run five tasks through a ``RateLimiter`` that admits two at a time.

    Returns:
        The list of values returned by ``execute`` for task ids 0 through 4,
        in that order.
    """
    limiter = RateLimiter(max_concurrent=2)
    # map() builds one execute(task_id) coroutine per id, in id order.
    return await asyncio.gather(*map(limiter.execute, range(5)))
# perf_counter() is monotonic, so the measured interval cannot be distorted
# by system clock adjustments the way a time.time() difference can.
start = time.perf_counter()
results = asyncio.run(run_rate_limited())
print(f"Results: {results}")
print(f"Time: {time.perf_counter() - start:.2f}s")
# SOLUTION 8
print("\n--- Solution 8: Async Queue ---")
async def async_producer(queue, items):
    """Put every element of ``items`` on ``queue``, then a ``None`` sentinel.

    After each element is queued it is announced on stdout and the coroutine
    sleeps 0.1 seconds.

    Args:
        queue: Queue object whose ``put`` is awaitable.
        items: Iterable of elements to enqueue.
    """
    for entry in items:
        await queue.put(entry)
        print(f"Produced: {entry}")
        await asyncio.sleep(0.1)
    # None marks the end of the stream for the consumers.
    end_of_stream = None
    await queue.put(end_of_stream)
async def async_consumer(queue, name):
    """Take items from ``queue`` until a ``None`` sentinel and double each one.

    Every non-sentinel item is printed with this consumer's ``name``, its
    double is recorded, and the coroutine sleeps 0.15 seconds. When the
    sentinel is taken it is put back on the queue before returning.

    Args:
        queue: Queue object whose ``get`` and ``put`` are awaitable.
        name: Label used in the printed lines.

    Returns:
        List of ``item * 2`` for each item this consumer handled, in the
        order handled.
    """
    doubled = []
    entry = await queue.get()
    while entry is not None:
        print(f"Consumer {name}: {entry}")
        doubled.append(entry * 2)
        await asyncio.sleep(0.15)
        entry = await queue.get()
    # Signal other consumers
    await queue.put(None)
    return doubled
async def run_async_queue():
    """Run one async producer and two async consumers over a shared queue.

    Returns:
        Consumer "A"'s results followed by consumer "B"'s results, as one
        list.
    """
    channel = asyncio.Queue()
    # Tasks are created in the order producer, consumer A, consumer B.
    producer_task = asyncio.create_task(async_producer(channel, [1, 2, 3, 4, 5]))
    consumer_tasks = [
        asyncio.create_task(async_consumer(channel, label)) for label in ("A", "B")
    ]
    await producer_task
    combined = []
    for task in consumer_tasks:
        combined.extend(await task)
    return combined
# Run the async producer/consumer demo and show the combined results.
results = asyncio.run(run_async_queue())
print(f"All results: {results}")
# Closing banner: blank line, rule, title, rule.
print("\n" + "=" * 60, "END OF EXERCISES", "=" * 60, sep="\n")