# exercises.py
"""
Python Automation and Scripting - Exercises
Practice file management, system automation, scheduling, and web scraping
"""

import os
import shutil
import time
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Callable, Optional, Any
from dataclasses import dataclass
from collections import defaultdict
import re
import json


# ============================================================
# EXERCISE 1: File Organizer
# ============================================================
"""
Create a file organizer that sorts files into folders based on rules.
"""

@dataclass
class FileRule:
    """A named rule that routes matching files to a destination folder."""
    name: str  # label for the rule, e.g. 'images'
    condition: Callable[[Path], bool]  # predicate called with a candidate file's Path
    destination: str  # folder name for matching files, e.g. 'Images'


class FileOrganizer:
    """
    Organize files based on configurable rules.

    Exercise: implement ``add_rule`` and ``organize``; ``undo_last`` is an
    optional bonus. Until then every method is a placeholder that returns
    ``None``.

    Example:
        >>> organizer = FileOrganizer('~/Downloads')
        >>> organizer.add_rule('images', lambda f: f.suffix.lower() in ['.jpg', '.png'], 'Images')
        >>> organizer.add_rule('documents', lambda f: f.suffix.lower() in ['.pdf', '.doc'], 'Documents')
        >>> organizer.organize(dry_run=True)
    """

    def __init__(self, source_dir: str):
        """
        Args:
            source_dir: Directory whose files will be organized; a leading
                ``~`` is expanded to the user's home directory.
        """
        self.source = Path(source_dir).expanduser()
        # Rules registered so far (filled by add_rule once implemented).
        self.rules: List[FileRule] = []

    def add_rule(self, name: str, condition: Callable[[Path], bool], destination: str) -> None:
        """
        Add a file organization rule.

        Args:
            name: Label for the rule.
            condition: Predicate that receives a file's Path and returns
                True when the rule applies.
            destination: Folder that matching files should be sorted into.
        """
        # YOUR CODE HERE
        pass

    def organize(self, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Organize files according to rules.

        Args:
            dry_run: Defaults to True. Intended to report the planned moves
                without touching the file system; pass False to actually
                move files.

        Returns:
            Dictionary mapping destination to list of moved files
        """
        # YOUR CODE HERE
        pass

    def undo_last(self) -> None:
        """Undo the last organization (bonus)."""
        # YOUR CODE HERE (bonus)
        pass


# ============================================================
# EXERCISE 2: Directory Watcher
# ============================================================
"""
Create a directory watcher that monitors for file changes.
"""

@dataclass
class FileEvent:
    """A single change observed on a watched path."""
    path: Path  # file the event refers to
    event_type: str  # 'created', 'modified', 'deleted'
    timestamp: datetime  # time associated with the event


class DirectoryWatcher:
    """
    Watch a directory for file changes.

    Exercise: implement ``_scan``, ``_detect_changes`` and ``start``. The
    intended design is polling: compare successive ``{path: mtime}`` scans
    and hand a FileEvent to the callback for every difference.

    Example:
        >>> def handler(event):
        ...     print(f"{event.event_type}: {event.path}")
        >>>
        >>> watcher = DirectoryWatcher('./watched', handler)
        >>> watcher.start(duration=5)  # blocking; stop() can end it early
        >>> watcher.stop()
    """

    def __init__(self, path: str, callback: Callable[[FileEvent], None]):
        """
        Args:
            path: Directory to watch.
            callback: Function invoked with each detected FileEvent.
        """
        self.path = Path(path)
        self.callback = callback
        # Cleared by stop(); presumably set to True by start() while looping.
        self.running = False
        self._known_files: Dict[Path, float] = {}  # Path -> mtime

    def _scan(self) -> Dict[Path, float]:
        """Scan directory and return {path: mtime} dict."""
        # YOUR CODE HERE
        pass

    def _detect_changes(self, current: Dict[Path, float]) -> List[FileEvent]:
        """
        Detect changes between scans.

        Args:
            current: Result of the latest ``_scan``; presumably compared
                against ``self._known_files`` from the previous scan.

        Returns:
            One FileEvent per created, modified or deleted file.
        """
        # YOUR CODE HERE
        pass

    def start(self, interval: float = 1.0, duration: Optional[float] = None):
        """
        Start watching (blocking).

        Args:
            interval: Presumably the number of seconds between scans.
            duration: Presumably the total seconds to watch; None suggests
                watching until ``stop()`` is called.
        """
        # YOUR CODE HERE
        pass

    def stop(self) -> None:
        """Stop watching."""
        self.running = False


# ============================================================
# EXERCISE 3: Backup Manager
# ============================================================
"""
Create a backup manager with versioning and rotation.
"""

class BackupManager:
    """
    Backup manager with versioning.

    Exercise: implement ``create``, ``list_backups``, ``restore`` and
    ``cleanup``. All four are placeholders that currently return ``None``.

    Example:
        >>> backup = BackupManager('./data', './backups', max_versions=5)
        >>> backup.create()  # Creates timestamped backup
        >>> backup.list_backups()  # Shows all backups
        >>> backup.restore(version='latest')  # Restores from backup
    """

    def __init__(self, source_dir: str, backup_dir: str, max_versions: int = 5):
        """
        Args:
            source_dir: Directory to back up.
            backup_dir: Directory in which backups are stored.
            max_versions: Number of backups to keep (see ``cleanup``).
        """
        self.source = Path(source_dir)
        self.backup_dir = Path(backup_dir)
        self.max_versions = max_versions

    def create(self) -> Path:
        """
        Create a new backup with timestamp.
        Returns path to backup.
        """
        # YOUR CODE HERE
        pass

    def list_backups(self) -> List[Dict[str, Any]]:
        """
        List all backups with metadata.
        Returns list of {path, timestamp, size}.
        """
        # YOUR CODE HERE
        pass

    def restore(self, version: str = 'latest'):
        """
        Restore from backup.
        version: 'latest', timestamp string, or backup name
        """
        # YOUR CODE HERE
        pass

    def cleanup(self):
        """Remove backups exceeding max_versions."""
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 4: Task Scheduler
# ============================================================
"""
Create a flexible task scheduler with cron-like functionality.
"""

@dataclass
class ScheduledTask:
    """A callable registered with TaskScheduler plus its timing state."""
    name: str  # label for the task
    func: Callable  # callable to execute
    interval_seconds: Optional[float] = None  # presumably the repeat period; None when not an interval task
    run_at: Optional[datetime] = None  # presumably the fixed time for an at()-style task
    last_run: Optional[datetime] = None  # None until set by the scheduler
    next_run: Optional[datetime] = None  # None until set by the scheduler


class TaskScheduler:
    """
    Task scheduler with interval and one-time tasks.

    Exercise: implement the fluent ``every(...)`` / ``at(...)`` builders,
    ``do``, ``run_pending`` and ``run``. ``every`` and ``at`` are annotated
    to return the scheduler itself so that ``.do(...)`` can be chained.

    Example:
        >>> scheduler = TaskScheduler()
        >>> scheduler.every(60).do(check_status)
        >>> scheduler.at("09:00").do(morning_report)
        >>> scheduler.run()
    """

    def __init__(self):
        self.tasks: List[ScheduledTask] = []
        # Pending interval for the next do() call.
        # NOTE(review): there is no matching slot for a pending at() time;
        # an implementation of at() will need to add one.
        self._current_interval = None
        # Cleared by stop(); presumably set to True by run() while looping.
        self.running = False

    def every(self, seconds: float) -> 'TaskScheduler':
        """Set interval for next task."""
        # YOUR CODE HERE
        pass

    def at(self, time_str: str) -> 'TaskScheduler':
        """Set specific time for next task (HH:MM format)."""
        # YOUR CODE HERE
        pass

    def do(self, func: Callable, *args, **kwargs):
        """
        Register the task.

        Args:
            func: Callable to schedule.
            *args, **kwargs: Presumably arguments to call ``func`` with.
                ScheduledTask has no fields for them, so an implementation
                must bind them to ``func`` itself.
        """
        # YOUR CODE HERE
        pass

    def run_pending(self):
        """Run any tasks that are due."""
        # YOUR CODE HERE
        pass

    def run(self, duration: Optional[float] = None):
        """
        Run scheduler loop.

        Args:
            duration: Presumably the total seconds to run; None suggests
                running until ``stop()`` is called.
        """
        # YOUR CODE HERE
        pass

    def stop(self) -> None:
        """Stop the scheduler."""
        self.running = False


# ============================================================
# EXERCISE 5: Simple Web Scraper
# ============================================================
"""
Create a basic web scraper (using only stdlib).
"""

class HTMLExtractor:
    """
    Extract data from HTML using regex patterns.

    Exercise: implement the four ``get_*`` methods with the ``re`` module.
    They are placeholders that currently return ``None``.

    Example:
        >>> html = '<h1>Title</h1><a href="/link">Click</a>'
        >>> extractor = HTMLExtractor(html)
        >>> extractor.get_text('h1')
        ['Title']
        >>> extractor.get_links()
        [{'href': '/link', 'text': 'Click'}]
    """

    def __init__(self, html: str):
        """
        Args:
            html: Markup to search; stored unchanged on ``self.html``.
        """
        self.html = html

    def get_text(self, tag: str) -> List[str]:
        """
        Extract text content from all occurrences of a tag.

        Args:
            tag: Tag name without angle brackets, e.g. 'h1'.
        """
        # YOUR CODE HERE
        pass

    def get_links(self) -> List[Dict[str, str]]:
        """
        Extract all links with href and text.

        Returns:
            One ``{'href': ..., 'text': ...}`` dict per link, as in the
            class example.
        """
        # YOUR CODE HERE
        pass

    def get_attribute(self, tag: str, attr: str) -> List[str]:
        """
        Extract attribute value from all occurrences of a tag.

        Args:
            tag: Tag name without angle brackets.
            attr: Attribute name whose values are collected.
        """
        # YOUR CODE HERE
        pass

    def get_table_data(self) -> List[List[str]]:
        """Extract data from first table as 2D list."""
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 6: Command Runner
# ============================================================
"""
Create a command runner with timeout and retry support.
"""

@dataclass
class CommandResult:
    """Outcome of one command execution."""
    success: bool  # whether the command is considered to have succeeded
    stdout: str  # captured standard output
    stderr: str  # captured standard error
    returncode: int  # exit status of the command
    duration: float  # elapsed run time (presumably seconds)


class CommandRunner:
    """
    Run system commands with timeout and retry.

    Exercise: implement ``run``, ``run_with_retry`` and ``run_pipeline``.
    NOTE: this file does not import ``subprocess``; add that import when
    implementing ``run``.

    Example:
        >>> runner = CommandRunner()
        >>> result = runner.run(['ls', '-la'], timeout=10)
        >>> print(result.stdout)

        >>> # With retry
        >>> result = runner.run_with_retry(
        ...     ['flaky_command'],
        ...     max_retries=3,
        ...     retry_delay=1.0
        ... )
    """

    def __init__(self, default_timeout: float = 30.0):
        """
        Args:
            default_timeout: Presumably the timeout (seconds) used when a
                call does not pass its own.
        """
        self.default_timeout = default_timeout
        # Results of executed commands (to be appended by run()).
        self.history: List[CommandResult] = []

    def run(self, cmd: List[str], timeout: Optional[float] = None) -> CommandResult:
        """
        Run a command with optional timeout.

        Args:
            cmd: Command and arguments as a list, e.g. ['ls', '-la'].
            timeout: Per-call timeout; None presumably falls back to
                ``self.default_timeout``.
        """
        # YOUR CODE HERE
        pass

    def run_with_retry(
        self,
        cmd: List[str],
        max_retries: int = 3,
        retry_delay: float = 1.0,
        timeout: Optional[float] = None
    ) -> CommandResult:
        """
        Run command with retries on failure.

        Args:
            cmd: Command and arguments as a list.
            max_retries: Upper bound on retry attempts.
            retry_delay: Presumably the pause in seconds between attempts.
            timeout: Per-attempt timeout, as for ``run``.
        """
        # YOUR CODE HERE
        pass

    def run_pipeline(self, commands: List[List[str]]) -> List[CommandResult]:
        """Run multiple commands in sequence, stop on failure."""
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 7: Config File Manager
# ============================================================
"""
Create a configuration file manager with validation.
"""

class ConfigManager:
    """
    Manage configuration files with validation.

    Exercise: implement ``_load``, ``get``, ``set``, ``validate`` and
    ``save``. Keys use dot notation to address nested values, e.g.
    'database.host'.

    Example:
        >>> config = ConfigManager('config.json')
        >>> config.set('database.host', 'localhost')
        >>> config.set('database.port', 5432)
        >>> config.get('database.host')
        'localhost'
        >>> config.save()
    """

    def __init__(self, filepath: str, schema: Optional[Dict] = None):
        """
        Args:
            filepath: Location of the config file.
            schema: Optional description used by ``validate``; its format
                is left to the implementer.
        """
        self.filepath = Path(filepath)
        self.schema = schema
        self.data: Dict = {}
        # Called on construction so existing settings are available at once.
        self._load()

    def _load(self):
        """Load config from file if exists."""
        # YOUR CODE HERE
        pass

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get config value using dot notation.
        Example: get('database.host')

        Args:
            key: Dotted path to the value.
            default: Presumably returned when the path does not exist.
        """
        # YOUR CODE HERE
        pass

    def set(self, key: str, value: Any):
        """
        Set config value using dot notation.
        Example: set('database.host', 'localhost')
        """
        # YOUR CODE HERE
        pass

    def validate(self) -> List[str]:
        """Validate config against schema, return list of errors."""
        # YOUR CODE HERE
        pass

    def save(self):
        """Save config to file."""
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 8: Log Analyzer
# ============================================================
"""
Create a log file analyzer.
"""

@dataclass
class LogEntry:
    """One parsed line of a log file."""
    timestamp: datetime  # when the entry was logged
    level: str  # severity label, e.g. 'ERROR'
    message: str  # text of the entry
    source: Optional[str] = None  # optional origin; LogAnalyzer.LOG_PATTERN has no group for it


class LogAnalyzer:
    """
    Analyze log files.

    Exercise: implement ``parse``, ``get_by_level``, ``get_in_timerange``
    and ``get_stats``. ``get_errors`` already delegates to
    ``get_by_level('ERROR')`` and works once that method is implemented.

    Example:
        >>> analyzer = LogAnalyzer('app.log')
        >>> analyzer.parse()
        >>> print(analyzer.get_errors())
        >>> print(analyzer.get_stats())
    """

    # Common log pattern: 2024-01-15 10:30:45 [ERROR] Message here
    # Groups: 1 = timestamp, 2 = level (inside the brackets), 3 = message.
    LOG_PATTERN = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)'

    def __init__(self, filepath: str):
        """
        Args:
            filepath: Location of the log file to analyze.
        """
        self.filepath = Path(filepath)
        # Parsed entries (to be filled by parse()).
        self.entries: List[LogEntry] = []

    def parse(self) -> int:
        """Parse log file, return number of entries parsed."""
        # YOUR CODE HERE
        pass

    def get_by_level(self, level: str) -> List[LogEntry]:
        """Get all entries of a specific level."""
        # YOUR CODE HERE
        pass

    def get_errors(self) -> List[LogEntry]:
        """Get all ERROR entries."""
        return self.get_by_level('ERROR')

    def get_in_timerange(
        self,
        start: datetime,
        end: datetime
    ) -> List[LogEntry]:
        """
        Get entries within time range.

        Args:
            start: Beginning of the range.
            end: End of the range (whether the bounds are inclusive is
                left to the implementer).
        """
        # YOUR CODE HERE
        pass

    def get_stats(self) -> Dict[str, Any]:
        """
        Get log statistics.
        Returns: {total, by_level, first_entry, last_entry, duration}
        """
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 9: Rate-Limited API Client
# ============================================================
"""
Create a rate-limited API client wrapper.
"""

