python

examples

examples.py🐍
"""
Real World Projects - Examples

Learn to build complete, production-ready applications.
"""

from typing import Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import json
import hashlib
import uuid
from enum import Enum, auto
from collections import defaultdict

# =============================================================================
# PROJECT 1: TASK MANAGEMENT SYSTEM
# =============================================================================

class TaskStatus(Enum):
    """Task status enumeration."""
    PENDING = auto()
    IN_PROGRESS = auto()
    COMPLETED = auto()
    CANCELLED = auto()


class Priority(Enum):
    """Task priority levels."""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Task:
    """Represents a task in the system."""
    title: str
    description: str = ""
    priority: Priority = Priority.MEDIUM
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None
    due_date: Optional[datetime] = None
    tags: list[str] = field(default_factory=list)
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: Optional[datetime] = None
    
    def to_dict(self) -> dict:
        """Convert task to dictionary."""
        return {
            "id": self.id,
            "title": self.title,
            "description": self.description,
            "priority": self.priority.name,
            "status": self.status.name,
            "assigned_to": self.assigned_to,
            "due_date": self.due_date.isoformat() if self.due_date else None,
            "tags": self.tags,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat() if self.updated_at else None
        }


class TaskRepository(ABC):
    """Abstract repository for task storage."""
    
    @abstractmethod
    def save(self, task: Task) -> None:
        pass
    
    @abstractmethod
    def get(self, task_id: str) -> Optional[Task]:
        pass
    
    @abstractmethod
    def get_all(self) -> list[Task]:
        pass
    
    @abstractmethod
    def delete(self, task_id: str) -> bool:
        pass


class InMemoryTaskRepository(TaskRepository):
    """In-memory implementation of task repository."""
    
    def __init__(self):
        self._tasks: dict[str, Task] = {}
    
    def save(self, task: Task) -> None:
        task.updated_at = datetime.now()
        self._tasks[task.id] = task
    
    def get(self, task_id: str) -> Optional[Task]:
        return self._tasks.get(task_id)
    
    def get_all(self) -> list[Task]:
        return list(self._tasks.values())
    
    def delete(self, task_id: str) -> bool:
        if task_id in self._tasks:
            del self._tasks[task_id]
            return True
        return False


class TaskService:
    """Business logic for task management."""
    
    def __init__(self, repository: TaskRepository):
        self.repo = repository
    
    def create_task(self, title: str, **kwargs) -> Task:
        """Create a new task."""
        task = Task(title=title, **kwargs)
        self.repo.save(task)
        return task
    
    def update_status(self, task_id: str, status: TaskStatus) -> Optional[Task]:
        """Update task status."""
        task = self.repo.get(task_id)
        if task:
            task.status = status
            self.repo.save(task)
        return task
    
    def assign_task(self, task_id: str, user: str) -> Optional[Task]:
        """Assign task to a user."""
        task = self.repo.get(task_id)
        if task:
            task.assigned_to = user
            self.repo.save(task)
        return task
    
    def get_tasks_by_status(self, status: TaskStatus) -> list[Task]:
        """Get all tasks with a specific status."""
        return [t for t in self.repo.get_all() if t.status == status]
    
    def get_tasks_by_priority(self, priority: Priority) -> list[Task]:
        """Get all tasks with a specific priority."""
        return [t for t in self.repo.get_all() if t.priority == priority]
    
    def get_overdue_tasks(self) -> list[Task]:
        """Get all overdue tasks."""
        now = datetime.now()
        return [
            t for t in self.repo.get_all()
            if t.due_date and t.due_date < now and t.status != TaskStatus.COMPLETED
        ]


# =============================================================================
# PROJECT 2: EVENT SYSTEM
# =============================================================================

@dataclass
class Event:
    """Represents an event in the system."""
    name: str
    data: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = ""


class EventHandler(ABC):
    """Abstract event handler."""
    
    @abstractmethod
    def handle(self, event: Event) -> None:
        pass
    
    @abstractmethod
    def can_handle(self, event: Event) -> bool:
        pass


