# examples.py
"""
Professional API Development - Examples

Build production-ready REST APIs with authentication, validation, and best practices.
"""

import base64
import hashlib
import hmac
import json
import logging
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Callable


# =============================================================================
# REST API CONCEPTS
# =============================================================================

"""
REST (Representational State Transfer) Principles:

1. RESOURCES - Everything is a resource with a URL
   - /users, /users/123, /users/123/posts

2. HTTP METHODS - Standard operations
   - GET    - Read (safe, idempotent)
   - POST   - Create (not idempotent)
   - PUT    - Update/Replace (idempotent)
   - PATCH  - Partial Update (not always idempotent)
   - DELETE - Delete (idempotent)

3. STATUS CODES
   - 2xx: Success (200 OK, 201 Created, 204 No Content)
   - 3xx: Redirect (301 Moved, 304 Not Modified)
   - 4xx: Client Error (400 Bad Request, 401 Unauthorized, 404 Not Found)
   - 5xx: Server Error (500 Internal Error, 503 Service Unavailable)

4. STATELESS - Each request contains all information needed

5. REPRESENTATIONS - Resources can have multiple formats (JSON, XML, etc.)
"""


# =============================================================================
# API RESPONSE MODELS
# =============================================================================

@dataclass
class ApiResponse:
    """Standard API response wrapper."""
    success: bool
    data: Any = None
    error: str | None = None
    message: str | None = None
    meta: dict = field(default_factory=dict)
    
    def to_dict(self) -> dict:
        result = {"success": self.success}
        if self.data is not None:
            result["data"] = self.data
        if self.error:
            result["error"] = self.error
        if self.message:
            result["message"] = self.message
        if self.meta:
            result["meta"] = self.meta
        return result
    
    def to_json(self) -> str:
        return json.dumps(self.to_dict(), default=str)


@dataclass
class PaginatedResponse:
    """One page of results plus the metadata needed to navigate the pages.

    Attributes:
        items: The records on the current page.
        total: Total number of records across all pages.
        page: 1-based index of the current page.
        per_page: Maximum number of records per page.
    """
    items: list
    total: int
    page: int
    per_page: int

    @property
    def total_pages(self) -> int:
        """Number of pages needed to hold ``total`` records.

        Returns 0 when ``per_page`` is not positive instead of raising
        ``ZeroDivisionError``, so serializing a response built from bad
        input cannot crash.
        """
        if self.per_page <= 0:
            return 0
        # Ceiling division without floats.
        return (self.total + self.per_page - 1) // self.per_page

    @property
    def has_next(self) -> bool:
        """True when a page exists after the current one."""
        return self.page < self.total_pages

    @property
    def has_prev(self) -> bool:
        """True when a page exists before the current one."""
        return self.page > 1

    def to_dict(self) -> dict:
        """Return the page items and a ``pagination`` metadata dict."""
        return {
            "items": self.items,
            "pagination": {
                "total": self.total,
                "page": self.page,
                "per_page": self.per_page,
                "total_pages": self.total_pages,
                "has_next": self.has_next,
                "has_prev": self.has_prev
            }
        }


# =============================================================================
# REQUEST VALIDATION
# =============================================================================

@dataclass
class ValidationError:
    """Validation error for a field."""
    field: str
    message: str
    code: str = "invalid"