class RateLimitedClient:
    """
    Rate-limited HTTP client simulation.

    Exercise: implement ``_wait_if_needed``, ``get`` and ``batch_get``; they
    are placeholders that currently return ``None``.

    Example:
        >>> client = RateLimitedClient(requests_per_second=2)
        >>> for url in urls:
        ...     response = client.get(url)  # Automatically rate limited
    """

    def __init__(self, requests_per_second: float = 1.0):
        """
        Args:
            requests_per_second: Maximum request rate; must be positive.

        Raises:
            ValueError: If ``requests_per_second`` is zero, negative or NaN.
        """
        # `not x > 0` (rather than `x <= 0`) also rejects NaN. Without this
        # guard a rate of 0 raised ZeroDivisionError and a negative rate
        # silently produced a negative interval.
        if not requests_per_second > 0:
            raise ValueError(
                f"requests_per_second must be positive, got {requests_per_second!r}"
            )
        # Minimum number of seconds between two consecutive requests.
        self.min_interval = 1.0 / requests_per_second
        self.last_request = 0.0
        self.request_count = 0

    def _wait_if_needed(self):
        """Wait until next request is allowed."""
        # YOUR CODE HERE
        pass

    def get(self, url: str) -> Dict:
        """
        Simulate GET request with rate limiting.
        Returns simulated response.
        """
        # YOUR CODE HERE
        pass

    def batch_get(self, urls: List[str]) -> List[Dict]:
        """Get multiple URLs with rate limiting."""
        # YOUR CODE HERE
        pass


