python
examples
examples.py🐍python
"""
Real World Projects - Examples
Learn to build complete, production-ready applications.
"""
from typing import Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import json
import hashlib
import uuid
from enum import Enum, auto
from collections import defaultdict
# =============================================================================
# PROJECT 1: TASK MANAGEMENT SYSTEM
# =============================================================================
class TaskStatus(Enum):
"""Task status enumeration."""
PENDING = auto()
IN_PROGRESS = auto()
COMPLETED = auto()
CANCELLED = auto()
class Priority(Enum):
"""Task priority levels."""
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class Task:
"""Represents a task in the system."""
title: str
description: str = ""
priority: Priority = Priority.MEDIUM
status: TaskStatus = TaskStatus.PENDING
assigned_to: Optional[str] = None
due_date: Optional[datetime] = None
tags: list[str] = field(default_factory=list)
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
created_at: datetime = field(default_factory=datetime.now)
updated_at: Optional[datetime] = None
def to_dict(self) -> dict:
"""Convert task to dictionary."""
return {
"id": self.id,
"title": self.title,
"description": self.description,
"priority": self.priority.name,
"status": self.status.name,
"assigned_to": self.assigned_to,
"due_date": self.due_date.isoformat() if self.due_date else None,
"tags": self.tags,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
class TaskRepository(ABC):
"""Abstract repository for task storage."""
@abstractmethod
def save(self, task: Task) -> None:
pass
@abstractmethod
def get(self, task_id: str) -> Optional[Task]:
pass
@abstractmethod
def get_all(self) -> list[Task]:
pass
@abstractmethod
def delete(self, task_id: str) -> bool:
pass
class InMemoryTaskRepository(TaskRepository):
"""In-memory implementation of task repository."""
def __init__(self):
self._tasks: dict[str, Task] = {}
def save(self, task: Task) -> None:
task.updated_at = datetime.now()
self._tasks[task.id] = task
def get(self, task_id: str) -> Optional[Task]:
return self._tasks.get(task_id)
def get_all(self) -> list[Task]:
return list(self._tasks.values())
def delete(self, task_id: str) -> bool:
if task_id in self._tasks:
del self._tasks[task_id]
return True
return False
class TaskService:
"""Business logic for task management."""
def __init__(self, repository: TaskRepository):
self.repo = repository
def create_task(self, title: str, **kwargs) -> Task:
"""Create a new task."""
task = Task(title=title, **kwargs)
self.repo.save(task)
return task
def update_status(self, task_id: str, status: TaskStatus) -> Optional[Task]:
"""Update task status."""
task = self.repo.get(task_id)
if task:
task.status = status
self.repo.save(task)
return task
def assign_task(self, task_id: str, user: str) -> Optional[Task]:
"""Assign task to a user."""
task = self.repo.get(task_id)
if task:
task.assigned_to = user
self.repo.save(task)
return task
def get_tasks_by_status(self, status: TaskStatus) -> list[Task]:
"""Get all tasks with a specific status."""
return [t for t in self.repo.get_all() if t.status == status]
def get_tasks_by_priority(self, priority: Priority) -> list[Task]:
"""Get all tasks with a specific priority."""
return [t for t in self.repo.get_all() if t.priority == priority]
def get_overdue_tasks(self) -> list[Task]:
"""Get all overdue tasks."""
now = datetime.now()
return [
t for t in self.repo.get_all()
if t.due_date and t.due_date < now and t.status != TaskStatus.COMPLETED
]
# =============================================================================
# PROJECT 2: EVENT SYSTEM
# =============================================================================
@dataclass
class Event:
"""Represents an event in the system."""
name: str
data: dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
source: str = ""
class EventHandler(ABC):
"""Abstract event handler."""
@abstractmethod
def handle(self, event: Event) -> None:
pass
@abstractmethod
def can_handle(self, event: Event) -> bool:
pass
class EventBus:
"""Central event bus for publishing and subscribing to events."""
def __init__(self):
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
self._middleware: list = []
self._event_history: list[Event] = []
def subscribe(self, event_name: str, handler: EventHandler) -> None:
"""Subscribe a handler to an event type."""
self._handlers[event_name].append(handler)
def unsubscribe(self, event_name: str, handler: EventHandler) -> None:
"""Unsubscribe a handler from an event type."""
if handler in self._handlers[event_name]:
self._handlers[event_name].remove(handler)
def add_middleware(self, middleware) -> None:
"""Add middleware for event processing."""
self._middleware.append(middleware)
def publish(self, event: Event) -> None:
"""Publish an event to all subscribers."""
# Run middleware
for mw in self._middleware:
event = mw(event)
if event is None:
return # Middleware blocked the event
# Record history
self._event_history.append(event)
# Dispatch to handlers
for handler in self._handlers.get(event.name, []):
if handler.can_handle(event):
try:
handler.handle(event)
except Exception as e:
print(f"Handler error: {e}")
def get_history(self, event_name: Optional[str] = None) -> list[Event]:
"""Get event history, optionally filtered by name."""
if event_name:
return [e for e in self._event_history if e.name == event_name]
return self._event_history.copy()
class LoggingHandler(EventHandler):
"""Handler that logs all events."""
def __init__(self):
self.logs: list[str] = []
def can_handle(self, event: Event) -> bool:
return True
def handle(self, event: Event) -> None:
log_entry = f"[{event.timestamp}] {event.name}: {event.data}"
self.logs.append(log_entry)
print(log_entry)
# =============================================================================
# PROJECT 3: CACHING SYSTEM
# =============================================================================
class CacheEntry:
"""Represents a cache entry with expiration."""
def __init__(self, value: Any, ttl_seconds: int = 300):
self.value = value
self.created_at = datetime.now()
self.ttl_seconds = ttl_seconds
self.access_count = 0
def is_expired(self) -> bool:
"""Check if entry has expired."""
elapsed = (datetime.now() - self.created_at).total_seconds()
return elapsed > self.ttl_seconds
def access(self) -> Any:
"""Access the cached value."""
self.access_count += 1
return self.value
class CacheStrategy(ABC):
"""Abstract cache eviction strategy."""
@abstractmethod
def evict(self, cache: dict[str, CacheEntry]) -> str:
"""Return key to evict."""
pass
class LRUStrategy(CacheStrategy):
"""Least Recently Used eviction strategy."""
def __init__(self):
self.access_order: list[str] = []
def record_access(self, key: str) -> None:
"""Record key access."""
if key in self.access_order:
self.access_order.remove(key)
self.access_order.append(key)
def evict(self, cache: dict[str, CacheEntry]) -> str:
"""Evict least recently used key."""
for key in self.access_order:
if key in cache:
self.access_order.remove(key)
return key
return next(iter(cache.keys())) if cache else ""
class Cache:
"""Configurable caching system."""
def __init__(self, max_size: int = 100, default_ttl: int = 300,
strategy: Optional[CacheStrategy] = None):
self.max_size = max_size
self.default_ttl = default_ttl
self.strategy = strategy or LRUStrategy()
self._cache: dict[str, CacheEntry] = {}
self._hits = 0
self._misses = 0
def get(self, key: str) -> Optional[Any]:
"""Get value from cache."""
entry = self._cache.get(key)
if entry is None:
self._misses += 1
return None
if entry.is_expired():
del self._cache[key]
self._misses += 1
return None
self._hits += 1
if isinstance(self.strategy, LRUStrategy):
self.strategy.record_access(key)
return entry.access()
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Set value in cache."""
if len(self._cache) >= self.max_size:
evict_key = self.strategy.evict(self._cache)
if evict_key:
del self._cache[evict_key]
self._cache[key] = CacheEntry(value, ttl or self.default_ttl)
if isinstance(self.strategy, LRUStrategy):
self.strategy.record_access(key)
def delete(self, key: str) -> bool:
"""Delete key from cache."""
if key in self._cache:
del self._cache[key]
return True
return False
def clear(self) -> None:
"""Clear all cache entries."""
self._cache.clear()
@property
def hit_rate(self) -> float:
"""Calculate cache hit rate."""
total = self._hits + self._misses
return self._hits / total if total > 0 else 0
def stats(self) -> dict:
"""Get cache statistics."""
return {
"size": len(self._cache),
"max_size": self.max_size,
"hits": self._hits,
"misses": self._misses,
"hit_rate": self.hit_rate
}
def cached(ttl: int = 300):
"""Decorator to cache function results."""
cache = Cache(default_ttl=ttl)
def decorator(func):
def wrapper(*args, **kwargs):
# Create cache key from arguments
key = hashlib.md5(
f"{func.__name__}:{args}:{kwargs}".encode()
).hexdigest()
result = cache.get(key)
if result is not None:
return result
result = func(*args, **kwargs)
cache.set(key, result)
return result
wrapper.cache = cache
return wrapper
return decorator
# =============================================================================
# PROJECT 4: DATA PIPELINE
# =============================================================================
class PipelineStep(ABC):
"""Abstract pipeline step."""
@property
@abstractmethod
def name(self) -> str:
pass
@abstractmethod
def process(self, data: Any) -> Any:
pass
class TransformStep(PipelineStep):
"""Transformation step in pipeline."""
def __init__(self, name: str, transform_func):
self._name = name
self._transform = transform_func
@property
def name(self) -> str:
return self._name
def process(self, data: Any) -> Any:
return self._transform(data)
class FilterStep(PipelineStep):
"""Filter step in pipeline."""
def __init__(self, name: str, predicate):
self._name = name
self._predicate = predicate
@property
def name(self) -> str:
return self._name
def process(self, data: list) -> list:
return [item for item in data if self._predicate(item)]
class Pipeline:
"""Data processing pipeline."""
def __init__(self, name: str = "Pipeline"):
self.name = name
self.steps: list[PipelineStep] = []
self.error_handlers: list = []
self._metrics: dict = {}
def add_step(self, step: PipelineStep) -> 'Pipeline':
"""Add a processing step."""
self.steps.append(step)
return self
def add_error_handler(self, handler) -> 'Pipeline':
"""Add error handler."""
self.error_handlers.append(handler)
return self
def execute(self, data: Any) -> Any:
"""Execute the pipeline."""
result = data
for step in self.steps:
step_name = step.name
start_time = datetime.now()
try:
result = step.process(result)
# Record metrics
elapsed = (datetime.now() - start_time).total_seconds()
self._metrics[step_name] = {
"duration": elapsed,
"success": True
}
except Exception as e:
self._metrics[step_name] = {
"error": str(e),
"success": False
}
# Try error handlers
handled = False
for handler in self.error_handlers:
try:
result = handler(step, data, e)
handled = True
break
except Exception:
continue
if not handled:
raise
return result
def get_metrics(self) -> dict:
"""Get pipeline execution metrics."""
return self._metrics.copy()
# =============================================================================
# PROJECT 5: CONFIGURATION MANAGER
# =============================================================================
class ConfigSource(ABC):
"""Abstract configuration source."""
@abstractmethod
def load(self) -> dict:
pass
@abstractmethod
def priority(self) -> int:
pass
class DictConfigSource(ConfigSource):
"""Dictionary-based config source."""
def __init__(self, config: dict, priority: int = 0):
self._config = config
self._priority = priority
def load(self) -> dict:
return self._config.copy()
def priority(self) -> int:
return self._priority
class ConfigManager:
"""Centralized configuration management."""
def __init__(self):
self._sources: list[ConfigSource] = []
self._config: dict = {}
self._validators: dict[str, list] = {}
def add_source(self, source: ConfigSource) -> None:
"""Add a configuration source."""
self._sources.append(source)
self._sources.sort(key=lambda s: s.priority(), reverse=True)
self._reload()
def _reload(self) -> None:
"""Reload configuration from all sources."""
self._config = {}
for source in reversed(self._sources):
self._config.update(source.load())
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value."""
parts = key.split('.')
value = self._config
for part in parts:
if isinstance(value, dict) and part in value:
value = value[part]
else:
return default
return value
def set(self, key: str, value: Any) -> None:
"""Set configuration value (in memory only)."""
parts = key.split('.')
config = self._config
for part in parts[:-1]:
if part not in config:
config[part] = {}
config = config[part]
config[parts[-1]] = value
def add_validator(self, key: str, validator) -> None:
"""Add a validator for a config key."""
if key not in self._validators:
self._validators[key] = []
self._validators[key].append(validator)
def validate(self) -> list[str]:
"""Validate configuration. Returns list of errors."""
errors = []
for key, validators in self._validators.items():
value = self.get(key)
for validator in validators:
try:
if not validator(value):
errors.append(f"Validation failed for {key}: {value}")
except Exception as e:
errors.append(f"Validator error for {key}: {e}")
return errors
def as_dict(self) -> dict:
"""Get all configuration as dictionary."""
return self._config.copy()
# =============================================================================
# PROJECT 6: SIMPLE STATE MACHINE
# =============================================================================
class State:
"""Represents a state in the state machine."""
def __init__(self, name: str):
self.name = name
self._on_enter: list = []
self._on_exit: list = []
def on_enter(self, callback) -> 'State':
"""Add enter callback."""
self._on_enter.append(callback)
return self
def on_exit(self, callback) -> 'State':
"""Add exit callback."""
self._on_exit.append(callback)
return self
def enter(self, context: dict) -> None:
"""Execute enter callbacks."""
for callback in self._on_enter:
callback(context)
def exit(self, context: dict) -> None:
"""Execute exit callbacks."""
for callback in self._on_exit:
callback(context)
class Transition:
"""Represents a transition between states."""
def __init__(self, from_state: str, to_state: str, event: str):
self.from_state = from_state
self.to_state = to_state
self.event = event
self._guards: list = []
self._actions: list = []
def add_guard(self, guard) -> 'Transition':
"""Add a guard condition."""
self._guards.append(guard)
return self
def add_action(self, action) -> 'Transition':
"""Add a transition action."""
self._actions.append(action)
return self
def can_transition(self, context: dict) -> bool:
"""Check if transition is allowed."""
return all(guard(context) for guard in self._guards)
def execute(self, context: dict) -> None:
"""Execute transition actions."""
for action in self._actions:
action(context)
class StateMachine:
"""Simple state machine implementation."""
def __init__(self, initial_state: str):
self._states: dict[str, State] = {}
self._transitions: list[Transition] = []
self._current_state = initial_state
self._context: dict = {}
self._history: list[str] = [initial_state]
def add_state(self, state: State) -> 'StateMachine':
"""Add a state."""
self._states[state.name] = state
return self
def add_transition(self, transition: Transition) -> 'StateMachine':
"""Add a transition."""
self._transitions.append(transition)
return self
@property
def current_state(self) -> str:
return self._current_state
@property
def context(self) -> dict:
return self._context
def trigger(self, event: str) -> bool:
"""Trigger an event."""
for transition in self._transitions:
if (transition.from_state == self._current_state and
transition.event == event and
transition.can_transition(self._context)):
# Exit current state
if self._current_state in self._states:
self._states[self._current_state].exit(self._context)
# Execute transition
transition.execute(self._context)
# Enter new state
self._current_state = transition.to_state
self._history.append(self._current_state)
if self._current_state in self._states:
self._states[self._current_state].enter(self._context)
return True
return False
def get_history(self) -> list[str]:
"""Get state transition history."""
return self._history.copy()
# =============================================================================
# DEMONSTRATION
# =============================================================================
if __name__ == "__main__":
print("Real World Projects Examples")
print("="*60)
# 1. Task Management
print("\n1. Task Management System:")
repo = InMemoryTaskRepository()
service = TaskService(repo)
task1 = service.create_task(
"Implement feature",
priority=Priority.HIGH,
tags=["backend", "urgent"]
)
task2 = service.create_task(
"Write tests",
priority=Priority.MEDIUM
)
service.update_status(task1.id, TaskStatus.IN_PROGRESS)
service.assign_task(task1.id, "developer@example.com")
print(f"Created tasks: {[t.title for t in repo.get_all()]}")
print(f"High priority: {[t.title for t in service.get_tasks_by_priority(Priority.HIGH)]}")
# 2. Event System
print("\n2. Event System:")
bus = EventBus()
logger = LoggingHandler()
bus.subscribe("user.created", logger)
bus.subscribe("user.updated", logger)
bus.publish(Event("user.created", {"user_id": 123, "email": "test@example.com"}))
bus.publish(Event("user.updated", {"user_id": 123, "field": "name"}))
print(f"Events processed: {len(logger.logs)}")
# 3. Cache System
print("\n3. Cache System:")
cache = Cache(max_size=100, default_ttl=300)
cache.set("user:123", {"name": "John", "email": "john@example.com"})
user = cache.get("user:123")
print(f"Cached user: {user}")
print(f"Cache stats: {cache.stats()}")
# 4. Data Pipeline
print("\n4. Data Pipeline:")
pipeline = Pipeline("data-processor")
pipeline.add_step(TransformStep("uppercase", lambda data: [x.upper() for x in data]))
pipeline.add_step(FilterStep("filter_short", lambda x: len(x) > 3))
pipeline.add_step(TransformStep("sort", lambda data: sorted(data)))
result = pipeline.execute(["hello", "world", "hi", "python"])
print(f"Pipeline result: {result}")
print(f"Pipeline metrics: {pipeline.get_metrics()}")
# 5. Configuration
print("\n5. Configuration Manager:")
config = ConfigManager()
config.add_source(DictConfigSource({
"database": {"host": "localhost", "port": 5432},
"debug": True
}))
print(f"Database host: {config.get('database.host')}")
print(f"Debug mode: {config.get('debug')}")
# 6. State Machine
print("\n6. State Machine:")
sm = StateMachine("draft")
sm.add_state(State("draft").on_enter(lambda ctx: print(" Entered draft")))
sm.add_state(State("review").on_enter(lambda ctx: print(" Entered review")))
sm.add_state(State("published").on_enter(lambda ctx: print(" Entered published")))
sm.add_transition(Transition("draft", "review", "submit"))
sm.add_transition(Transition("review", "published", "approve"))
sm.add_transition(Transition("review", "draft", "reject"))
print(f"Initial state: {sm.current_state}")
sm.trigger("submit")
print(f"After submit: {sm.current_state}")
sm.trigger("approve")
print(f"After approve: {sm.current_state}")
print(f"History: {sm.get_history()}")