class EventBus:
    """Central event bus for publishing and subscribing to events."""
    
    def __init__(self):
        self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
        self._middleware: list = []
        self._event_history: list[Event] = []
    
    def subscribe(self, event_name: str, handler: EventHandler) -> None:
        """Subscribe a handler to an event type."""
        self._handlers[event_name].append(handler)
    
    def unsubscribe(self, event_name: str, handler: EventHandler) -> None:
        """Unsubscribe a handler from an event type."""
        if handler in self._handlers[event_name]:
            self._handlers[event_name].remove(handler)
    
    def add_middleware(self, middleware) -> None:
        """Add middleware for event processing."""
        self._middleware.append(middleware)
    
    def publish(self, event: Event) -> None:
        """Publish an event to all subscribers."""
        # Run middleware
        for mw in self._middleware:
            event = mw(event)
            if event is None:
                return  # Middleware blocked the event
        
        # Record history
        self._event_history.append(event)
        
        # Dispatch to handlers
        for handler in self._handlers.get(event.name, []):
            if handler.can_handle(event):
                try:
                    handler.handle(event)
                except Exception as e:
                    print(f"Handler error: {e}")
    
    def get_history(self, event_name: Optional[str] = None) -> list[Event]:
        """Get event history, optionally filtered by name."""
        if event_name:
            return [e for e in self._event_history if e.name == event_name]
        return self._event_history.copy()


class LoggingHandler(EventHandler):
    """Handler that logs all events."""
    
    def __init__(self):
        self.logs: list[str] = []
    
    def can_handle(self, event: Event) -> bool:
        return True
    
    def handle(self, event: Event) -> None:
        log_entry = f"[{event.timestamp}] {event.name}: {event.data}"
        self.logs.append(log_entry)
        print(log_entry)


# =============================================================================
# PROJECT 3: CACHING SYSTEM
# =============================================================================

class CacheEntry:
    """Represents a cache entry with expiration."""
    
    def __init__(self, value: Any, ttl_seconds: int = 300):
        self.value = value
        self.created_at = datetime.now()
        self.ttl_seconds = ttl_seconds
        self.access_count = 0
    
    def is_expired(self) -> bool:
        """Check if entry has expired."""
        elapsed = (datetime.now() - self.created_at).total_seconds()
        return elapsed > self.ttl_seconds
    
    def access(self) -> Any:
        """Access the cached value."""
        self.access_count += 1
        return self.value


class CacheStrategy(ABC):
    """Abstract cache eviction strategy."""
    
    @abstractmethod
    def evict(self, cache: dict[str, CacheEntry]) -> str:
        """Return key to evict."""
        pass


class LRUStrategy(CacheStrategy):
    """Least Recently Used eviction strategy."""
    
    def __init__(self):
        self.access_order: list[str] = []
    
    def record_access(self, key: str) -> None:
        """Record key access."""
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)
    
    def evict(self, cache: dict[str, CacheEntry]) -> str:
        """Evict least recently used key."""
        for key in self.access_order:
            if key in cache:
                self.access_order.remove(key)
                return key
        return next(iter(cache.keys())) if cache else ""