# ============================================================
# EXERCISE 10: Automation Pipeline
# ============================================================
"""
Create a flexible automation pipeline.
"""

@dataclass
class PipelineStep:
    """
    A step in the pipeline.

    ``kwargs`` may be omitted or passed as ``None``; it is normalised to a
    fresh empty dict per instance, so ``func(*args, **kwargs)`` is always
    safe and no dict is shared between steps.
    """
    name: str  # label for the step
    func: Callable  # callable executed for this step
    args: tuple = ()  # positional arguments for func
    kwargs: Optional[Dict[str, Any]] = None  # keyword arguments for func
    on_error: str = 'stop'  # 'stop', 'skip', 'retry'
    max_retries: int = 3  # presumably the attempt limit when on_error == 'retry'

    def __post_init__(self):
        # The field was annotated `dict` but defaulted to None, which breaks
        # `**step.kwargs`. A per-instance dict avoids a shared mutable default.
        if self.kwargs is None:
            self.kwargs = {}


class AutomationPipeline:
    """
    Automation pipeline for chaining tasks.

    Exercise: implement ``add_step``, ``run`` and ``get_report``; they are
    placeholders that currently return ``None``.

    Example:
        >>> pipeline = AutomationPipeline("Daily Tasks")
        >>> pipeline.add_step("fetch_data", fetch_data)
        >>> pipeline.add_step("process", process_data, on_error='skip')
        >>> pipeline.add_step("report", send_report)
        >>> result = pipeline.run()
    """

    def __init__(self, name: str):
        """
        Args:
            name: Human-readable name of the pipeline.
        """
        self.name = name
        self.steps: List[PipelineStep] = []
        # Presumably step name -> return value, filled by run().
        self.results: Dict[str, Any] = {}
        # Presumably one dict per failed step, filled by run().
        self.errors: List[Dict] = []

    def add_step(
        self,
        name: str,
        func: Callable,
        *args,
        on_error: str = 'stop',
        max_retries: int = 3,
        **kwargs
    ):
        """
        Add a step to the pipeline.

        Args:
            name: Label for the step.
            func: Callable to execute.
            *args: Positional arguments for ``func``.
            on_error: Keyword-only failure policy; PipelineStep documents
                'stop', 'skip' and 'retry'.
            max_retries: Keyword-only retry limit for the step.
            **kwargs: Keyword arguments for ``func``.
        """
        # YOUR CODE HERE
        pass

    def run(self) -> Dict[str, Any]:
        """
        Run the pipeline.
        Returns: {success, results, errors, duration}
        """
        # YOUR CODE HERE
        pass

    def get_report(self) -> str:
        """Generate human-readable report of last run."""
        # YOUR CODE HERE
        pass


