# examples.py
"""
Python Automation and Scripting - Examples
Demonstrates file management, system tasks, scheduling, and web scraping
"""

import functools
import json
import os
import re
import shutil
import subprocess
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Callable, Optional


# ============================================================
# FILE SYSTEM AUTOMATION
# ============================================================

print("=" * 60)
print("FILE SYSTEM AUTOMATION")
print("=" * 60)


# --- Working with Paths ---
print("\n--- Path Operations (pathlib) ---")

# Current paths
current = Path.cwd()
home = Path.home()

print(f"Current directory: {current}")
print(f"Home directory: {home}")

# Path components
# Pure path manipulation: nothing below touches the disk, so the file
# does not have to exist.
sample_path = Path('/home/user/documents/report.txt')
print(f"\nPath: {sample_path}")
print(f"  Name: {sample_path.name}")
print(f"  Stem: {sample_path.stem}")
print(f"  Suffix: {sample_path.suffix}")
print(f"  Parent: {sample_path.parent}")

# Building paths
# The "/" operator joins path segments; the path is only built, not created.
config_path = home / '.config' / 'myapp' / 'config.json'
print(f"\nBuilt path: {config_path}")


# --- File Operations Demo ---
print("\n--- File Operations Demo ---")

# Create demo directory
# Relative path: the directory is created under the current working
# directory. It is reused by the later sections and removed in the
# cleanup step at the end of the script.
demo_dir = Path('automation_demo')
demo_dir.mkdir(exist_ok=True)

# Create some files
# write_text() creates each file, replacing any content left by an earlier run.
for i in range(3):
    file_path = demo_dir / f'file_{i}.txt'
    file_path.write_text(f'This is file {i}\nCreated at {datetime.now()}')

# List files
# iterdir() yields entries in arbitrary order, so the listing is not sorted.
print("Created files:")
for f in demo_dir.iterdir():
    print(f"  {f.name}: {f.stat().st_size} bytes")

# Read file
content = (demo_dir / 'file_0.txt').read_text()
print(f"\nContent of file_0.txt:\n{content}")


# --- Batch Rename Example ---
print("\n--- Batch Rename Function ---")

def batch_rename(
    directory: Path,
    pattern: str,
    replacement: str,
    dry_run: bool = True
) -> List[tuple]:
    """
    Rename directory entries whose name contains ``pattern``.

    Every occurrence of ``pattern`` in an entry's name is replaced with
    ``replacement``. Only the immediate children of ``directory`` are
    inspected (no recursion); sub-directories are renamed like files.

    Args:
        directory: Directory whose entries are inspected.
        pattern: Plain substring to look for (not a glob or a regex).
        replacement: Text substituted for each occurrence of ``pattern``.
        dry_run: When True (the default) nothing on disk changes and only
            the planned renames are reported.

    Returns:
        List of (old_name, new_name) tuples, one per matching entry,
        ordered by entry path.

    Raises:
        ValueError: If ``pattern`` is empty (it would match every name).
        FileExistsError: If ``dry_run`` is False and a rename would
            overwrite an existing entry or two entries would end up with
            the same name. The check runs before anything is renamed.
    """
    if not pattern:
        raise ValueError("pattern must be a non-empty string")

    # Take a sorted snapshot first: the directory is modified below, and
    # the contents of a listing that changes mid-iteration are unspecified.
    planned = [
        (entry, entry.name.replace(pattern, replacement))
        for entry in sorted(directory.iterdir())
        if pattern in entry.name
    ]
    changes = [(entry.name, new_name) for entry, new_name in planned]

    if dry_run:
        return changes

    # Entries whose name would not change need no rename at all.
    effective = [
        (entry, entry.parent / new_name)
        for entry, new_name in planned
        if new_name != entry.name
    ]

    # Validate every target up front so a conflict cannot leave the
    # directory half-renamed. Path.rename() silently replaces an existing
    # file on POSIX, which would destroy data.
    claimed = set()
    for entry, target in effective:
        collides = target.exists() and not target.samefile(entry)
        if collides or target.name in claimed:
            raise FileExistsError(
                f"Cannot rename {entry.name!r} to {target.name!r}: "
                "target already exists"
            )
        claimed.add(target.name)

    for entry, target in effective:
        entry.rename(target)

    return changes

