python

examples

examples.py🐍
"""
Python Security - Practical Examples
=====================================

Demonstrates security best practices in Python.
"""

import hashlib
import secrets
import os
import re
import html
from typing import Optional
from dataclasses import dataclass
from pathlib import Path


# ============================================================
# SECTION 1: Password Hashing
# ============================================================

print("=" * 60)
print("SECTION 1: Password Hashing")
print("=" * 60)

# Using hashlib for PBKDF2 (built-in)
def hash_password_pbkdf2(password: str) -> tuple[str, bytes]:
    """
    Hash password using PBKDF2-HMAC-SHA256.
    Returns (hash_hex, salt).
    """
    salt = os.urandom(16)
    hash_bytes = hashlib.pbkdf2_hmac(
        'sha256',           # Hash algorithm
        password.encode(),  # Password as bytes
        salt,               # Salt
        100000             # Iterations (higher = more secure)
    )
    return hash_bytes.hex(), salt


def verify_password_pbkdf2(password: str, stored_hash: str, salt: bytes) -> bool:
    """Verify password against stored hash."""
    computed_hash = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode(),
        salt,
        100000
    )
    # Use constant-time comparison to prevent timing attacks
    return secrets.compare_digest(computed_hash.hex(), stored_hash)


# Demo
password = "my_secret_password"
hash_hex, salt = hash_password_pbkdf2(password)
print(f"Password: {password}")
print(f"Salt: {salt.hex()}")
print(f"Hash: {hash_hex}")
print(f"Verification (correct): {verify_password_pbkdf2(password, hash_hex, salt)}")
print(f"Verification (wrong): {verify_password_pbkdf2('wrong', hash_hex, salt)}")


# Using scrypt (memory-hard, built-in Python 3.6+)
def hash_password_scrypt(password: str) -> tuple[str, bytes]:
    """Hash password using scrypt."""
    salt = os.urandom(16)
    hash_bytes = hashlib.scrypt(
        password.encode(),
        salt=salt,
        n=2**14,  # CPU/Memory cost parameter
        r=8,      # Block size
        p=1       # Parallelization parameter
    )
    return hash_bytes.hex(), salt


print("\nUsing scrypt:")
scrypt_hash, scrypt_salt = hash_password_scrypt(password)
print(f"Scrypt hash: {scrypt_hash[:64]}...")


# ============================================================
# SECTION 2: Secure Random Generation
# ============================================================

print("\n" + "=" * 60)
print("SECTION 2: Secure Random Generation")
print("=" * 60)

# Generate various secure tokens
print("\nSecure tokens:")
print(f"Random bytes (16): {secrets.token_bytes(16).hex()}")
print(f"Random hex (32): {secrets.token_hex(16)}")
print(f"URL-safe token: {secrets.token_urlsafe(16)}")

# Secure random integers
print(f"\nSecure random int (0-99): {secrets.randbelow(100)}")
print(f"Secure random choice: {secrets.choice(['apple', 'banana', 'cherry'])}")


# Password generator
def generate_secure_password(length: int = 16) -> str:
    """Generate a cryptographically secure password."""
    import string
    
    if length < 8:
        raise ValueError("Password must be at least 8 characters")
    
    # Character pools
    lowercase = string.ascii_lowercase
    uppercase = string.ascii_uppercase
    digits = string.digits
    special = "!@#$%^&*()_+-=[]{}|"
    
    # Ensure at least one of each type
    password = [
        secrets.choice(lowercase),
        secrets.choice(uppercase),
        secrets.choice(digits),
        secrets.choice(special),
    ]
    
    # Fill remaining with random characters
    all_chars = lowercase + uppercase + digits + special
    password += [secrets.choice(all_chars) for _ in range(length - 4)]
    
    # Shuffle using secure random
    shuffled = list(password)
    secrets.SystemRandom().shuffle(shuffled)
    
    return ''.join(shuffled)


print("\nGenerated passwords:")
for i in range(3):
    print(f"  Password {i+1}: {generate_secure_password(16)}")


# API key generator
def generate_api_key(prefix: str = "sk") -> str:
    """Generate a secure API key."""
    return f"{prefix}_{secrets.token_urlsafe(32)}"