class Validator:
    """Request data validator."""
    
    def __init__(self):
        self.errors: list[ValidationError] = []
    
    def required(self, data: dict, field: str, message: str = None) -> Any:
        """Check that a field is present and not empty."""
        value = data.get(field)
        if value is None or value == "":
            self.errors.append(ValidationError(
                field=field,
                message=message or f"{field} is required",
                code="required"
            ))
            return None
        return value
    
    def string(self, data: dict, field: str, min_len: int = 0, max_len: int = None) -> str | None:
        """Validate string field."""
        value = data.get(field)
        if value is None:
            return None
        
        if not isinstance(value, str):
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be a string",
                code="type_error"
            ))
            return None
        
        if len(value) < min_len:
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be at least {min_len} characters",
                code="min_length"
            ))
        
        if max_len and len(value) > max_len:
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be at most {max_len} characters",
                code="max_length"
            ))
        
        return value
    
    def integer(self, data: dict, field: str, min_val: int = None, max_val: int = None) -> int | None:
        """Validate integer field."""
        value = data.get(field)
        if value is None:
            return None
        
        try:
            value = int(value)
        except (TypeError, ValueError):
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be an integer",
                code="type_error"
            ))
            return None
        
        if min_val is not None and value < min_val:
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be at least {min_val}",
                code="min_value"
            ))
        
        if max_val is not None and value > max_val:
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be at most {max_val}",
                code="max_value"
            ))
        
        return value
    
    def email(self, data: dict, field: str) -> str | None:
        """Validate email field."""
        value = self.string(data, field)
        if value and "@" not in value:
            self.errors.append(ValidationError(
                field=field,
                message=f"{field} must be a valid email",
                code="invalid_email"
            ))
        return value
    
    def is_valid(self) -> bool:
        return len(self.errors) == 0
    
    def get_errors(self) -> list[dict]:
        return [{"field": e.field, "message": e.message, "code": e.code} for e in self.errors]


# =============================================================================
# AUTHENTICATION - JWT
# =============================================================================

@dataclass
class JWTConfig:
    """JWT configuration."""
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7


class JWTAuth:
    """Simple JWT authentication handler."""
    
    def __init__(self, config: JWTConfig):
        self.config = config
    
    def _base64_encode(self, data: bytes) -> str:
        """URL-safe base64 encode."""
        return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')
    
    def _base64_decode(self, data: str) -> bytes:
        """URL-safe base64 decode."""
        padding = 4 - len(data) % 4
        if padding != 4:
            data += '=' * padding
        return base64.urlsafe_b64decode(data)
    
    def create_token(self, payload: dict, expires_delta: timedelta = None) -> str:
        """Create a JWT token."""
        if expires_delta is None:
            expires_delta = timedelta(minutes=self.config.access_token_expire_minutes)
        
        # Header
        header = {"alg": self.config.algorithm, "typ": "JWT"}
        header_b64 = self._base64_encode(json.dumps(header).encode())
        
        # Payload with expiration
        payload = payload.copy()
        payload["exp"] = (datetime.utcnow() + expires_delta).timestamp()
        payload["iat"] = datetime.utcnow().timestamp()
        payload_b64 = self._base64_encode(json.dumps(payload).encode())
        
        # Signature
        message = f"{header_b64}.{payload_b64}"
        signature = hmac.new(
            self.config.secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).digest()
        signature_b64 = self._base64_encode(signature)
        
        return f"{header_b64}.{payload_b64}.{signature_b64}"
    
    def verify_token(self, token: str) -> dict | None:
        """Verify and decode a JWT token."""
        try:
            parts = token.split('.')
            if len(parts) != 3:
                return None
            
            header_b64, payload_b64, signature_b64 = parts
            
            # Verify signature
            message = f"{header_b64}.{payload_b64}"
            expected_signature = hmac.new(
                self.config.secret_key.encode(),
                message.encode(),
                hashlib.sha256
            ).digest()
            
            actual_signature = self._base64_decode(signature_b64)
            if not hmac.compare_digest(expected_signature, actual_signature):
                return None
            
            # Decode payload
            payload = json.loads(self._base64_decode(payload_b64))
            
            # Check expiration
            if payload.get("exp", 0) < datetime.utcnow().timestamp():
                return None
            
            return payload
        except Exception:
            return None
    
    def create_access_token(self, user_id: str, **extra_claims) -> str:
        """Create an access token for a user."""
        payload = {"sub": user_id, "type": "access", **extra_claims}
        return self.create_token(payload)
    
    def create_refresh_token(self, user_id: str) -> str:
        """Create a refresh token for a user."""
        payload = {"sub": user_id, "type": "refresh"}
        expires = timedelta(days=self.config.refresh_token_expire_days)
        return self.create_token(payload, expires)


# =============================================================================
# AUTHENTICATION - API KEY
# =============================================================================