# Demo
# dry_run=True: nothing on disk is renamed, only the planned
# (old_name, new_name) pairs are returned and printed.
changes = batch_rename(demo_dir, 'file_', 'document_', dry_run=True)
print("Planned renames (dry run):")
for old, new in changes:
    print(f"  {old} -> {new}")


# --- Organize Files by Extension ---
print("\n--- Organize Files by Extension ---")

def organize_by_extension(directory: Path, dry_run: bool = True) -> Dict[str, List[str]]:
    """
    Group the files directly inside ``directory`` by lower-cased extension.

    Each file is assigned to a folder named after its extension without the
    leading dot; files without an extension go to ``no_extension``.
    Sub-directories are ignored. With ``dry_run=False`` the folders are
    created inside ``directory`` and the files are moved into them.

    Returns a plain dict mapping folder name to the file names assigned to it.
    """
    grouped = defaultdict(list)

    for entry in directory.iterdir():
        if not entry.is_file():
            continue

        suffix = entry.suffix.lower()
        # A non-empty suffix always starts with the dot, so drop it.
        bucket = suffix[1:] if suffix else 'no_extension'
        grouped[bucket].append(entry.name)

        if dry_run:
            continue

        target_dir = directory / bucket
        target_dir.mkdir(exist_ok=True)
        shutil.move(str(entry), str(target_dir / entry.name))

    return dict(grouped)

# Create mixed files for demo
# The contents are placeholders; only the file extensions matter here.
(demo_dir / 'image.jpg').write_text('fake jpg')
(demo_dir / 'script.py').write_text('# python file')
(demo_dir / 'data.json').write_text('{}')

# dry_run=True: the grouping is computed and printed, no file is moved.
organization = organize_by_extension(demo_dir, dry_run=True)
print("Would organize into folders:")
for folder, files in organization.items():
    print(f"  {folder}/: {files}")


# --- Find Duplicate Files ---
print("\n--- Find Duplicate Files ---")

def find_duplicates_by_name(directory: Path) -> Dict[str, List[Path]]:
    """
    Report file names that occur more than once below ``directory``.

    The tree is searched recursively and files are compared by name only
    (contents are not inspected). The result maps each repeated name to
    every path carrying that name.
    """
    seen = defaultdict(list)
    for candidate in directory.rglob('*'):
        if not candidate.is_file():
            continue
        seen[candidate.name].append(candidate)

    repeated = {}
    for name, locations in seen.items():
        if len(locations) > 1:
            repeated[name] = locations
    return repeated

# Create subdir with duplicate
# Same file name as demo_dir/file_0.txt but different content: the
# duplicate finder compares names only.
subdir = demo_dir / 'subdir'
subdir.mkdir(exist_ok=True)
(subdir / 'file_0.txt').write_text('Duplicate!')

duplicates = find_duplicates_by_name(demo_dir)
print("Duplicate files found:")
for name, paths in duplicates.items():
    print(f"  {name}:")
    for p in paths:
        print(f"    - {p}")


# --- Directory Statistics ---
print("\n--- Directory Statistics ---")

def get_dir_stats(directory: Path) -> Dict:
    """
    Walk ``directory`` recursively and summarize what it contains.

    The returned dict has these keys:
        total_files   - number of files found
        total_dirs    - number of sub-directories found
        total_size    - summed file size in bytes
        extensions    - defaultdict counting files per suffix, keyed by the
                        suffix as written (or 'no_ext' when there is none)
        largest_files - up to three (path string, size) pairs, largest first
    """
    file_count = 0
    dir_count = 0
    byte_total = 0
    per_extension = defaultdict(int)
    sized = []

    for entry in directory.rglob('*'):
        if entry.is_file():
            nbytes = entry.stat().st_size
            file_count += 1
            byte_total += nbytes
            per_extension[entry.suffix or 'no_ext'] += 1
            sized.append((entry, nbytes))
        elif entry.is_dir():
            dir_count += 1

    # Largest first; the sort is stable, so equal sizes keep discovery order.
    biggest = sorted(sized, key=lambda pair: pair[1], reverse=True)[:3]

    return {
        'total_files': file_count,
        'total_dirs': dir_count,
        'total_size': byte_total,
        'extensions': per_extension,
        'largest_files': [(str(path), nbytes) for path, nbytes in biggest],
    }