class Cache:
    """Configurable caching system."""
    
    def __init__(self, max_size: int = 100, default_ttl: int = 300,
                 strategy: Optional[CacheStrategy] = None):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.strategy = strategy or LRUStrategy()
        self._cache: dict[str, CacheEntry] = {}
        self._hits = 0
        self._misses = 0
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache."""
        entry = self._cache.get(key)
        
        if entry is None:
            self._misses += 1
            return None
        
        if entry.is_expired():
            del self._cache[key]
            self._misses += 1
            return None
        
        self._hits += 1
        if isinstance(self.strategy, LRUStrategy):
            self.strategy.record_access(key)
        
        return entry.access()
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache."""
        if len(self._cache) >= self.max_size:
            evict_key = self.strategy.evict(self._cache)
            if evict_key:
                del self._cache[evict_key]
        
        self._cache[key] = CacheEntry(value, ttl or self.default_ttl)
        if isinstance(self.strategy, LRUStrategy):
            self.strategy.record_access(key)
    
    def delete(self, key: str) -> bool:
        """Delete key from cache."""
        if key in self._cache:
            del self._cache[key]
            return True
        return False
    
    def clear(self) -> None:
        """Clear all cache entries."""
        self._cache.clear()
    
    @property
    def hit_rate(self) -> float:
        """Calculate cache hit rate."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0
    
    def stats(self) -> dict:
        """Get cache statistics."""
        return {
            "size": len(self._cache),
            "max_size": self.max_size,
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": self.hit_rate
        }


def cached(ttl: int = 300):
    """Decorator to cache function results."""
    cache = Cache(default_ttl=ttl)
    
    def decorator(func):
        def wrapper(*args, **kwargs):
            # Create cache key from arguments
            key = hashlib.md5(
                f"{func.__name__}:{args}:{kwargs}".encode()
            ).hexdigest()
            
            result = cache.get(key)
            if result is not None:
                return result
            
            result = func(*args, **kwargs)
            cache.set(key, result)
            return result
        
        wrapper.cache = cache
        return wrapper
    
    return decorator


# =============================================================================
# PROJECT 4: DATA PIPELINE
# =============================================================================

class PipelineStep(ABC):
    """Abstract pipeline step."""
    
    @property
    @abstractmethod
    def name(self) -> str:
        pass
    
    @abstractmethod
    def process(self, data: Any) -> Any:
        pass


class TransformStep(PipelineStep):
    """Transformation step in pipeline."""
    
    def __init__(self, name: str, transform_func):
        self._name = name
        self._transform = transform_func
    
    @property
    def name(self) -> str:
        return self._name
    
    def process(self, data: Any) -> Any:
        return self._transform(data)


class FilterStep(PipelineStep):
    """Filter step in pipeline."""
    
    def __init__(self, name: str, predicate):
        self._name = name
        self._predicate = predicate
    
    @property
    def name(self) -> str:
        return self._name
    
    def process(self, data: list) -> list:
        return [item for item in data if self._predicate(item)]


class Pipeline:
    """Data processing pipeline."""
    
    def __init__(self, name: str = "Pipeline"):
        self.name = name
        self.steps: list[PipelineStep] = []
        self.error_handlers: list = []
        self._metrics: dict = {}
    
    def add_step(self, step: PipelineStep) -> 'Pipeline':
        """Add a processing step."""
        self.steps.append(step)
        return self
    
    def add_error_handler(self, handler) -> 'Pipeline':
        """Add error handler."""
        self.error_handlers.append(handler)
        return self
    
    def execute(self, data: Any) -> Any:
        """Execute the pipeline."""
        result = data
        
        for step in self.steps:
            step_name = step.name
            start_time = datetime.now()
            
            try:
                result = step.process(result)
                
                # Record metrics
                elapsed = (datetime.now() - start_time).total_seconds()
                self._metrics[step_name] = {
                    "duration": elapsed,
                    "success": True
                }
            except Exception as e:
                self._metrics[step_name] = {
                    "error": str(e),
                    "success": False
                }
                
                # Try error handlers
                handled = False
                for handler in self.error_handlers:
                    try:
                        result = handler(step, data, e)
                        handled = True
                        break
                    except Exception:
                        continue
                
                if not handled:
                    raise
        
        return result
    
    def get_metrics(self) -> dict:
        """Get pipeline execution metrics."""
        return self._metrics.copy()


# =============================================================================
# PROJECT 5: CONFIGURATION MANAGER
# =============================================================================

class ConfigSource(ABC):
    """Abstract configuration source."""
    
    @abstractmethod
    def load(self) -> dict:
        pass
    
    @abstractmethod
    def priority(self) -> int:
        pass


class DictConfigSource(ConfigSource):
    """Dictionary-based config source."""
    
    def __init__(self, config: dict, priority: int = 0):
        self._config = config
        self._priority = priority
    
    def load(self) -> dict:
        return self._config.copy()
    
    def priority(self) -> int:
        return self._priority


class ConfigManager:
    """Centralized configuration management."""
    
    def __init__(self):
        self._sources: list[ConfigSource] = []
        self._config: dict = {}
        self._validators: dict[str, list] = {}
    
    def add_source(self, source: ConfigSource) -> None:
        """Add a configuration source."""
        self._sources.append(source)
        self._sources.sort(key=lambda s: s.priority(), reverse=True)
        self._reload()
    
    def _reload(self) -> None:
        """Reload configuration from all sources."""
        self._config = {}
        for source in reversed(self._sources):
            self._config.update(source.load())
    
    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value."""
        parts = key.split('.')
        value = self._config
        
        for part in parts:
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                return default
        
        return value
    
    def set(self, key: str, value: Any) -> None:
        """Set configuration value (in memory only)."""
        parts = key.split('.')
        config = self._config
        
        for part in parts[:-1]:
            if part not in config:
                config[part] = {}
            config = config[part]
        
        config[parts[-1]] = value
    
    def add_validator(self, key: str, validator) -> None:
        """Add a validator for a config key."""
        if key not in self._validators:
            self._validators[key] = []
        self._validators[key].append(validator)
    
    def validate(self) -> list[str]:
        """Validate configuration. Returns list of errors."""
        errors = []
        
        for key, validators in self._validators.items():
            value = self.get(key)
            for validator in validators:
                try:
                    if not validator(value):
                        errors.append(f"Validation failed for {key}: {value}")
                except Exception as e:
                    errors.append(f"Validator error for {key}: {e}")
        
        return errors
    
    def as_dict(self) -> dict:
        """Get all configuration as dictionary."""
        return self._config.copy()