# ============================================================
# SOLUTIONS (Uncomment to check your work)
# ============================================================

"""
# Solution 1: File Organizer
class FileOrganizer:
    # Reference solution: sorts the files directly inside source_dir into
    # sub-folders chosen by the first matching rule.

    def __init__(self, source_dir: str):
        self.source = Path(source_dir).expanduser()
        self.rules: List[FileRule] = []
        # (new_location, original_location) pairs of the last real run.
        self._history = []

    def add_rule(self, name: str, condition: Callable[[Path], bool], destination: str):
        # Rules are tried in insertion order; the first match wins.
        self.rules.append(FileRule(name, condition, destination))

    def organize(self, dry_run: bool = True) -> Dict[str, List[str]]:
        # Returns {destination: [file names]}. Files are only moved when
        # dry_run is False; a dry run just reports the plan.
        moves = defaultdict(list)
        if not dry_run:
            # undo_last() reverts the most recent real run only.
            self._history = []

        # Snapshot (and sort) the listing first: moving files while
        # iterdir() is still being consumed can skip or repeat entries, and
        # sorting makes the reported order deterministic.
        for file in sorted(self.source.iterdir()):
            if not file.is_file():
                continue

            for rule in self.rules:
                if rule.condition(file):
                    moves[rule.destination].append(file.name)

                    if not dry_run:
                        dest_dir = self.source / rule.destination
                        # parents=True allows nested destinations such as
                        # 'Media/Images'.
                        dest_dir.mkdir(parents=True, exist_ok=True)
                        target = dest_dir / file.name
                        shutil.move(str(file), str(target))
                        self._history.append((target, file))
                    break

        return dict(moves)

    def undo_last(self):
        # Move the files of the last real run back, newest move first.
        # Destination folders are left in place, even if now empty.
        while self._history:
            moved_to, original = self._history.pop()
            if moved_to.exists():
                shutil.move(str(moved_to), str(original))


# Solution 5: HTML Extractor
class HTMLExtractor:
    # Reference solution. Regex extraction is good enough for the small,
    # well-formed snippets of this exercise; use a real HTML parser for
    # anything else.

    # Pattern fragments are spelled without backslash-letter shorthands so
    # this text is valid both inside the surrounding triple-quoted string
    # and once it is uncommented.
    _WS = '[ \t\r\n\f\v]'                # one whitespace character
    _NAME_END = '(?![A-Za-z0-9:_-])'     # tag name must end here (<b is not <body)
    _QUOTE = '["\']'                     # either quote character

    def __init__(self, html: str):
        self.html = html

    def get_text(self, tag: str) -> List[str]:
        # Text of every <tag>...</tag> element, nested markup removed.
        name = re.escape(tag)
        pattern = f'<{name}{self._NAME_END}[^>]*>(.*?)</{name}{self._WS}*>'
        matches = re.findall(pattern, self.html, re.DOTALL | re.IGNORECASE)
        return [re.sub(r'<[^>]+>', '', m).strip() for m in matches]

    def get_links(self) -> List[Dict[str, str]]:
        # One {'href', 'text'} dict per <a ... href=...>...</a>. href must be
        # preceded by whitespace so attributes like data-href do not match.
        pattern = (
            f'<a{self._WS}(?:[^>]*?{self._WS})?href={self._QUOTE}(.*?){self._QUOTE}'
            f'[^>]*>(.*?)</a{self._WS}*>'
        )
        matches = re.findall(pattern, self.html, re.DOTALL | re.IGNORECASE)
        return [{'href': href, 'text': re.sub(r'<[^>]+>', '', text).strip()}
                for href, text in matches]

    def get_attribute(self, tag: str, attr: str) -> List[str]:
        # Value of attr on every <tag ...> that carries it. The attribute
        # name must follow whitespace, so 'src' does not match 'data-src'.
        name, key = re.escape(tag), re.escape(attr)
        pattern = (
            f'<{name}{self._WS}(?:[^>]*{self._WS})?{key}='
            f'{self._QUOTE}(.*?){self._QUOTE}[^>]*>'
        )
        return re.findall(pattern, self.html, re.IGNORECASE)

    def get_table_data(self) -> List[List[str]]:
        # Rows of the first <table> as lists of cell text (th and td cells);
        # an empty list when the document has no table.
        flags = re.DOTALL | re.IGNORECASE
        table = re.search(f'<table{self._NAME_END}[^>]*>(.*?)</table>', self.html, flags)
        if table is None:
            return []
        data = []
        for row in re.findall(f'<tr{self._NAME_END}[^>]*>(.*?)</tr>', table.group(1), flags):
            cells = re.findall(f'<t[hd]{self._NAME_END}[^>]*>(.*?)</t[hd]>', row, flags)
            data.append([re.sub(r'<[^>]+>', '', cell).strip() for cell in cells])
        return data


# Solution 7: Config Manager
class ConfigManager:
    # Reference solution: JSON-backed settings addressed with dotted keys
    # ('database.host' refers to data['database']['host']).

    def __init__(self, filepath: str, schema: Optional[Dict] = None):
        self.filepath = Path(filepath)
        # Convention used by validate(): {dotted_key: type or tuple of types}.
        self.schema = schema
        self.data: Dict = {}
        self._load()

    def _load(self):
        # A missing file simply means starting with an empty config.
        if self.filepath.exists():
            # Explicit encoding: the platform default is not always UTF-8.
            with open(self.filepath, encoding='utf-8') as f:
                self.data = json.load(f)

    def get(self, key: str, default: Any = None) -> Any:
        # Walk the nested dicts one key segment at a time; any missing
        # segment (or a non-dict on the way) yields the default.
        keys = key.split('.')
        value = self.data
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        return value

    def set(self, key: str, value: Any):
        # Intermediate sections are created on demand.
        keys = key.split('.')
        data = self.data
        for k in keys[:-1]:
            data = data.setdefault(k, {})
        data[keys[-1]] = value

    def validate(self) -> List[str]:
        # One message per problem, in schema order; an empty list means the
        # config is valid (always the case when no schema was supplied).
        errors = []
        absent = object()  # sentinel: None is a legitimate stored value
        for key, expected in (self.schema or {}).items():
            value = self.get(key, absent)
            if value is absent:
                errors.append(f'missing key: {key}')
            elif not isinstance(value, expected):
                wanted = getattr(expected, '__name__', str(expected))
                errors.append(f'{key}: expected {wanted}, got {type(value).__name__}')
        return errors

    def save(self):
        # Create the parent directory so saving to a new location works.
        self.filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(self.filepath, 'w', encoding='utf-8') as f:
            json.dump(self.data, f, indent=2)


# Solution 9: Rate Limited Client
class RateLimitedClient:
    # Reference solution: spaces simulated requests at least
    # 1 / requests_per_second seconds apart.

    def __init__(self, requests_per_second: float = 1.0):
        # Reject zero, negative and NaN rates up front instead of failing
        # with ZeroDivisionError or silently disabling the limit.
        if not requests_per_second > 0:
            raise ValueError(
                f'requests_per_second must be positive, got {requests_per_second!r}'
            )
        self.min_interval = 1.0 / requests_per_second
        # Wall-clock time of the latest request (0.0 means none yet).
        self.last_request = 0.0
        self.request_count = 0
        # Monotonic time of the latest request; None until the first one.
        self._last_tick = None

    def _wait_if_needed(self):
        # Pace with the monotonic clock: time.time() can jump when the
        # system clock is adjusted, which would stretch or skip the pause.
        if self._last_tick is not None:
            remaining = self.min_interval - (time.monotonic() - self._last_tick)
            if remaining > 0:
                time.sleep(remaining)
        self._last_tick = time.monotonic()
        self.last_request = time.time()

    def get(self, url: str) -> Dict:
        # Simulated response; the only real work is the rate-limit pause.
        self._wait_if_needed()
        self.request_count += 1
        return {'url': url, 'status': 200, 'data': f'Response from {url}'}

    def batch_get(self, urls: List[str]) -> List[Dict]:
        # Responses come back in the same order as the input URLs.
        return [self.get(url) for url in urls]
"""


if __name__ == "__main__":
    # Shown only when the file is executed directly: a short banner listing
    # the topics covered by the exercises.
    banner_lines = (
        "Automation Exercises",
        "=" * 50,
        "\nComplete the exercises above to practice:",
        "- File system automation",
        "- Directory watching",
        "- Backup management",
        "- Task scheduling",
        "- Web scraping",
        "- Command execution",
        "- Configuration management",
        "- Log analysis",
        "- Rate limiting",
        "- Pipeline creation",
        "\nUncomment the solutions to check your work!",
    )
    for banner_line in banner_lines:
        print(banner_line)
# Exercises - Python Tutorial | DeepML