print("\nGenerated API keys:")
print(f"  Live key: {generate_api_key('sk_live')}")
print(f"  Test key: {generate_api_key('sk_test')}")


# ============================================================
# SECTION 3: Hashing for Data Integrity
# ============================================================

print("\n" + "=" * 60)
print("SECTION 3: Hashing for Data Integrity")
print("=" * 60)


def compute_file_hash(filepath: str, algorithm: str = 'sha256') -> str:
    """Compute hash of a file for integrity verification."""
    hash_obj = hashlib.new(algorithm)
    
    with open(filepath, 'rb') as f:
        # Read in chunks for large files
        for chunk in iter(lambda: f.read(8192), b''):
            hash_obj.update(chunk)
    
    return hash_obj.hexdigest()


def compute_string_hash(data: str, algorithm: str = 'sha256') -> str:
    """Compute hash of a string."""
    return hashlib.new(algorithm, data.encode()).hexdigest()


# Demo
sample_data = "Important document content"
print(f"Data: {sample_data}")
print(f"SHA-256: {compute_string_hash(sample_data, 'sha256')}")
print(f"SHA-512: {compute_string_hash(sample_data, 'sha512')}")
print(f"MD5: {compute_string_hash(sample_data, 'md5')}")  # Don't use for security!


# HMAC for authenticated messages
import hmac

def create_hmac(message: str, key: bytes) -> str:
    """Create HMAC for message authentication."""
    return hmac.new(key, message.encode(), hashlib.sha256).hexdigest()


def verify_hmac(message: str, mac: str, key: bytes) -> bool:
    """Verify HMAC signature."""
    expected_mac = create_hmac(message, key)
    return hmac.compare_digest(mac, expected_mac)


# Demo
secret_key = secrets.token_bytes(32)
message = "Transfer $1000 to account 12345"
mac = create_hmac(message, secret_key)

print(f"\nHMAC Authentication:")
print(f"Message: {message}")
print(f"HMAC: {mac}")
print(f"Verification (valid): {verify_hmac(message, mac, secret_key)}")
print(f"Verification (tampered): {verify_hmac('Transfer $9000', mac, secret_key)}")


# ============================================================
# SECTION 4: Input Validation
# ============================================================

print("\n" + "=" * 60)
print("SECTION 4: Input Validation")
print("=" * 60)