class APIKeyAuth:
    """API Key authentication handler."""
    
    def __init__(self):
        self.api_keys: dict[str, dict] = {}
    
    def generate_key(self, name: str, scopes: list[str] = None) -> str:
        """Generate a new API key."""
        key = f"sk_{secrets.token_urlsafe(32)}"
        self.api_keys[key] = {
            "name": name,
            "scopes": scopes or ["read"],
            "created_at": datetime.utcnow().isoformat(),
            "last_used": None
        }
        return key
    
    def validate_key(self, key: str) -> dict | None:
        """Validate an API key and return its metadata."""
        if key not in self.api_keys:
            return None
        
        self.api_keys[key]["last_used"] = datetime.utcnow().isoformat()
        return self.api_keys[key]
    
    def has_scope(self, key: str, scope: str) -> bool:
        """Check if API key has a specific scope."""
        data = self.api_keys.get(key)
        if not data:
            return False
        return scope in data.get("scopes", [])
    
    def revoke_key(self, key: str) -> bool:
        """Revoke an API key."""
        if key in self.api_keys:
            del self.api_keys[key]
            return True
        return False


# =============================================================================
# RATE LIMITING
# =============================================================================

@dataclass
class RateLimitInfo:
    """Rate limit information."""
    limit: int  # maximum number of requests per window
    remaining: int  # tokens left after the current request
    reset_at: datetime  # naive UTC time at which the window resets

    @property
    def reset_in_seconds(self) -> int:
        """Whole seconds until ``reset_at``, never negative."""
        return max(0, int((self.reset_at - datetime.utcnow()).total_seconds()))


class RateLimiter:
    """Token bucket rate limiter.

    Each key owns a bucket of up to ``requests_per_minute`` tokens. Tokens
    are refilled continuously based on elapsed time, and the bucket is also
    reset to full whenever its one-minute window expires. All timestamps
    are naive UTC datetimes.
    """

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.buckets: dict[str, dict] = {}

    def _get_bucket(self, key: str) -> dict:
        """Get or create the bucket for ``key`` and bring it up to date."""
        now = datetime.utcnow()

        if key not in self.buckets:
            self.buckets[key] = {
                "tokens": self.requests_per_minute,
                "last_update": now,
                "reset_at": now + timedelta(minutes=1)
            }

        bucket = self.buckets[key]

        # Refill whole tokens based on time passed.
        elapsed = (now - bucket["last_update"]).total_seconds()
        refill = int(elapsed * self.requests_per_minute / 60)

        if refill > 0:
            refilled = bucket["tokens"] + refill
            if refilled >= self.requests_per_minute:
                # Bucket is full: there is no partial credit to carry over.
                bucket["tokens"] = self.requests_per_minute
                bucket["last_update"] = now
            else:
                bucket["tokens"] = refilled
                # Advance only by the time the granted tokens account for,
                # so the fractional remainder counts toward the next token
                # instead of being discarded on every call.
                bucket["last_update"] += timedelta(
                    seconds=refill * 60 / self.requests_per_minute
                )

        # Reset if window passed
        if now >= bucket["reset_at"]:
            bucket["tokens"] = self.requests_per_minute
            bucket["last_update"] = now
            bucket["reset_at"] = now + timedelta(minutes=1)

        return bucket

    def is_allowed(self, key: str) -> tuple[bool, RateLimitInfo]:
        """Check if a request is allowed and consume a token if so.

        Returns:
            ``(allowed, info)`` where ``info.remaining`` is the number of
            tokens left after this request (0 when the request is denied).
        """
        bucket = self._get_bucket(key)

        info = RateLimitInfo(
            limit=self.requests_per_minute,
            remaining=max(0, bucket["tokens"] - 1),
            reset_at=bucket["reset_at"]
        )

        if bucket["tokens"] > 0:
            bucket["tokens"] -= 1
            return True, info

        info.remaining = 0
        return False, info


# =============================================================================
# SIMPLE FLASK-LIKE API FRAMEWORK
# =============================================================================

class Route:
    """Binds a path pattern and an HTTP method to a handler callable."""

    def __init__(self, path: str, method: str, handler: Callable, **options):
        # Methods are stored upper-cased so lookups are case-insensitive.
        verb = method.upper()
        self.path = path
        self.method = verb
        self.handler = handler
        # Any extra keyword arguments are kept verbatim.
        self.options = options