# =============================================================================
# PROJECT 6: SIMPLE STATE MACHINE
# =============================================================================

class State:
    """Represents a state in the state machine."""
    
    def __init__(self, name: str):
        self.name = name
        self._on_enter: list = []
        self._on_exit: list = []
    
    def on_enter(self, callback) -> 'State':
        """Add enter callback."""
        self._on_enter.append(callback)
        return self
    
    def on_exit(self, callback) -> 'State':
        """Add exit callback."""
        self._on_exit.append(callback)
        return self
    
    def enter(self, context: dict) -> None:
        """Execute enter callbacks."""
        for callback in self._on_enter:
            callback(context)
    
    def exit(self, context: dict) -> None:
        """Execute exit callbacks."""
        for callback in self._on_exit:
            callback(context)


class Transition:
    """Represents a transition between states."""
    
    def __init__(self, from_state: str, to_state: str, event: str):
        self.from_state = from_state
        self.to_state = to_state
        self.event = event
        self._guards: list = []
        self._actions: list = []
    
    def add_guard(self, guard) -> 'Transition':
        """Add a guard condition."""
        self._guards.append(guard)
        return self
    
    def add_action(self, action) -> 'Transition':
        """Add a transition action."""
        self._actions.append(action)
        return self
    
    def can_transition(self, context: dict) -> bool:
        """Check if transition is allowed."""
        return all(guard(context) for guard in self._guards)
    
    def execute(self, context: dict) -> None:
        """Execute transition actions."""
        for action in self._actions:
            action(context)


class StateMachine:
    """Simple state machine implementation."""
    
    def __init__(self, initial_state: str):
        self._states: dict[str, State] = {}
        self._transitions: list[Transition] = []
        self._current_state = initial_state
        self._context: dict = {}
        self._history: list[str] = [initial_state]
    
    def add_state(self, state: State) -> 'StateMachine':
        """Add a state."""
        self._states[state.name] = state
        return self
    
    def add_transition(self, transition: Transition) -> 'StateMachine':
        """Add a transition."""
        self._transitions.append(transition)
        return self
    
    @property
    def current_state(self) -> str:
        return self._current_state
    
    @property
    def context(self) -> dict:
        return self._context
    
    def trigger(self, event: str) -> bool:
        """Trigger an event."""
        for transition in self._transitions:
            if (transition.from_state == self._current_state and
                transition.event == event and
                transition.can_transition(self._context)):
                
                # Exit current state
                if self._current_state in self._states:
                    self._states[self._current_state].exit(self._context)
                
                # Execute transition
                transition.execute(self._context)
                
                # Enter new state
                self._current_state = transition.to_state
                self._history.append(self._current_state)
                
                if self._current_state in self._states:
                    self._states[self._current_state].enter(self._context)
                
                return True
        
        return False
    
    def get_history(self) -> list[str]:
        """Get state transition history."""
        return self._history.copy()


