"""
14 - Concurrency: Examples
Run this file to see threading, multiprocessing, and asyncio in action!
"""
print("=" * 60)
print("CONCURRENCY - EXAMPLES")
print("=" * 60)
import threading
import time
import queue
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
# =============================================================================
# 1. BASIC THREADING
# =============================================================================
print("\n--- 1. Basic Threading ---\n")
def worker(name, delay):
print(f"Worker {name} starting...")
time.sleep(delay)
print(f"Worker {name} done!")
# Create and start threads
thread1 = threading.Thread(target=worker, args=("A", 1))
thread2 = threading.Thread(target=worker, args=("B", 0.5))
start = time.time()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Total time: {time.time() - start:.2f}s (concurrent!)")
# =============================================================================
# 2. THREAD WITH RESULTS
# =============================================================================
print("\n--- 2. Thread with Results ---\n")
results = {}
results_lock = threading.Lock()
def fetch_data(url, index):
time.sleep(0.5) # Simulate network delay
with results_lock:
results[index] = f"Data from {url}"
threads = []
urls = ["site1.com", "site2.com", "site3.com"]
for i, url in enumerate(urls):
t = threading.Thread(target=fetch_data, args=(url, i))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Results: {results}")
# =============================================================================
# 3. THREADPOOLEXECUTOR
# =============================================================================
print("\n--- 3. ThreadPoolExecutor ---\n")
def process_item(item):
time.sleep(0.3)
return f"Processed: {item}"
items = ["a", "b", "c", "d", "e"]
start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(process_item, items))
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")
# =============================================================================
# 4. THREAD SAFETY - LOCK
# =============================================================================
print("\n--- 4. Thread Safety with Lock ---\n")
counter = 0
counter_lock = threading.Lock()
def increment(times):
global counter
for _ in range(times):
with counter_lock:
counter += 1
threads = [threading.Thread(target=increment, args=(10000,))
for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Counter (with lock): {counter}")
# =============================================================================
# 5. THREADING QUEUE
# =============================================================================
print("\n--- 5. Threading Queue ---\n")
def producer(q, items):
for item in items:
q.put(item)
print(f"Produced: {item}")
time.sleep(0.1)
def consumer(q, num_items):
    """Illustrates the get()/task_done() pattern. The demo below consumes
    in the main thread instead; see the worker-thread sketch after it."""
    processed = []
    for _ in range(num_items):
        item = q.get()
        processed.append(f"Done: {item}")
        q.task_done()
    return processed
q = queue.Queue()
items = [1, 2, 3, 4, 5]
prod_thread = threading.Thread(target=producer, args=(q, items))
prod_thread.start()
# Give producer a head start
time.sleep(0.2)
# Consume in the main thread
results = []
for _ in range(len(items)):
    item = q.get()
    results.append(item)
prod_thread.join()
print(f"Consumed: {results}")
# =============================================================================
# 6. MULTIPROCESSING CONCEPTS (sequential vs. threads)
# =============================================================================
print("\n--- 6. Multiprocessing Concepts ---\n")
# Note: process pools must be launched from a __main__-guarded script, so
# this demo sticks to threads: it shows why they do not speed up CPU-bound
# work (the GIL) and sketches the process-pool version below.
def cpu_intensive(n):
"""CPU-bound task"""
return sum(i ** 2 for i in range(n))
# Sequential
start = time.time()
results = [cpu_intensive(100000) for _ in range(4)]
seq_time = time.time() - start
print(f"Sequential time for 4 tasks: {seq_time:.2f}s")
# With ThreadPoolExecutor (limited by GIL for CPU tasks)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_intensive, [100000] * 4))
thread_time = time.time() - start
print(f"ThreadPool time: {thread_time:.2f}s")
print("\nNote: For true CPU parallelism, use ProcessPoolExecutor")
print("(Run separately to avoid multiprocessing issues in demos)")
# =============================================================================
# 7. ASYNCIO BASICS
# =============================================================================
print("\n--- 7. Asyncio Basics ---\n")
async def async_task(name, delay):
print(f"Task {name} starting...")
await asyncio.sleep(delay)
print(f"Task {name} done!")
return f"Result from {name}"
async def async_main():
# Run tasks concurrently
results = await asyncio.gather(
async_task("A", 0.5),
async_task("B", 0.3),
async_task("C", 0.4),
)
return results
start = time.time()
results = asyncio.run(async_main())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s (concurrent!)")
# =============================================================================
# 8. ASYNCIO WITH CREATE_TASK
# =============================================================================
print("\n--- 8. Asyncio create_task ---\n")
async def fetch_url(url):
print(f"Fetching {url}...")
await asyncio.sleep(0.5)
return f"Content from {url}"
async def main_with_tasks():
    # create_task() schedules the coroutines on the event loop immediately;
    # they start running at the next await point
task1 = asyncio.create_task(fetch_url("url1"))
task2 = asyncio.create_task(fetch_url("url2"))
task3 = asyncio.create_task(fetch_url("url3"))
# Do other work while tasks run
print("Tasks are running in background...")
# Wait for results
results = await asyncio.gather(task1, task2, task3)
return results
results = asyncio.run(main_with_tasks())
print(f"Results: {results}")
# =============================================================================
# 9. ASYNCIO TIMEOUT
# =============================================================================
print("\n--- 9. Asyncio Timeout ---\n")
async def slow_operation():
await asyncio.sleep(5)
return "Completed"
async def with_timeout():
try:
result = await asyncio.wait_for(slow_operation(), timeout=1.0)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out!")
asyncio.run(with_timeout())
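# Python 3.11+ also offers asyncio.timeout(), a context-manager flavour of
# wait_for(); it raises the built-in TimeoutError. A minimal sketch, skipped
# on older interpreters:
import sys
if sys.version_info >= (3, 11):
    async def with_timeout_cm():
        try:
            async with asyncio.timeout(1.0):
                await slow_operation()
        except TimeoutError:
            print("Operation timed out (context-manager version)!")
    asyncio.run(with_timeout_cm())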
# =============================================================================
# 10. ASYNCIO QUEUE
# =============================================================================
print("\n--- 10. Asyncio Queue ---\n")
async def async_producer(q, items):
    for item in items:
        await q.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    await q.put(None)  # Sentinel: signal end of stream
async def async_consumer(q):
    results = []
    while True:
        item = await q.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        results.append(item)
    return results
async def producer_consumer():
    q = asyncio.Queue()  # local name q avoids shadowing the queue module
    # Run producer and consumer concurrently
    producer_task = asyncio.create_task(
        async_producer(q, [1, 2, 3, 4, 5])
    )
    consumer_task = asyncio.create_task(async_consumer(q))
await producer_task
results = await consumer_task
return results
results = asyncio.run(producer_consumer())
print(f"Final results: {results}")
# =============================================================================
# 11. ASYNCIO SEMAPHORE
# =============================================================================
print("\n--- 11. Asyncio Semaphore (Rate Limiting) ---\n")
async def limited_task(semaphore, task_id):
async with semaphore:
print(f"Task {task_id} acquired semaphore")
await asyncio.sleep(0.5)
print(f"Task {task_id} releasing semaphore")
return task_id
async def rate_limited():
# Allow max 2 concurrent tasks
semaphore = asyncio.Semaphore(2)
tasks = [limited_task(semaphore, i) for i in range(5)]
results = await asyncio.gather(*tasks)
return results
start = time.time()
results = asyncio.run(rate_limited())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")
# =============================================================================
# 12. PRACTICAL EXAMPLE - PARALLEL DOWNLOADS
# =============================================================================
print("\n--- 12. Practical: Parallel Downloads ---\n")
async def download_file(url, delay):
"""Simulate file download"""
print(f"Starting download: {url}")
await asyncio.sleep(delay)
size = len(url) * 1000 # Fake size
print(f"Completed: {url} ({size} bytes)")
return {"url": url, "size": size}
async def download_all(urls):
# Create download tasks with varying delays
tasks = [download_file(url, 0.5 + i * 0.1)
for i, url in enumerate(urls)]
# Execute all concurrently
results = await asyncio.gather(*tasks)
# Calculate totals
total_size = sum(r["size"] for r in results)
return results, total_size
urls = [
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
]
start = time.time()
results, total = asyncio.run(download_all(urls))
print(f"\nTotal size: {total} bytes")
print(f"Time: {time.time() - start:.2f}s")
# =============================================================================
# 13. COMPARISON - SYNC VS ASYNC
# =============================================================================
print("\n--- 13. Comparison: Sync vs Async ---\n")
def sync_task(n):
time.sleep(0.2)
return n * 2
async def async_task_simple(n):
await asyncio.sleep(0.2)
return n * 2
# Synchronous
start = time.time()
sync_results = [sync_task(i) for i in range(5)]
sync_time = time.time() - start
print(f"Sync: {sync_results} in {sync_time:.2f}s")
# Asynchronous
async def run_async():
tasks = [async_task_simple(i) for i in range(5)]
return await asyncio.gather(*tasks)
start = time.time()
async_results = asyncio.run(run_async())
async_time = time.time() - start
print(f"Async: {async_results} in {async_time:.2f}s")
print(f"\nAsync is {sync_time/async_time:.1f}x faster!")
print("\n" + "=" * 60)
print("END OF EXAMPLES")
print("=" * 60)