class Request:
    """Minimal stand-in for an incoming HTTP request."""

    def __init__(self, method: str, path: str, headers: dict = None,
                 body: dict = None, query: dict = None):
        self.method = method.upper()
        self.path = path
        # Omitted (or empty) mappings are replaced by fresh empty dicts.
        self.headers = headers if headers else {}
        self.body = body if body else {}
        self.query = query if query else {}
        # Filled in by the authentication middleware.
        self.user = None


class Response:
    """Minimal stand-in for an HTTP response.

    Holds a payload, a status code and headers. The classmethods below are
    convenience constructors for common status codes.
    """

    def __init__(self, data: Any = None, status: int = 200, headers: dict = None):
        self.data = data
        self.status = status
        # Default to a JSON content type when no (or empty) headers are given.
        self.headers = headers if headers else {"Content-Type": "application/json"}

    def to_json(self) -> str:
        """Serialize the payload to JSON; unknown types fall back to ``str``."""
        return json.dumps(self.data, default=str)

    @classmethod
    def ok(cls, data: Any = None, message: str = None) -> 'Response':
        """200 response carrying ``data`` and an optional ``message``."""
        body = {"success": True, "data": data, "message": message}
        return cls(body, 200)

    @classmethod
    def created(cls, data: Any = None) -> 'Response':
        """201 response carrying the created resource."""
        body = {"success": True, "data": data}
        return cls(body, 201)

    @classmethod
    def no_content(cls) -> 'Response':
        """204 response with no payload."""
        return cls(None, 204)

    @classmethod
    def bad_request(cls, error: str, details: Any = None) -> 'Response':
        """400 response with an error message and optional details."""
        body = {"success": False, "error": error, "details": details}
        return cls(body, 400)

    @classmethod
    def unauthorized(cls, error: str = "Unauthorized") -> 'Response':
        """401 response with an error message."""
        body = {"success": False, "error": error}
        return cls(body, 401)

    @classmethod
    def forbidden(cls, error: str = "Forbidden") -> 'Response':
        """403 response with an error message."""
        body = {"success": False, "error": error}
        return cls(body, 403)

    @classmethod
    def not_found(cls, error: str = "Not found") -> 'Response':
        """404 response with an error message."""
        body = {"success": False, "error": error}
        return cls(body, 404)

    @classmethod
    def internal_error(cls, error: str = "Internal server error") -> 'Response':
        """500 response with an error message."""
        body = {"success": False, "error": error}
        return cls(body, 500)


class MiniAPI:
    """Minimal API framework for demonstration.

    Routes are registered with decorators and matched in registration
    order. Middleware callables run before routing; a middleware that
    returns a ``Response`` short-circuits the request.
    """

    def __init__(self, name: str = "api"):
        self.name = name
        self.routes: list[Route] = []
        self.middleware: list[Callable] = []

    def route(self, path: str, methods: list[str] = None, **options):
        """Decorator to register a handler for ``path``.

        Args:
            path: Path pattern; ``{name}`` segments capture a parameter.
            methods: HTTP methods to register (defaults to ``["GET"]``).
            **options: Extra options stored on each created ``Route``.
        """
        methods = methods or ["GET"]

        def decorator(func: Callable):
            for method in methods:
                self.routes.append(Route(path, method, func, **options))
            # Return the function unchanged so it stays directly callable.
            return func

        return decorator

    def get(self, path: str, **options):
        """Shortcut for GET route."""
        return self.route(path, ["GET"], **options)

    def post(self, path: str, **options):
        """Shortcut for POST route."""
        return self.route(path, ["POST"], **options)

    def put(self, path: str, **options):
        """Shortcut for PUT route."""
        return self.route(path, ["PUT"], **options)

    def delete(self, path: str, **options):
        """Shortcut for DELETE route."""
        return self.route(path, ["DELETE"], **options)

    def use(self, middleware: Callable):
        """Add middleware; middleware runs in the order it was added."""
        self.middleware.append(middleware)

    def _match_route(self, method: str, path: str) -> tuple[Route | None, dict]:
        """Match a route and extract path parameters.

        Returns:
            ``(route, params)`` for the first registered route whose method
            and segments match, or ``(None, {})`` when nothing matches.
            Parameter values are the raw path segments (strings).
        """
        path_parts = path.split('/')

        for route in self.routes:
            if route.method != method:
                continue

            # Simple path matching with parameters
            route_parts = route.path.split('/')
            if len(route_parts) != len(path_parts):
                continue

            params = {}
            match = True

            for rp, pp in zip(route_parts, path_parts):
                if rp.startswith('{') and rp.endswith('}'):
                    params[rp[1:-1]] = pp
                elif rp != pp:
                    match = False
                    break

            if match:
                return route, params

        return None, {}

    def handle(self, request: Request) -> Response:
        """Run middleware, dispatch to the matching handler, return a Response.

        Any exception raised by middleware or by the handler is logged and
        converted into a generic 500 response. The exception text is not
        sent to the client, because it can expose internal details.
        """
        try:
            # Apply middleware
            for mw in self.middleware:
                early = mw(request)
                if isinstance(early, Response):
                    return early

            # Find matching route
            route, params = self._match_route(request.method, request.path)

            if not route:
                return Response.not_found(f"No route for {request.method} {request.path}")

            # Call handler
            result = route.handler(request, **params)

            if isinstance(result, Response):
                return result

            # Plain return values are wrapped in a 200 envelope.
            return Response.ok(result)
        except Exception:
            logging.getLogger(__name__).exception(
                "Unhandled error while handling %s %s", request.method, request.path
            )
            return Response.internal_error()