# =============================================================================
# DEMONSTRATION
# =============================================================================

if __name__ == "__main__":
    print("Real World Projects Examples")
    print("="*60)
    
    # 1. Task Management
    print("\n1. Task Management System:")
    repo = InMemoryTaskRepository()
    service = TaskService(repo)
    
    task1 = service.create_task(
        "Implement feature",
        priority=Priority.HIGH,
        tags=["backend", "urgent"]
    )
    task2 = service.create_task(
        "Write tests",
        priority=Priority.MEDIUM
    )
    
    service.update_status(task1.id, TaskStatus.IN_PROGRESS)
    service.assign_task(task1.id, "developer@example.com")
    
    print(f"Created tasks: {[t.title for t in repo.get_all()]}")
    print(f"High priority: {[t.title for t in service.get_tasks_by_priority(Priority.HIGH)]}")
    
    # 2. Event System
    print("\n2. Event System:")
    bus = EventBus()
    logger = LoggingHandler()
    bus.subscribe("user.created", logger)
    bus.subscribe("user.updated", logger)
    
    bus.publish(Event("user.created", {"user_id": 123, "email": "test@example.com"}))
    bus.publish(Event("user.updated", {"user_id": 123, "field": "name"}))
    
    print(f"Events processed: {len(logger.logs)}")
    
    # 3. Cache System
    print("\n3. Cache System:")
    cache = Cache(max_size=100, default_ttl=300)
    
    cache.set("user:123", {"name": "John", "email": "john@example.com"})
    user = cache.get("user:123")
    print(f"Cached user: {user}")
    print(f"Cache stats: {cache.stats()}")
    
    # 4. Data Pipeline
    print("\n4. Data Pipeline:")
    pipeline = Pipeline("data-processor")
    pipeline.add_step(TransformStep("uppercase", lambda data: [x.upper() for x in data]))
    pipeline.add_step(FilterStep("filter_short", lambda x: len(x) > 3))
    pipeline.add_step(TransformStep("sort", lambda data: sorted(data)))
    
    result = pipeline.execute(["hello", "world", "hi", "python"])
    print(f"Pipeline result: {result}")
    print(f"Pipeline metrics: {pipeline.get_metrics()}")
    
    # 5. Configuration
    print("\n5. Configuration Manager:")
    config = ConfigManager()
    config.add_source(DictConfigSource({
        "database": {"host": "localhost", "port": 5432},
        "debug": True
    }))
    
    print(f"Database host: {config.get('database.host')}")
    print(f"Debug mode: {config.get('debug')}")
    
    # 6. State Machine
    print("\n6. State Machine:")
    sm = StateMachine("draft")
    
    sm.add_state(State("draft").on_enter(lambda ctx: print("  Entered draft")))
    sm.add_state(State("review").on_enter(lambda ctx: print("  Entered review")))
    sm.add_state(State("published").on_enter(lambda ctx: print("  Entered published")))
    
    sm.add_transition(Transition("draft", "review", "submit"))
    sm.add_transition(Transition("review", "published", "approve"))
    sm.add_transition(Transition("review", "draft", "reject"))
    
    print(f"Initial state: {sm.current_state}")
    sm.trigger("submit")
    print(f"After submit: {sm.current_state}")
    sm.trigger("approve")
    print(f"After approve: {sm.current_state}")
    print(f"History: {sm.get_history()}")
Examples - Python Tutorial | DeepML