python

examples

examples.py🐍
"""
14 - Concurrency: Examples
Run this file to see threading, multiprocessing, and asyncio in action!
"""

print("=" * 60)
print("CONCURRENCY - EXAMPLES")
print("=" * 60)

import threading
import time
import queue
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio

# =============================================================================
# 1. BASIC THREADING
# =============================================================================
print("\n--- 1. Basic Threading ---\n")

def worker(name, delay):
    print(f"Worker {name} starting...")
    time.sleep(delay)
    print(f"Worker {name} done!")

# Create and start threads
thread1 = threading.Thread(target=worker, args=("A", 1))
thread2 = threading.Thread(target=worker, args=("B", 0.5))

start = time.time()
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(f"Total time: {time.time() - start:.2f}s (concurrent!)")

# =============================================================================
# 2. THREAD WITH RESULTS
# =============================================================================
print("\n--- 2. Thread with Results ---\n")

results = {}
results_lock = threading.Lock()

def fetch_data(url, index):
    time.sleep(0.5)  # Simulate network delay
    with results_lock:
        results[index] = f"Data from {url}"

threads = []
urls = ["site1.com", "site2.com", "site3.com"]

for i, url in enumerate(urls):
    t = threading.Thread(target=fetch_data, args=(url, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Results: {results}")

# =============================================================================
# 3. THREADPOOLEXECUTOR
# =============================================================================
print("\n--- 3. ThreadPoolExecutor ---\n")

def process_item(item):
    time.sleep(0.3)
    return f"Processed: {item}"

items = ["a", "b", "c", "d", "e"]

start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(process_item, items))
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")

# =============================================================================
# 4. THREAD SAFETY - LOCK
# =============================================================================
print("\n--- 4. Thread Safety with Lock ---\n")

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10000,)) 
           for _ in range(5)]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter (with lock): {counter}")

# =============================================================================
# 5. THREADING QUEUE
# =============================================================================
print("\n--- 5. Threading Queue ---\n")

def producer(q, items):
    for item in items:
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer(q, num_items):
    processed = []
    for _ in range(num_items):
        item = q.get()
        processed.append(f"Done: {item}")
        q.task_done()
    return processed

q = queue.Queue()
items = [1, 2, 3, 4, 5]

prod_thread = threading.Thread(target=producer, args=(q, items))
prod_thread.start()

# Give producer a head start
time.sleep(0.2)

# Process in main thread
results = []
for _ in range(len(items)):
    item = q.get()
    results.append(item)

print(f"Consumed: {results}")

# =============================================================================
# 6. BASIC MULTIPROCESSING (Sequential Demo)
# =============================================================================
print("\n--- 6. Multiprocessing Concepts ---\n")

# Note: Multiprocessing works best when run as main script
# Here we demonstrate the concept

import math

def cpu_intensive(n):
    """CPU-bound task"""
    return sum(i ** 2 for i in range(n))

# Sequential
start = time.time()
results = [cpu_intensive(100000) for _ in range(4)]
seq_time = time.time() - start
print(f"Sequential time for 4 tasks: {seq_time:.2f}s")

# With ThreadPoolExecutor (limited by GIL for CPU tasks)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_intensive, [100000] * 4))
thread_time = time.time() - start
print(f"ThreadPool time: {thread_time:.2f}s")

print("\nNote: For true CPU parallelism, use ProcessPoolExecutor")
print("(Run separately to avoid multiprocessing issues in demos)")

# =============================================================================
# 7. ASYNCIO BASICS
# =============================================================================
print("\n--- 7. Asyncio Basics ---\n")

async def async_task(name, delay):
    print(f"Task {name} starting...")
    await asyncio.sleep(delay)
    print(f"Task {name} done!")
    return f"Result from {name}"

async def async_main():
    # Run tasks concurrently
    results = await asyncio.gather(
        async_task("A", 0.5),
        async_task("B", 0.3),
        async_task("C", 0.4),
    )
    return results

start = time.time()
results = asyncio.run(async_main())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s (concurrent!)")

# =============================================================================
# 8. ASYNCIO WITH CREATE_TASK
# =============================================================================
print("\n--- 8. Asyncio create_task ---\n")