class InputValidator:
    """Collection of input validation methods."""
    
    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate email format."""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email)) and len(email) <= 254
    
    @staticmethod
    def validate_username(username: str) -> bool:
        """Validate username (alphanumeric, 3-30 chars)."""
        pattern = r'^[a-zA-Z][a-zA-Z0-9_]{2,29}$'
        return bool(re.match(pattern, username))
    
    @staticmethod
    def validate_password_strength(password: str) -> tuple[bool, list[str]]:
        """Check password strength, return (is_valid, issues)."""
        issues = []
        
        if len(password) < 8:
            issues.append("At least 8 characters required")
        if len(password) > 128:
            issues.append("Maximum 128 characters")
        if not re.search(r'[a-z]', password):
            issues.append("Missing lowercase letter")
        if not re.search(r'[A-Z]', password):
            issues.append("Missing uppercase letter")
        if not re.search(r'\d', password):
            issues.append("Missing digit")
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            issues.append("Missing special character")
        
        return len(issues) == 0, issues
    
    @staticmethod
    def validate_phone(phone: str) -> bool:
        """Validate phone number (basic)."""
        # Remove common formatting
        cleaned = re.sub(r'[\s\-\(\)\.]', '', phone)
        # Check if it's 10-15 digits, optionally starting with +
        return bool(re.match(r'^\+?\d{10,15}$', cleaned))
    
    @staticmethod
    def validate_url(url: str) -> bool:
        """Validate URL format."""
        pattern = (
            r'^https?://'  # http:// or https://
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain
            r'localhost|'  # localhost
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # IP
            r'(?::\d+)?'  # optional port
            r'(?:/?|[/?]\S+)$'
        )
        return bool(re.match(pattern, url, re.IGNORECASE))


# Demo
validator = InputValidator()

print("\nEmail validation:")
emails = ["user@example.com", "invalid-email", "test@test", "user.name+tag@domain.org"]
for email in emails:
    print(f"  {email}: {validator.validate_email(email)}")

print("\nUsername validation:")
usernames = ["john_doe", "a", "123user", "valid_user_123", "user@name"]
for username in usernames:
    print(f"  {username}: {validator.validate_username(username)}")

print("\nPassword strength:")
passwords = ["weak", "StrongP@ss1", "onlylowercase", "NoSpecial123"]
for pwd in passwords:
    valid, issues = validator.validate_password_strength(pwd)
    status = "✓" if valid else f"✗ ({', '.join(issues)})"
    print(f"  {pwd}: {status}")


# ============================================================
# SECTION 5: Input Sanitization
# ============================================================

print("\n" + "=" * 60)
print("SECTION 5: Input Sanitization")
print("=" * 60)


class Sanitizer:
    """Input sanitization methods."""
    
    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML special characters."""
        return html.escape(text)
    
    @staticmethod
    def sanitize_string(text: str, extra_allowed: str = "") -> str:
        """Remove non-alphanumeric characters except allowed ones."""
        import string
        allowed = set(string.ascii_letters + string.digits + " " + extra_allowed)
        return ''.join(c if c in allowed else '' for c in text)
    
    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize filename to prevent path traversal."""
        # Remove directory components
        filename = os.path.basename(filename)
        
        # Remove potentially dangerous characters
        filename = re.sub(r'[^\w\s\-\.]', '', filename)
        
        # Remove leading dots (hidden files)
        filename = filename.lstrip('.')
        
        # Limit length
        name, ext = os.path.splitext(filename)
        name = name[:100]  # Limit base name
        ext = ext[:10]     # Limit extension
        
        return f"{name}{ext}" if ext else name or "unnamed"
    
    @staticmethod
    def sanitize_sql_identifier(identifier: str) -> str:
        """Sanitize SQL table/column name (NOT for values!)."""
        # Only allow alphanumeric and underscore
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '', identifier)
        
        # Must start with letter
        if sanitized and not sanitized[0].isalpha():
            sanitized = 'c_' + sanitized
        
        return sanitized[:64] or 'unnamed'  # Limit length


sanitizer = Sanitizer()

print("\nHTML Sanitization (XSS Prevention):")
malicious_html = '<script>alert("XSS")</script><img src=x onerror=alert("XSS")>'
print(f"  Input:  {malicious_html}")
print(f"  Output: {sanitizer.sanitize_html(malicious_html)}")

print("\nFilename Sanitization:")
malicious_filenames = [
    "../../../etc/passwd",
    "..\\..\\windows\\system32",
    ".hidden_file",
    "file; rm -rf /",
    "normal_file.txt"
]
for fn in malicious_filenames:
    print(f"  {fn!r} -> {sanitizer.sanitize_filename(fn)!r}")


# ============================================================
# SECTION 6: Safe Path Handling
# ============================================================

print("\n" + "=" * 60)
print("SECTION 6: Safe Path Handling")
print("=" * 60)


def safe_path_join(base_dir: str, *paths) -> Optional[str]:
    """
    Safely join paths, preventing directory traversal.
    Returns None if traversal attempt detected.
    """
    base = Path(base_dir).resolve()
    
    # Join and resolve the target path
    try:
        target = base.joinpath(*paths).resolve()
    except (ValueError, OSError):
        return None
    
    # Check if target is under base
    try:
        target.relative_to(base)
        return str(target)
    except ValueError:
        # Target is outside base directory
        return None


# Demo
base_dir = "/var/www/uploads"

print(f"\nSafe path joining (base: {base_dir}):")
test_paths = [
    ("documents", "report.pdf"),
    ("../../../etc", "passwd"),
    ("images", "..", "..", "etc", "passwd"),
    ("normal", "path", "file.txt"),
]

for paths in test_paths:
    result = safe_path_join(base_dir, *paths)
    status = result if result else "BLOCKED (traversal detected)"
    print(f"  {'/'.join(paths)} -> {status}")


# ============================================================
# SECTION 7: Constant-Time Comparison
# ============================================================

print("\n" + "=" * 60)
print("SECTION 7: Constant-Time Comparison")
print("=" * 60)


def verify_token_UNSAFE(provided: str, expected: str) -> bool:
    """
    UNSAFE: Vulnerable to timing attacks.
    Don't use this!
    """
    return provided == expected


def verify_token_SAFE(provided: str, expected: str) -> bool:
    """
    SAFE: Uses constant-time comparison.
    Use this for security-sensitive comparisons.
    """
    return secrets.compare_digest(provided, expected)


# Demo
api_key = "sk_live_abc123def456"
print(f"API Key: {api_key}")
print(f"Safe comparison (correct): {verify_token_SAFE(api_key, api_key)}")
print(f"Safe comparison (wrong): {verify_token_SAFE('wrong_key', api_key)}")
print("\nNote: secrets.compare_digest takes the same time regardless of")
print("where the mismatch occurs, preventing timing attacks.")


# ============================================================
# SECTION 8: Secure Configuration
# ============================================================

print("\n" + "=" * 60)
print("SECTION 8: Secure Configuration")
print("=" * 60)


@dataclass
class SecureConfig:
    """Secure application configuration."""
    database_url: str
    secret_key: str
    debug: bool = False
    
    @classmethod
    def from_env(cls) -> 'SecureConfig':
        """Load configuration from environment variables."""
        
        def get_required(name: str) -> str:
            value = os.environ.get(name)
            if not value:
                raise ValueError(f"Missing required env var: {name}")
            return value
        
        return cls(
            database_url=get_required('DATABASE_URL'),
            secret_key=get_required('SECRET_KEY'),
            debug=os.environ.get('DEBUG', '').lower() == 'true'
        )
    
    def __repr__(self):
        """Don't expose secrets in logs."""
        return (
            f"SecureConfig("
            f"database_url='***', "
            f"secret_key='***', "
            f"debug={self.debug})"
        )