dir_stats = get_dir_stats(demo_dir)
print(f"Files: {dir_stats['total_files']}")
print(f"Directories: {dir_stats['total_dirs']}")
print(f"Total size: {dir_stats['total_size']} bytes")
# 'extensions' is a defaultdict; convert it so it prints as a plain dict.
print(f"Extensions: {dict(dir_stats['extensions'])}")


# ============================================================
# SYSTEM AND PROCESS AUTOMATION
# ============================================================

print("\n" + "=" * 60)
print("SYSTEM AND PROCESS AUTOMATION")
print("=" * 60)


# --- Running Commands ---
print("\n--- Running System Commands ---")

def run_command(cmd: List[str], capture: bool = True) -> Dict:
    """
    Run an external command and report the outcome as a dict.

    The command is executed without a shell and with a 30 second timeout.
    Failures to launch are reported in the result instead of being raised.

    Args:
        cmd: Program and arguments as a list, e.g. ``['echo', 'hi']``.
        capture: When True, stdout and stderr are captured as text;
            otherwise the child inherits the parent's streams.

    Returns:
        A dict that always has the same keys:
            success    - True only when the command ran and exited with 0
            returncode - exit status, or None if the command did not finish
            stdout     - captured text, or None (not captured / not run)
            stderr     - captured text, or None (not captured / not run)
            error      - None on a completed run, otherwise a short reason
                         ('Timeout', 'Command not found', 'Empty command',
                         or the operating system's error message)
    """
    # Start from the failure shape so every return path has identical keys
    # and callers can index the result without checking 'success' first.
    outcome = {
        'success': False,
        'returncode': None,
        'stdout': None,
        'stderr': None,
        'error': None,
    }

    if not cmd:
        outcome['error'] = 'Empty command'
        return outcome

    try:
        result = subprocess.run(
            cmd,
            capture_output=capture,
            text=True,
            timeout=30
        )
    except subprocess.TimeoutExpired:
        outcome['error'] = 'Timeout'
    except FileNotFoundError:
        outcome['error'] = 'Command not found'
    except OSError as exc:
        # e.g. PermissionError for a file that exists but is not executable.
        outcome['error'] = str(exc)
    else:
        outcome['success'] = result.returncode == 0
        outcome['returncode'] = result.returncode
        outcome['stdout'] = result.stdout if capture else None
        outcome['stderr'] = result.stderr if capture else None

    return outcome

# Example commands
# NOTE(review): these look like POSIX utilities; on a platform where one is
# not an executable, run_command reports failure and nothing is printed for it.
commands = [
    ['echo', 'Hello World'],
    ['date'],
    ['pwd'],
]

for cmd in commands:
    result = run_command(cmd)
    # Failed commands are skipped silently.
    if result['success']:
        output = result['stdout'].strip() if result['stdout'] else ''
        print(f"$ {' '.join(cmd)}: {output}")


# --- System Information ---
print("\n--- System Information ---")

import platform

system_info = {
    'System': platform.system(),
    'Release': platform.release(),
    'Machine': platform.machine(),
    'Python': platform.python_version(),
    # platform.processor() may return an empty string; show 'Unknown' then.
    'Processor': platform.processor() or 'Unknown',
}

for key, value in system_info.items():
    print(f"{key}: {value}")


# --- Environment Variables ---
print("\n--- Environment Variables ---")

# .get() with a default: a missing variable is reported as 'Not set'
# instead of raising KeyError.
env_vars = ['USER', 'HOME', 'SHELL', 'LANG']
for var in env_vars:
    value = os.environ.get(var, 'Not set')
    print(f"{var}: {value}")


# ============================================================
# TASK SCHEDULING
# ============================================================

print("\n" + "=" * 60)
print("TASK SCHEDULING")
print("=" * 60)


# --- Simple Scheduler ---
print("\n--- Simple Task Scheduler ---")