async def fetch_url(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(0.5)
    return f"Content from {url}"

async def main_with_tasks():
    # Create tasks - they start immediately
    task1 = asyncio.create_task(fetch_url("url1"))
    task2 = asyncio.create_task(fetch_url("url2"))
    task3 = asyncio.create_task(fetch_url("url3"))
    
    # Do other work while tasks run
    print("Tasks are running in background...")
    
    # Wait for results
    results = await asyncio.gather(task1, task2, task3)
    return results

results = asyncio.run(main_with_tasks())
print(f"Results: {results}")

# =============================================================================
# 9. ASYNCIO TIMEOUT
# =============================================================================
print("\n--- 9. Asyncio Timeout ---\n")

async def slow_operation():
    await asyncio.sleep(5)
    return "Completed"

async def with_timeout():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=1.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(with_timeout())

# =============================================================================
# 10. ASYNCIO QUEUE
# =============================================================================
print("\n--- 10. Asyncio Queue ---\n")

async def async_producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal end

async def async_consumer(queue):
    results = []
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        results.append(item)
    return results

async def producer_consumer():
    queue = asyncio.Queue()
    
    # Run producer and consumer concurrently
    producer_task = asyncio.create_task(
        async_producer(queue, [1, 2, 3, 4, 5])
    )
    consumer_task = asyncio.create_task(async_consumer(queue))
    
    await producer_task
    results = await consumer_task
    return results

results = asyncio.run(producer_consumer())
print(f"Final results: {results}")

# =============================================================================
# 11. ASYNCIO SEMAPHORE
# =============================================================================
print("\n--- 11. Asyncio Semaphore (Rate Limiting) ---\n")

async def limited_task(semaphore, task_id):
    async with semaphore:
        print(f"Task {task_id} acquired semaphore")
        await asyncio.sleep(0.5)
        print(f"Task {task_id} releasing semaphore")
        return task_id

async def rate_limited():
    # Allow max 2 concurrent tasks
    semaphore = asyncio.Semaphore(2)
    
    tasks = [limited_task(semaphore, i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    return results

start = time.time()
results = asyncio.run(rate_limited())
print(f"Results: {results}")
print(f"Time: {time.time() - start:.2f}s")

# =============================================================================
# 12. PRACTICAL EXAMPLE - PARALLEL DOWNLOADS
# =============================================================================
print("\n--- 12. Practical: Parallel Downloads ---\n")

async def download_file(url, delay):
    """Simulate file download"""
    print(f"Starting download: {url}")
    await asyncio.sleep(delay)
    size = len(url) * 1000  # Fake size
    print(f"Completed: {url} ({size} bytes)")
    return {"url": url, "size": size}

async def download_all(urls):
    # Create download tasks with varying delays
    tasks = [download_file(url, 0.5 + i * 0.1) 
             for i, url in enumerate(urls)]
    
    # Execute all concurrently
    results = await asyncio.gather(*tasks)
    
    # Calculate totals
    total_size = sum(r["size"] for r in results)
    return results, total_size

urls = [
    "https://example.com/file1.zip",
    "https://example.com/file2.zip",
    "https://example.com/file3.zip",
]

start = time.time()
results, total = asyncio.run(download_all(urls))
print(f"\nTotal size: {total} bytes")
print(f"Time: {time.time() - start:.2f}s")

# =============================================================================
# 13. COMPARISON - SYNC VS ASYNC
# =============================================================================
print("\n--- 13. Comparison: Sync vs Async ---\n")

def sync_task(n):
    time.sleep(0.2)
    return n * 2

async def async_task_simple(n):
    await asyncio.sleep(0.2)
    return n * 2

# Synchronous
start = time.time()
sync_results = [sync_task(i) for i in range(5)]
sync_time = time.time() - start
print(f"Sync: {sync_results} in {sync_time:.2f}s")

# Asynchronous
async def run_async():
    tasks = [async_task_simple(i) for i in range(5)]
    return await asyncio.gather(*tasks)

start = time.time()
async_results = asyncio.run(run_async())
async_time = time.time() - start
print(f"Async: {async_results} in {async_time:.2f}s")

print(f"\nAsync is {sync_time/async_time:.1f}x faster!")

print("\n" + "=" * 60)
print("END OF EXAMPLES")
print("=" * 60)
Examples - Python Tutorial | DeepML