# Demo (simulating environment)
print("\nSecure configuration example:")
print("Setting environment variables...")
os.environ['DATABASE_URL'] = 'postgresql://user:pass@localhost/db'
os.environ['SECRET_KEY'] = secrets.token_urlsafe(32)
os.environ['DEBUG'] = 'false'

config = SecureConfig.from_env()
print(f"Config object: {config}")  # Secrets are hidden
print("\nNote: __repr__ hides sensitive values to prevent logging exposure")


# ============================================================
# SECTION 9: Rate Limiting (Simple Implementation)
# ============================================================

print("\n" + "=" * 60)
print("SECTION 9: Rate Limiting")
print("=" * 60)

from collections import defaultdict
from datetime import datetime, timedelta
import time


class RateLimiter:
    """Simple in-memory rate limiter."""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = timedelta(seconds=window_seconds)
        self.requests: dict[str, list[datetime]] = defaultdict(list)
    
    def is_allowed(self, identifier: str) -> tuple[bool, int]:
        """
        Check if request is allowed.
        Returns (is_allowed, remaining_requests).
        """
        now = datetime.now()
        window_start = now - self.window
        
        # Clean old requests
        self.requests[identifier] = [
            req for req in self.requests[identifier]
            if req > window_start
        ]
        
        # Check limit
        current_count = len(self.requests[identifier])
        
        if current_count >= self.max_requests:
            return False, 0
        
        # Record this request
        self.requests[identifier].append(now)
        return True, self.max_requests - current_count - 1
    
    def reset(self, identifier: str):
        """Reset rate limit for identifier."""
        self.requests[identifier] = []


# Demo
limiter = RateLimiter(max_requests=5, window_seconds=10)

print(f"\nRate limiter: 5 requests per 10 seconds")
for i in range(7):
    allowed, remaining = limiter.is_allowed("user123")
    status = "✓ Allowed" if allowed else "✗ Rate limited"
    print(f"  Request {i+1}: {status} (remaining: {remaining})")


# ============================================================
# SECTION 10: Secure Error Handling
# ============================================================

print("\n" + "=" * 60)
print("SECTION 10: Secure Error Handling")
print("=" * 60)