class SimpleScheduler:
    """
    Basic in-process interval scheduler for demonstration.

    Tasks are stored as dicts in ``self.tasks`` and executed synchronously
    by whoever calls ``run_pending()``; there are no threads. Intervals are
    measured with ``time.monotonic()`` so wall-clock adjustments can
    neither stall tasks nor make them fire in bursts.
    """

    def __init__(self):
        # One dict per task: interval, func, args, kwargs, last_run.
        self.tasks = []

    def every_seconds(self, seconds: int, func: Callable, *args, **kwargs):
        """
        Schedule ``func(*args, **kwargs)`` to run every ``seconds`` seconds.

        The task first runs on the next ``run_pending()`` call.
        """
        self.tasks.append({
            'interval': seconds,
            'func': func,
            'args': args,
            'kwargs': kwargs,
            # -inf means "never run": the first due-check always succeeds,
            # whatever the current monotonic clock value is.
            'last_run': float('-inf')
        })

    def run_pending(self):
        """
        Run every task whose interval has elapsed since its last run.

        Tasks run in registration order. An exception raised by a task
        propagates to the caller and the remaining tasks are not run.
        """
        now = time.monotonic()
        for task in self.tasks:
            if now - task['last_run'] >= task['interval']:
                task['func'](*task['args'], **task['kwargs'])
                task['last_run'] = now

    def run_for(self, duration: float, poll_interval: float = 0.1):
        """
        Poll ``run_pending()`` for ``duration`` seconds, then return.

        Args:
            duration: How long to keep polling, in seconds. A value of
                zero or less returns immediately without running anything.
            poll_interval: Pause between polls, in seconds.

        Raises:
            ValueError: If ``poll_interval`` is not positive.
        """
        if poll_interval <= 0:
            raise ValueError("poll_interval must be positive")

        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            self.run_pending()
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                time.sleep(min(poll_interval, remaining))