# =============================================================================
# EXAMPLE API IMPLEMENTATION
# =============================================================================

# Create API instance; the route decorators below register handlers on it.
# NOTE(review): no api.use(...) call is visible in this file, so
# auth_middleware and rate_limit_middleware appear to be defined but never
# registered -- confirm whether that is intentional for the demo.
api = MiniAPI("users-api")

# In-memory database: maps user id -> user record. Being a module-level dict,
# its contents live only as long as the process.
users_db: dict[int, dict] = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
}
# Id that create_user() assigns to the next new user (then increments).
next_id = 3

# JWT configuration
# NOTE(review): the signing secret is hard-coded; as its own text says, it
# must be replaced (e.g. loaded from configuration) before production use.
jwt_config = JWTConfig(secret_key="super-secret-key-change-in-production")
jwt_auth = JWTAuth(jwt_config)

# Rate limiter used by rate_limit_middleware: 100 requests per minute per key.
rate_limiter = RateLimiter(requests_per_minute=100)


# Middleware
def auth_middleware(request: Request) -> Response | None:
    """Authenticate a request from its ``Authorization: Bearer <token>`` header.

    The login endpoint is exempt. On success the decoded token payload is
    stored on ``request.user``.

    Returns:
        ``None`` to let the request continue, or a 401 ``Response`` when
        the header is missing/malformed, the token is invalid or expired,
        or the token is not an access token.
    """
    # Skip auth for login endpoint
    if request.path == "/auth/login":
        return None

    auth_header = request.headers.get("Authorization") or ""
    # The auth scheme name is case-insensitive, so accept "bearer" too.
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return Response.unauthorized("Missing or invalid Authorization header")

    payload = jwt_auth.verify_token(token)

    if not payload:
        return Response.unauthorized("Invalid or expired token")

    # Refresh tokens are long-lived and must not grant API access. Tokens
    # without a "type" claim are still accepted for backward compatibility.
    if payload.get("type", "access") != "access":
        return Response.unauthorized("Invalid or expired token")

    request.user = payload
    return None