class SecureError(Exception):
    """
    Exception that separates internal details from user message.
    """
    def __init__(self, user_message: str, internal_details: str = ""):
        self.user_message = user_message
        self.internal_details = internal_details
        super().__init__(user_message)


def process_user_data(data: str):
    """Example function that may raise secure errors."""
    if not data:
        raise SecureError(
            user_message="Invalid input provided",
            internal_details=f"Empty data received at {datetime.now()}"
        )
    
    if len(data) > 1000:
        raise SecureError(
            user_message="Input too large",
            internal_details=f"Data size: {len(data)} bytes, limit: 1000"
        )
    
    return f"Processed: {data}"


def handle_request_safely(data: str) -> dict:
    """Handle request with secure error handling."""
    try:
        result = process_user_data(data)
        return {"success": True, "data": result}
    
    except SecureError as e:
        # Log internal details (server-side only)
        print(f"  [LOG] Internal error: {e.internal_details}")
        # Return safe message to user
        return {"success": False, "error": e.user_message}
    
    except Exception as e:
        # Log unexpected error details
        print(f"  [LOG] Unexpected error: {type(e).__name__}: {e}")
        # Return generic message to user
        return {"success": False, "error": "An unexpected error occurred"}


# Demo
print("\nSecure error handling:")
print(f"  Valid input: {handle_request_safely('Hello')}")
print(f"  Empty input: {handle_request_safely('')}")
print(f"  Large input: {handle_request_safely('x' * 2000)}")


# ============================================================
# SECTION 11: Session Token Management
# ============================================================

print("\n" + "=" * 60)
print("SECTION 11: Session Token Management")
print("=" * 60)


class SessionManager:
    """Simple secure session manager."""
    
    def __init__(self, expiry_minutes: int = 30):
        self.sessions: dict[str, dict] = {}
        self.expiry = timedelta(minutes=expiry_minutes)
    
    def create_session(self, user_id: str) -> str:
        """Create a new session for user."""
        # Generate secure session token
        token = secrets.token_urlsafe(32)
        
        self.sessions[token] = {
            'user_id': user_id,
            'created_at': datetime.now(),
            'expires_at': datetime.now() + self.expiry
        }
        
        return token
    
    def validate_session(self, token: str) -> Optional[str]:
        """
        Validate session token.
        Returns user_id if valid, None if invalid/expired.
        """
        session = self.sessions.get(token)
        
        if not session:
            return None
        
        if datetime.now() > session['expires_at']:
            # Clean up expired session
            del self.sessions[token]
            return None
        
        return session['user_id']
    
    def invalidate_session(self, token: str):
        """Invalidate (logout) a session."""
        self.sessions.pop(token, None)
    
    def invalidate_all_user_sessions(self, user_id: str):
        """Invalidate all sessions for a user."""
        tokens_to_remove = [
            token for token, data in self.sessions.items()
            if data['user_id'] == user_id
        ]
        for token in tokens_to_remove:
            del self.sessions[token]


# Demo
session_mgr = SessionManager(expiry_minutes=30)

print("\nSession management:")
token = session_mgr.create_session("user_123")
print(f"  Created session token: {token[:20]}...")

user = session_mgr.validate_session(token)
print(f"  Validated session for: {user}")

session_mgr.invalidate_session(token)
user = session_mgr.validate_session(token)
print(f"  After invalidation: {user}")


# ============================================================
# Summary
# ============================================================

print("\n" + "=" * 60)
print("SECURITY BEST PRACTICES SUMMARY")
print("=" * 60)
print("""
1. PASSWORDS: Always use proper hashing (bcrypt, argon2, scrypt, PBKDF2)
2. RANDOM: Use `secrets` module, never `random` for security
3. COMPARISON: Use `secrets.compare_digest` for constant-time comparison
4. INPUT: Validate AND sanitize all user input
5. PATHS: Prevent directory traversal with proper path joining
6. HTML: Always escape output to prevent XSS
7. SQL: Use parameterized queries, never string formatting
8. ERRORS: Log details internally, show generic messages to users
9. CONFIG: Store secrets in environment variables, never in code
10. SESSIONS: Use secure random tokens, implement expiration
""")
Examples - Python Tutorial | DeepML