# Demo scheduler
def heartbeat():
    """Print an indented heartbeat line with the current local time (HH:MM:SS)."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"  Heartbeat: {stamp}")

def status_check():
    """Print a fixed, indented 'status check completed' line."""
    message = "  Status check completed"
    print(message)

# Register two tasks with different intervals; each fires on the first poll
# and then again whenever its interval has elapsed.
scheduler = SimpleScheduler()
scheduler.every_seconds(1, heartbeat)
scheduler.every_seconds(2, status_check)

# Blocks the script for about three seconds while the scheduler polls.
print("Running scheduler for 3 seconds:")
scheduler.run_for(3)


# --- Cron Expression Parser (Simplified) ---
print("\n--- Cron Expression Parser ---")

class SimpleCron:
    """
    Simplified cron expression handler.

    Only the five-field form (minute, hour, day of month, month, day of
    week) is accepted. Fields are kept as raw strings; ranges, lists and
    step values are not interpreted.
    """

    @staticmethod
    def parse(expression: str) -> Dict:
        """
        Split a cron expression into its five named fields.

        Args:
            expression: Whitespace-separated cron expression.

        Returns:
            Dict with the keys 'minute', 'hour', 'day_of_month', 'month'
            and 'day_of_week', each holding the raw field text.

        Raises:
            ValueError: If the expression does not have exactly five fields.
        """
        parts = expression.split()
        if len(parts) != 5:
            raise ValueError("Invalid cron expression")

        return {
            'minute': parts[0],
            'hour': parts[1],
            'day_of_month': parts[2],
            'month': parts[3],
            'day_of_week': parts[4]
        }

    @staticmethod
    def describe(expression: str) -> str:
        """
        Return a human-readable description of a cron expression.

        Only a few common shapes are recognized: every minute, every hour
        on the hour, a fixed hour and minute, and a day of the week.
        Numeric weekdays 0-7 are named (both 0 and 7 mean Sunday); any
        other weekday value is shown as written. When nothing is
        recognized the expression itself is returned.

        Raises:
            ValueError: If the expression does not have exactly five fields.
        """
        parts = SimpleCron.parse(expression)
        minute = parts['minute']
        hour = parts['hour']
        day_of_week = parts['day_of_week']

        descriptions = []

        if minute == '*' and hour == '*':
            descriptions.append("Every minute")
        elif minute == '0' and hour == '*':
            descriptions.append("Every hour at minute 0")
        elif minute != '*' and hour != '*':
            descriptions.append(f"At {hour}:{minute.zfill(2)}")

        if day_of_week != '*':
            days = ('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
            try:
                day_idx = int(day_of_week)
            except ValueError:
                day_idx = None

            # Range-check explicitly: an unchecked index would raise for 7
            # and silently pick the wrong day for negative numbers.
            if day_idx is not None and 0 <= day_idx <= 7:
                descriptions.append(f"on {days[day_idx % 7]}")
            else:
                descriptions.append(f"on {day_of_week}")

        return ' '.join(descriptions) if descriptions else expression

# Examples
# The trailing comments give the intended meaning of each expression;
# describe() produces its own wording for them below.
cron_examples = [
    '* * * * *',      # Every minute
    '0 * * * *',      # Every hour
    '0 9 * * *',      # Daily at 9 AM
    '0 9 * * 1',      # Monday at 9 AM
    '30 17 * * 5',    # Friday at 5:30 PM
]

print("Cron expressions:")
for expr in cron_examples:
    print(f"  {expr}: {SimpleCron.describe(expr)}")


# ============================================================
# WEB SCRAPING (Simulated)
# ============================================================

print("\n" + "=" * 60)
print("WEB SCRAPING (Simulated)")
print("=" * 60)


# --- HTML Parser (Simplified) ---
print("\n--- Simple HTML Parser ---")

class SimpleHTMLParser:
    """
    Basic regex-based HTML extraction without external dependencies.

    This is not a real HTML parser: matching is textual and
    case-insensitive, and an element's content ends at the first closing
    tag of the same name, so nested elements of the same tag are not
    handled.
    """

    def __init__(self, html: str):
        self.html = html

    def find_all_tags(self, tag: str) -> List[str]:
        """
        Return the inner content of every ``<tag>...</tag>`` element.

        Args:
            tag: Element name, e.g. ``'p'``. It is matched literally and
                as a whole name, so ``'a'`` does not match ``<abbr>``.

        Returns:
            The raw inner markup of each match, in document order.
        """
        name = re.escape(tag)
        # The opening tag must end right after the name or continue with
        # whitespace (attributes); otherwise a longer tag name would match.
        pattern = rf'<{name}(?:\s[^>]*)?>(.*?)</{name}\s*>'
        return re.findall(pattern, self.html, re.DOTALL | re.IGNORECASE)

    def find_all_links(self) -> List[Dict[str, str]]:
        """
        Find all ``<a>`` elements that carry a quoted ``href`` attribute.

        Returns:
            One dict per link with the keys 'href' (attribute value as
            written) and 'text' (inner markup, surrounding whitespace
            stripped), in document order.
        """
        # href must be preceded by whitespace so attributes such as
        # data-href are not mistaken for it; the back-reference makes the
        # closing quote match the opening one.
        pattern = r'<a\s+(?:[^>]*?\s)?href=(["\'])(.*?)\1[^>]*>(.*?)</a>'
        return [
            {'href': match.group(2), 'text': match.group(3).strip()}
            for match in re.finditer(pattern, self.html, re.DOTALL | re.IGNORECASE)
        ]

    def get_title(self) -> Optional[str]:
        """
        Return the content of the first ``<title>`` element.

        Surrounding whitespace is stripped, as it is for link text.
        Returns None when the document has no title element.
        """
        titles = self.find_all_tags('title')
        return titles[0].strip() if titles else None

    def get_text(self) -> str:
        """
        Return the document with all tags removed.

        Each tag is replaced by a space and runs of whitespace are
        collapsed to a single space. Character entities are left as
        written, and the text inside elements such as ``<script>`` is kept.
        """
        text = re.sub(r'<[^>]+>', ' ', self.html)
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

# Demo HTML
# Small in-memory document: one title and three links (two relative, one
# absolute). No network access is involved.
sample_html = """
<!DOCTYPE html>
<html>
<head>
    <title>Sample Page</title>
</head>
<body>
    <h1>Welcome</h1>
    <p>This is a sample page.</p>
    <a href="/about">About Us</a>
    <a href="/contact">Contact</a>
    <a href="https://example.com">External Link</a>