def rate_limit_middleware(request: Request) -> Response | None:
    """Reject requests that exceed the shared rate limit.

    The limit is keyed by the authenticated user's ``sub`` claim when
    available, otherwise by the first address in ``X-Forwarded-For``,
    otherwise by the literal ``"anonymous"``. ``request.user`` is set by
    the auth middleware, so this middleware should run after it.

    Returns:
        ``None`` when the request is allowed, or a 429 ``Response`` with
        rate-limit headers when it is not.
    """
    key = request.user.get("sub") if request.user else None
    if not key:
        # X-Forwarded-For may list several hops ("client, proxy1, ...");
        # the first entry is the originating client.
        # NOTE(review): this header is client-controlled unless a trusted
        # proxy overwrites it -- confirm the deployment guarantees that.
        forwarded = request.headers.get("X-Forwarded-For") or ""
        key = forwarded.split(",")[0].strip() or "anonymous"

    allowed, info = rate_limiter.is_allowed(key)
    if allowed:
        return None

    # The limiter works with naive UTC datetimes; attach the UTC zone
    # before converting, otherwise timestamp() assumes local time and the
    # reset epoch is off by the machine's UTC offset.
    reset_at = info.reset_at
    if reset_at.tzinfo is None:
        reset_at = reset_at.replace(tzinfo=timezone.utc)

    retry_after = info.reset_in_seconds
    response = Response(
        {"error": "Rate limit exceeded", "retry_after": retry_after},
        429
    )
    response.headers["Retry-After"] = str(retry_after)
    response.headers["X-RateLimit-Limit"] = str(info.limit)
    response.headers["X-RateLimit-Remaining"] = str(info.remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    return response


# Routes
@api.post("/auth/login")
def login(request: Request) -> Response:
    """Issue access and refresh tokens for the user with the given email.

    Responds 400 when email or password is missing and 401 when no user
    has that email. The password is only checked for presence here; it is
    not compared against anything.
    """
    credentials = request.body
    email = credentials.get("email")
    password = credentials.get("password")

    # Both fields must be supplied and non-empty.
    if not (email and password):
        return Response.bad_request("Email and password required")

    # Look the user up by email (a real app would also verify a password hash).
    for candidate in users_db.values():
        if candidate["email"] == email:
            user = candidate
            break
    else:
        return Response.unauthorized("Invalid credentials")

    # Token subjects are the user id rendered as a string.
    subject = str(user["id"])
    access_token = jwt_auth.create_access_token(subject, name=user["name"])
    refresh_token = jwt_auth.create_refresh_token(subject)

    tokens = {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }
    return Response.ok(tokens)


@api.get("/users")
def list_users(request: Request) -> Response:
    """List all users with pagination.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: Page size (default 10).

    Responds 400 when either parameter is not an integer >= 1, instead of
    failing with a 500 on bad input or a zero page size.
    """
    validator = Validator()
    page = validator.integer(request.query, "page", min_val=1)
    per_page = validator.integer(request.query, "per_page", min_val=1)

    if not validator.is_valid():
        return Response.bad_request("Invalid pagination parameters", validator.get_errors())

    # integer() returns None for parameters that were not supplied.
    page = 1 if page is None else page
    per_page = 10 if per_page is None else per_page

    users = list(users_db.values())
    total = len(users)

    # Paginate
    start = (page - 1) * per_page
    paginated_users = users[start:start + per_page]

    return Response.ok(PaginatedResponse(
        items=paginated_users,
        total=total,
        page=page,
        per_page=per_page
    ).to_dict())


@api.get("/users/{user_id}")
def get_user(request: Request, user_id: str) -> Response:
    """Get a specific user.

    Responds 400 when ``user_id`` is not an integer and 404 when no user
    has that id.
    """
    # Path parameters arrive as raw strings; a non-numeric id is a client
    # error, not a server failure.
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        return Response.bad_request(f"Invalid user id: {user_id}")

    user = users_db.get(uid)

    if not user:
        return Response.not_found(f"User {user_id} not found")

    return Response.ok(user)


@api.post("/users")
def create_user(request: Request) -> Response:
    """Create a new user from ``name`` and ``email`` in the request body.

    Responds 400 with per-field errors when validation fails or the email
    is already taken, and 201 with the new record on success.
    """
    global next_id

    # Validate: both fields are required; name must be a string and email
    # must look like an email address.
    validator = Validator()
    name = validator.required(request.body, "name")
    validator.string(request.body, "name")
    validator.required(request.body, "email")
    email = validator.email(request.body, "email")

    if not validator.is_valid():
        return Response.bad_request("Validation failed", validator.get_errors())

    # Check email uniqueness
    if any(u["email"] == email for u in users_db.values()):
        return Response.bad_request("Email already exists")

    # Create user
    user = {
        "id": next_id,
        "name": name,
        "email": email
    }
    users_db[next_id] = user
    next_id += 1

    return Response.created(user)


@api.put("/users/{user_id}")
def update_user(request: Request, user_id: str) -> Response:
    """Update the ``name`` and/or ``email`` of an existing user.

    Only fields present in the request body are changed. Supplied fields
    are validated the same way as on creation.

    Responds 400 for a non-integer id, invalid fields or an email that
    belongs to another user; 404 when the user does not exist.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        return Response.bad_request(f"Invalid user id: {user_id}")

    user = users_db.get(uid)

    if not user:
        return Response.not_found(f"User {user_id} not found")

    # Validate only the fields the client actually sent.
    validator = Validator()
    if "name" in request.body:
        validator.required(request.body, "name")
        validator.string(request.body, "name")
    if "email" in request.body:
        validator.required(request.body, "email")
        validator.email(request.body, "email")

    if not validator.is_valid():
        return Response.bad_request("Validation failed", validator.get_errors())

    # Keep emails unique, ignoring the record being updated.
    if "email" in request.body:
        new_email = request.body["email"]
        if any(u is not user and u["email"] == new_email for u in users_db.values()):
            return Response.bad_request("Email already exists")

    # Update fields
    if "name" in request.body:
        user["name"] = request.body["name"]
    if "email" in request.body:
        user["email"] = request.body["email"]

    return Response.ok(user)


@api.delete("/users/{user_id}")
def delete_user(request: Request, user_id: str) -> Response:
    """Delete a user.

    Responds 400 when ``user_id`` is not an integer, 404 when no user has
    that id, and 204 after a successful deletion.
    """
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        return Response.bad_request(f"Invalid user id: {user_id}")

    if uid not in users_db:
        return Response.not_found(f"User {user_id} not found")

    del users_db[uid]
    return Response.no_content()


# =============================================================================
# DEMONSTRATIONS
# =============================================================================

# Demo script: exercises the validator, JWT helper, rate limiter and the
# example API in turn, printing the result of each step.
if __name__ == "__main__":
    print("=" * 60)
    print("API VALIDATION")
    print("=" * 60)
    validator = Validator()
    # Every field below is deliberately invalid so each check reports an error.
    data = {"name": "Jo", "email": "invalid", "age": "abc"}

    validator.required(data, "name")
    validator.string(data, "name", min_len=3)
    validator.email(data, "email")
    validator.integer(data, "age")

    print(f"Valid: {validator.is_valid()}")
    print(f"Errors: {json.dumps(validator.get_errors(), indent=2)}")

    print("\n" + "=" * 60)
    print("JWT AUTHENTICATION")
    print("=" * 60)

    # Round trip: create an access token, then verify and decode it.
    token = jwt_auth.create_access_token("user123", role="admin")
    print(f"Token: {token[:50]}...")

    payload = jwt_auth.verify_token(token)
    print(f"Payload: {payload}")

    print("\n" + "=" * 60)
    print("RATE LIMITING")
    print("=" * 60)

    # Uses the module-level limiter directly; each call consumes one token.
    for i in range(5):
        allowed, info = rate_limiter.is_allowed("test-user")
        print(f"Request {i+1}: allowed={allowed}, remaining={info.remaining}")

    print("\n" + "=" * 60)
    print("API REQUESTS")
    print("=" * 60)

    # Login
    request = Request("POST", "/auth/login", body={"email": "alice@example.com", "password": "test"})
    response = api.handle(request)
    print(f"Login: {response.status} - {response.to_json()[:100]}...")

    # Get users (would need auth in real scenario)
    # request.user is set by hand here in place of a real Authorization header.
    request = Request("GET", "/users", query={"page": "1", "per_page": "10"})
    request.user = {"sub": "1"}  # Mock auth
    response = api.handle(request)
    print(f"List users: {response.status}")

    # Get single user
    request = Request("GET", "/users/1")
    request.user = {"sub": "1"}
    response = api.handle(request)
    print(f"Get user 1: {response.status} - {response.to_json()}")
# Examples - Python Tutorial | DeepML