</body>
</html>
"""

parser = SimpleHTMLParser(sample_html)
print(f"Title: {parser.get_title()}")
print("\nLinks found:")
for link in parser.find_all_links():
    print(f"  {link['text']}: {link['href']}")


# --- Web Scraper Class (Structure) ---
print("\n--- Web Scraper Structure ---")

class WebScraper:
    """
    Web scraper structure (without actual HTTP requests).
    In real usage, would use requests + BeautifulSoup.

    ``visited`` records every URL fetched by this instance, so a URL is
    never fetched twice, even across separate ``crawl()`` calls.
    """

    def __init__(self, delay: float = 1.0):
        # Nominal delay between requests in seconds; the simulated fetch
        # sleeps for a tenth of it.
        self.delay = delay
        self.visited = set()

    def fetch(self, url: str) -> str:
        """
        Simulate fetching a page.

        Logs the URL, sleeps for ``delay * 0.1`` seconds and returns a
        stub HTML document whose title is the URL itself.
        """
        print(f"  [FETCH] {url}")
        time.sleep(self.delay * 0.1)  # Simulated delay
        return f"<html><title>{url}</title></html>"

    def extract_data(self, html: str, selectors: Dict[str, str]) -> Dict:
        """
        Simulate data extraction.

        ``html`` is ignored; every key in ``selectors`` is mapped to the
        placeholder string ``"Value for <selector>"``.
        """
        return {key: f"Value for {sel}" for key, sel in selectors.items()}

    def crawl(self, start_url: str, max_pages: int = 3) -> List[Dict]:
        """
        Simulate crawling from ``start_url``.

        No links are followed in this simulation, so at most the start
        page is fetched.

        Args:
            start_url: First URL to fetch.
            max_pages: Maximum number of pages to fetch in this call.

        Returns:
            One ``{'url': ..., 'data': ...}`` dict per page fetched by
            this call; empty if the URL was already visited.
        """
        results = []
        pending = [start_url]

        # The limit applies to this call only. Counting the instance-wide
        # visited set would make a later crawl() on the same scraper
        # return nothing once max_pages URLs had ever been fetched.
        while pending and len(results) < max_pages:
            url = pending.pop(0)
            if url in self.visited:
                continue

            html = self.fetch(url)
            self.visited.add(url)

            results.append({
                'url': url,
                'data': self.extract_data(html, {'title': 'title'})
            })

        return results

# Demo
# The simulated fetch sleeps for a tenth of `delay`, i.e. 0.05 s here.
scraper = WebScraper(delay=0.5)
print("Simulated crawl:")
# No links are followed in the simulation, so only the start URL is fetched
# even though max_pages is 3.
results = scraper.crawl('https://example.com', max_pages=3)
for r in results:
    print(f"  {r['url']}: {r['data']}")


# ============================================================
# AUTOMATION UTILITIES
# ============================================================

print("\n" + "=" * 60)
print("AUTOMATION UTILITIES")
print("=" * 60)


# --- Retry Decorator ---
print("\n--- Retry Decorator ---")

def retry(max_attempts: int = 3, delay: float = 1.0, exceptions: tuple = (Exception,)):
    """
    Decorator factory that retries a function when it raises.

    Args:
        max_attempts: Total number of calls to make, including the first.
        delay: Nominal pause between attempts in seconds. For the demo the
            actual sleep is shortened to ``delay * 0.1``.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful
        result, prints one line per failed attempt, and re-raises the last
        exception when every attempt has failed.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # With no attempts there is neither a result nor an exception to
        # report, so reject the configuration up front.
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    print(f"  Attempt {attempt} failed: {e}")
                    if attempt == max_attempts:
                        raise
                    time.sleep(delay * 0.1)  # Shortened for demo
        return wrapper
    return decorator

# Demo
# Module-level call counter that drives the scripted failures below.
attempt_count = 0

@retry(max_attempts=3, delay=1.0)
def unreliable_operation():
    """Fail on the first two calls, then succeed.

    Despite the "Random failure" message the behavior is deterministic:
    it depends only on the module-level ``attempt_count``.
    """
    global attempt_count
    attempt_count += 1
    if attempt_count < 3:
        raise ValueError("Random failure")
    return "Success!"

# With three attempts allowed, the third call succeeds.
try:
    result = unreliable_operation()
    print(f"Result: {result}")
except Exception as e:
    print(f"Failed after retries: {e}")


# --- Rate Limiter ---
print("\n--- Rate Limiter ---")

class RateLimiter:
    """
    Simple rate limiter for API calls.

    Enforces a minimum gap between consecutive calls by sleeping in the
    calling thread. It can be used directly via ``wait()`` or as a
    decorator. Time is measured with ``time.monotonic()`` so wall-clock
    adjustments do not distort the gap.
    """

    def __init__(self, calls_per_second: float):
        """
        Args:
            calls_per_second: Maximum sustained call rate; must be positive.

        Raises:
            ValueError: If ``calls_per_second`` is zero or negative.
        """
        if calls_per_second <= 0:
            raise ValueError("calls_per_second must be positive")
        self.min_interval = 1.0 / calls_per_second
        # -inf means "no call yet", so the first call never waits.
        self.last_call = float('-inf')

    def wait(self):
        """Block until at least ``min_interval`` has passed since the last call."""
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        # Read the clock again: the sleep above may have taken longer than asked.
        self.last_call = time.monotonic()

    def __call__(self, func):
        """Use as decorator: every call to ``func`` first goes through ``wait()``."""
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            self.wait()
            return func(*args, **kwargs)
        return wrapper

# Demo
# Two calls per second means at least 0.5 s between consecutive calls.
limiter = RateLimiter(calls_per_second=2)

# Decorating with the limiter instance makes every call wait its turn first.
@limiter
def api_call(i):
    """Print the call index and the current timestamp."""
    print(f"  API call {i} at {time.time():.2f}")

print("Rate-limited calls (2/sec):")
for i in range(4):
    api_call(i)


# --- Progress Tracker ---
print("\n--- Progress Tracker ---")

class ProgressTracker:
    """
    Track progress of long-running tasks.

    Every update redraws a 20-character text progress bar on the current
    terminal line (carriage return, no newline) with the percentage done
    and an estimated time remaining.
    """

    def __init__(self, total: int, desc: str = "Progress"):
        """
        Args:
            total: Number of work units that make up 100%.
            desc: Label printed in front of the bar.
        """
        self.total = total
        self.current = 0
        self.desc = desc
        self.start_time = time.time()

    def update(self, n: int = 1):
        """Advance the counter by ``n`` units and redraw the bar."""
        self.current += n
        self._display()

    def _display(self):
        """Display progress bar."""
        if self.total > 0:
            # Clamp so an overshoot (current > total) cannot lengthen the
            # bar or report more than 100%.
            fraction = min(max(self.current / self.total, 0.0), 1.0)
        else:
            # Nothing to do counts as complete; this also avoids a
            # division by zero.
            fraction = 1.0

        percent = fraction * 100
        filled = int(percent // 5)  # 20 cells, 5% each
        bar = '█' * filled + '░' * (20 - filled)

        elapsed = time.time() - self.start_time
        remaining = max(self.total - self.current, 0)
        # Linear extrapolation from the average time per finished unit.
        eta = (elapsed / self.current) * remaining if self.current > 0 else 0
        print(f"\r  {self.desc}: |{bar}| {percent:.1f}% ETA: {eta:.1f}s", end='', flush=True)

    def finish(self):
        """Mark as complete by ending the progress line with a newline."""
        print()

# Demo
print("Processing files:")
tracker = ProgressTracker(10, "Files")
for i in range(10):
    time.sleep(0.1)  # stand-in for real work
    tracker.update()
# End the progress line so later output starts on a fresh line.
tracker.finish()


# --- Cleanup ---
print("\n--- Cleanup ---")
# Deletes the demo directory created earlier in the script, including all
# files and sub-directories inside it.
shutil.rmtree(demo_dir)
print(f"Removed demo directory: {demo_dir}")


print("\n" + "=" * 60)
print("All automation examples completed!")
print("=" * 60)
print("\nFor full functionality, install:")
print("  pip install requests beautifulsoup4 schedule")
# Examples - Python Tutorial | DeepML