# python / examples / examples.py
"""
Professional API Development - Examples
Build production-ready REST APIs with authentication, validation, and best practices.
"""
import base64
import hashlib
import hmac
import json
import logging
import os
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Callable
# =============================================================================
# REST API CONCEPTS
# =============================================================================
"""
REST (Representational State Transfer) Principles:

1. RESOURCES - Everything is a resource with a URL
   - /users, /users/123, /users/123/posts

2. HTTP METHODS - Standard operations
   - GET    - Read (safe, idempotent)
   - POST   - Create (not idempotent)
   - PUT    - Update/Replace (idempotent)
   - PATCH  - Partial Update (not always idempotent)
   - DELETE - Delete (idempotent)

3. STATUS CODES
   - 2xx: Success (200 OK, 201 Created, 204 No Content)
   - 3xx: Redirect (301 Moved, 304 Not Modified)
   - 4xx: Client Error (400 Bad Request, 401 Unauthorized, 404 Not Found)
   - 5xx: Server Error (500 Internal Error, 503 Service Unavailable)

4. STATELESS - Each request contains all information needed

5. REPRESENTATIONS - Resources can have multiple formats (JSON, XML, etc.)
"""
# =============================================================================
# API RESPONSE MODELS
# =============================================================================
@dataclass
class ApiResponse:
"""Standard API response wrapper."""
success: bool
data: Any = None
error: str | None = None
message: str | None = None
meta: dict = field(default_factory=dict)
def to_dict(self) -> dict:
result = {"success": self.success}
if self.data is not None:
result["data"] = self.data
if self.error:
result["error"] = self.error
if self.message:
result["message"] = self.message
if self.meta:
result["meta"] = self.meta
return result
def to_json(self) -> str:
return json.dumps(self.to_dict(), default=str)
@dataclass
class PaginatedResponse:
    """One page of results together with its pagination metadata.

    Attributes:
        items: The records on the current page.
        total: Number of records across all pages.
        page: 1-based index of the current page.
        per_page: Maximum number of records per page.
    """

    items: list
    total: int
    page: int
    per_page: int

    @property
    def total_pages(self) -> int:
        """Number of pages needed to hold ``total`` records.

        Returns 0 when ``per_page`` is not positive instead of raising
        ``ZeroDivisionError`` (or producing a negative count), so that
        serializing a response built from unvalidated input cannot crash.
        """
        if self.per_page <= 0:
            return 0
        # Ceiling division without going through floats.
        return (self.total + self.per_page - 1) // self.per_page

    @property
    def has_next(self) -> bool:
        """True when at least one page follows the current one."""
        return self.page < self.total_pages

    @property
    def has_prev(self) -> bool:
        """True when the current page is not the first one."""
        return self.page > 1

    def to_dict(self) -> dict:
        """Return the page as ``{"items": [...], "pagination": {...}}``."""
        return {
            "items": self.items,
            "pagination": {
                "total": self.total,
                "page": self.page,
                "per_page": self.per_page,
                "total_pages": self.total_pages,
                "has_next": self.has_next,
                "has_prev": self.has_prev,
            },
        }
# =============================================================================
# REQUEST VALIDATION
# =============================================================================
@dataclass
class ValidationError:
"""Validation error for a field."""
field: str
message: str
code: str = "invalid"
class Validator:
"""Request data validator."""
def __init__(self):
self.errors: list[ValidationError] = []
def required(self, data: dict, field: str, message: str = None) -> Any:
"""Check that a field is present and not empty."""
value = data.get(field)
if value is None or value == "":
self.errors.append(ValidationError(
field=field,
message=message or f"{field} is required",
code="required"
))
return None
return value
def string(self, data: dict, field: str, min_len: int = 0, max_len: int = None) -> str | None:
"""Validate string field."""
value = data.get(field)
if value is None:
return None
if not isinstance(value, str):
self.errors.append(ValidationError(
field=field,
message=f"{field} must be a string",
code="type_error"
))
return None
if len(value) < min_len:
self.errors.append(ValidationError(
field=field,
message=f"{field} must be at least {min_len} characters",
code="min_length"
))
if max_len and len(value) > max_len:
self.errors.append(ValidationError(
field=field,
message=f"{field} must be at most {max_len} characters",
code="max_length"
))
return value
def integer(self, data: dict, field: str, min_val: int = None, max_val: int = None) -> int | None:
"""Validate integer field."""
value = data.get(field)
if value is None:
return None
try:
value = int(value)
except (TypeError, ValueError):
self.errors.append(ValidationError(
field=field,
message=f"{field} must be an integer",
code="type_error"
))
return None
if min_val is not None and value < min_val:
self.errors.append(ValidationError(
field=field,
message=f"{field} must be at least {min_val}",
code="min_value"
))
if max_val is not None and value > max_val:
self.errors.append(ValidationError(
field=field,
message=f"{field} must be at most {max_val}",
code="max_value"
))
return value
def email(self, data: dict, field: str) -> str | None:
"""Validate email field."""
value = self.string(data, field)
if value and "@" not in value:
self.errors.append(ValidationError(
field=field,
message=f"{field} must be a valid email",
code="invalid_email"
))
return value
def is_valid(self) -> bool:
return len(self.errors) == 0
def get_errors(self) -> list[dict]:
return [{"field": e.field, "message": e.message, "code": e.code} for e in self.errors]
# =============================================================================
# AUTHENTICATION - JWT
# =============================================================================
@dataclass
class JWTConfig:
    """Settings consumed by the JWT handler.

    Only ``secret_key`` is mandatory; the remaining fields carry defaults.
    """

    # Shared secret used to sign and verify tokens.
    secret_key: str
    # Value written to the "alg" field of the token header.
    algorithm: str = "HS256"
    # Lifetime applied to tokens created without an explicit expiry.
    access_token_expire_minutes: int = 30
    # Lifetime applied to refresh tokens.
    refresh_token_expire_days: int = 7
class JWTAuth:
"""Simple JWT authentication handler."""
def __init__(self, config: JWTConfig):
self.config = config
def _base64_encode(self, data: bytes) -> str:
"""URL-safe base64 encode."""
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')
def _base64_decode(self, data: str) -> bytes:
"""URL-safe base64 decode."""
padding = 4 - len(data) % 4
if padding != 4:
data += '=' * padding
return base64.urlsafe_b64decode(data)
def create_token(self, payload: dict, expires_delta: timedelta = None) -> str:
"""Create a JWT token."""
if expires_delta is None:
expires_delta = timedelta(minutes=self.config.access_token_expire_minutes)
# Header
header = {"alg": self.config.algorithm, "typ": "JWT"}
header_b64 = self._base64_encode(json.dumps(header).encode())
# Payload with expiration
payload = payload.copy()
payload["exp"] = (datetime.utcnow() + expires_delta).timestamp()
payload["iat"] = datetime.utcnow().timestamp()
payload_b64 = self._base64_encode(json.dumps(payload).encode())
# Signature
message = f"{header_b64}.{payload_b64}"
signature = hmac.new(
self.config.secret_key.encode(),
message.encode(),
hashlib.sha256
).digest()
signature_b64 = self._base64_encode(signature)
return f"{header_b64}.{payload_b64}.{signature_b64}"
def verify_token(self, token: str) -> dict | None:
"""Verify and decode a JWT token."""
try:
parts = token.split('.')
if len(parts) != 3:
return None
header_b64, payload_b64, signature_b64 = parts
# Verify signature
message = f"{header_b64}.{payload_b64}"
expected_signature = hmac.new(
self.config.secret_key.encode(),
message.encode(),
hashlib.sha256
).digest()
actual_signature = self._base64_decode(signature_b64)
if not hmac.compare_digest(expected_signature, actual_signature):
return None
# Decode payload
payload = json.loads(self._base64_decode(payload_b64))
# Check expiration
if payload.get("exp", 0) < datetime.utcnow().timestamp():
return None
return payload
except Exception:
return None
def create_access_token(self, user_id: str, **extra_claims) -> str:
"""Create an access token for a user."""
payload = {"sub": user_id, "type": "access", **extra_claims}
return self.create_token(payload)
def create_refresh_token(self, user_id: str) -> str:
"""Create a refresh token for a user."""
payload = {"sub": user_id, "type": "refresh"}
expires = timedelta(days=self.config.refresh_token_expire_days)
return self.create_token(payload, expires)
# =============================================================================
# AUTHENTICATION - API KEY
# =============================================================================
class APIKeyAuth:
"""API Key authentication handler."""
def __init__(self):
self.api_keys: dict[str, dict] = {}
def generate_key(self, name: str, scopes: list[str] = None) -> str:
"""Generate a new API key."""
key = f"sk_{secrets.token_urlsafe(32)}"
self.api_keys[key] = {
"name": name,
"scopes": scopes or ["read"],
"created_at": datetime.utcnow().isoformat(),
"last_used": None
}
return key
def validate_key(self, key: str) -> dict | None:
"""Validate an API key and return its metadata."""
if key not in self.api_keys:
return None
self.api_keys[key]["last_used"] = datetime.utcnow().isoformat()
return self.api_keys[key]
def has_scope(self, key: str, scope: str) -> bool:
"""Check if API key has a specific scope."""
data = self.api_keys.get(key)
if not data:
return False
return scope in data.get("scopes", [])
def revoke_key(self, key: str) -> bool:
"""Revoke an API key."""
if key in self.api_keys:
del self.api_keys[key]
return True
return False
# =============================================================================
# RATE LIMITING
# =============================================================================
@dataclass
class RateLimitInfo:
    """Snapshot of a caller's rate-limit state.

    Attributes:
        limit: Maximum number of requests per window.
        remaining: Requests still available after the current one.
        reset_at: When the current window ends. Values produced by
            ``RateLimiter`` are timezone-aware UTC datetimes.
    """

    limit: int
    remaining: int
    reset_at: datetime

    @property
    def reset_in_seconds(self) -> int:
        """Whole seconds until ``reset_at``, never negative."""
        reset_at = self.reset_at
        if reset_at.tzinfo is None:
            # Tolerate naive values from older callers by reading them as UTC;
            # subtracting naive from aware datetimes would raise TypeError.
            reset_at = reset_at.replace(tzinfo=timezone.utc)
        return max(0, int((reset_at - datetime.now(timezone.utc)).total_seconds()))


class RateLimiter:
    """Per-key request limiter.

    Each key owns a bucket of ``requests_per_minute`` tokens. Tokens are
    refilled proportionally to elapsed time and, in addition, the bucket is
    topped up completely whenever its one-minute window has passed. Buckets
    are kept in memory and are never evicted.
    """

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.buckets: dict[str, dict] = {}

    def _get_bucket(self, key: str) -> dict:
        """Return the bucket for ``key``, creating and refilling it as needed."""
        # Aware UTC timestamps keep reset_at.timestamp() a correct Unix epoch
        # regardless of the host's local timezone.
        now = datetime.now(timezone.utc)
        bucket = self.buckets.get(key)
        if bucket is None:
            bucket = self.buckets[key] = {
                "tokens": self.requests_per_minute,
                "last_update": now,
                "reset_at": now + timedelta(minutes=1),
            }

        # Refill tokens based on time passed. last_update only advances when
        # at least one whole token was added, so fractional progress between
        # calls is not lost.
        elapsed = (now - bucket["last_update"]).total_seconds()
        refill = int(elapsed * self.requests_per_minute / 60)
        if refill > 0:
            bucket["tokens"] = min(self.requests_per_minute, bucket["tokens"] + refill)
            bucket["last_update"] = now

        # Start a fresh window once the previous one has ended.
        if now >= bucket["reset_at"]:
            bucket["tokens"] = self.requests_per_minute
            bucket["reset_at"] = now + timedelta(minutes=1)
        return bucket

    def is_allowed(self, key: str) -> tuple[bool, RateLimitInfo]:
        """Consume one token for ``key`` if available.

        Returns:
            ``(allowed, info)`` where ``info.remaining`` is the number of
            tokens left after this request (0 when the request is rejected).
        """
        bucket = self._get_bucket(key)
        allowed = bucket["tokens"] > 0
        if allowed:
            bucket["tokens"] -= 1
        info = RateLimitInfo(
            limit=self.requests_per_minute,
            remaining=max(0, bucket["tokens"]),
            reset_at=bucket["reset_at"],
        )
        return allowed, info
# =============================================================================
# SIMPLE FLASK-LIKE API FRAMEWORK
# =============================================================================
class Route:
    """Binds an HTTP method and a path pattern to a handler callable."""

    def __init__(self, path: str, method: str, handler: Callable, **options):
        # Methods are stored upper-cased so "get" and "GET" register alike.
        normalized_method = method.upper()
        self.path, self.method = path, normalized_method
        self.handler, self.options = handler, options
class Request:
    """Lightweight stand-in for an incoming HTTP request."""

    def __init__(
        self,
        method: str,
        path: str,
        headers: dict = None,
        body: dict = None,
        query: dict = None,
    ):
        self.method = method.upper()
        self.path = path
        # Missing (or empty) mappings are replaced by a fresh empty dict.
        self.headers = headers if headers else {}
        self.body = body if body else {}
        self.query = query if query else {}
        # Filled in by the authentication middleware.
        self.user = None
class Response:
    """Simplified response object carrying a JSON-serializable body.

    Attributes:
        data: Body payload, serialized by ``to_json``.
        status: HTTP status code.
        headers: Response headers. ``Content-Type: application/json`` is
            always present unless the caller overrides it explicitly.
    """

    _DEFAULT_HEADERS = {"Content-Type": "application/json"}

    def __init__(self, data: Any = None, status: int = 200, headers: dict = None):
        self.data = data
        self.status = status
        # Merge instead of replace: passing extra headers must not drop the
        # default Content-Type, since the body is always JSON. A new dict is
        # built per instance so responses never share header state.
        self.headers = {**self._DEFAULT_HEADERS, **(headers or {})}

    def to_json(self) -> str:
        """Serialize the body to JSON.

        A 204 response without data yields an empty string, because a
        "No Content" response must not carry a body (not even ``null``).
        """
        if self.status == 204 and self.data is None:
            return ""
        return json.dumps(self.data, default=str)

    @classmethod
    def _failure(cls, status: int, error: str, **extra: Any) -> 'Response':
        """Build an error response with the standard failure envelope."""
        return cls({"success": False, "error": error, **extra}, status)

    @classmethod
    def ok(cls, data: Any = None, message: str = None) -> 'Response':
        """200 OK with a success envelope."""
        return cls({"success": True, "data": data, "message": message}, 200)

    @classmethod
    def created(cls, data: Any = None) -> 'Response':
        """201 Created with a success envelope."""
        return cls({"success": True, "data": data}, 201)

    @classmethod
    def no_content(cls) -> 'Response':
        """204 No Content."""
        return cls(None, 204)

    @classmethod
    def bad_request(cls, error: str, details: Any = None) -> 'Response':
        """400 Bad Request, optionally with structured ``details``."""
        return cls._failure(400, error, details=details)

    @classmethod
    def unauthorized(cls, error: str = "Unauthorized") -> 'Response':
        """401 Unauthorized."""
        return cls._failure(401, error)

    @classmethod
    def forbidden(cls, error: str = "Forbidden") -> 'Response':
        """403 Forbidden."""
        return cls._failure(403, error)

    @classmethod
    def not_found(cls, error: str = "Not found") -> 'Response':
        """404 Not Found."""
        return cls._failure(404, error)

    @classmethod
    def internal_error(cls, error: str = "Internal server error") -> 'Response':
        """500 Internal Server Error."""
        return cls._failure(500, error)
class MiniAPI:
    """Minimal API framework for demonstration.

    Routes are registered through decorators (``route``/``get``/``post``/
    ``put``/``delete``) and matched in registration order. Path segments
    written as ``{name}`` capture the corresponding request segment and are
    passed to the handler as keyword arguments.
    """

    def __init__(self, name: str = "api"):
        self.name = name
        self.routes: list[Route] = []
        self.middleware: list[Callable] = []

    def route(self, path: str, methods: list[str] | None = None, **options):
        """Decorator registering the function for ``path`` under each method.

        ``methods`` defaults to ``["GET"]``. The decorated function is
        returned unchanged.
        """
        methods = methods or ["GET"]

        def decorator(func: Callable):
            for method in methods:
                self.routes.append(Route(path, method, func, **options))
            return func

        return decorator

    def get(self, path: str, **options):
        """Shortcut for GET route."""
        return self.route(path, ["GET"], **options)

    def post(self, path: str, **options):
        """Shortcut for POST route."""
        return self.route(path, ["POST"], **options)

    def put(self, path: str, **options):
        """Shortcut for PUT route."""
        return self.route(path, ["PUT"], **options)

    def delete(self, path: str, **options):
        """Shortcut for DELETE route."""
        return self.route(path, ["DELETE"], **options)

    def use(self, middleware: Callable):
        """Add middleware; middleware runs in the order it was added."""
        self.middleware.append(middleware)

    def _match_route(self, method: str, path: str) -> tuple[Route | None, dict]:
        """Find the first route matching ``method`` and ``path``.

        Returns:
            ``(route, params)`` with the captured ``{name}`` segments, or
            ``(None, {})`` when nothing matches.
        """
        path_parts = path.split('/')
        for route in self.routes:
            if route.method != method:
                continue
            route_parts = route.path.split('/')
            if len(route_parts) != len(path_parts):
                continue
            params = {}
            for pattern, actual in zip(route_parts, path_parts):
                if pattern.startswith('{') and pattern.endswith('}'):
                    params[pattern[1:-1]] = actual
                elif pattern != actual:
                    break
            else:
                # Every segment matched literally or as a parameter.
                return route, params
        return None, {}

    def handle(self, request: Request) -> Response:
        """Run middleware, dispatch to the matching handler, return a Response.

        A middleware short-circuits the request by returning a ``Response``.
        Handler return values that are not a ``Response`` are wrapped with
        ``Response.ok``. Exceptions raised by a handler are logged and turned
        into a generic 500 response.
        """
        for mw in self.middleware:
            outcome = mw(request)
            if isinstance(outcome, Response):
                return outcome

        route, params = self._match_route(request.method, request.path)
        if route is None:
            return Response.not_found(f"No route for {request.method} {request.path}")

        try:
            result = route.handler(request, **params)
        except Exception:
            # Keep the details in the server log only: echoing str(exc) to the
            # client can disclose internals (paths, queries, stack details).
            logging.getLogger(__name__).exception(
                "Unhandled error in %s %s", request.method, request.path
            )
            return Response.internal_error()
        return result if isinstance(result, Response) else Response.ok(result)
# =============================================================================
# EXAMPLE API IMPLEMENTATION
# =============================================================================
# Create API instance
api = MiniAPI("users-api")

# In-memory database
users_db: dict[int, dict] = {
    1: {"id": 1, "name": "Alice", "email": "alice@example.com"},
    2: {"id": 2, "name": "Bob", "email": "bob@example.com"},
}
next_id = 3  # id assigned to the next created user

# JWT configuration.
# The signing secret is taken from the JWT_SECRET_KEY environment variable.
# The hard-coded fallback only exists so the demo runs out of the box; it is
# public knowledge and must never be used for a real deployment.
jwt_config = JWTConfig(
    secret_key=os.environ.get("JWT_SECRET_KEY", "super-secret-key-change-in-production")
)
jwt_auth = JWTAuth(jwt_config)

# Rate limiter
rate_limiter = RateLimiter(requests_per_minute=100)
# Middleware
def auth_middleware(request: Request) -> Response | None:
    """Authenticate the request from its ``Authorization: Bearer`` header.

    On success the token claims are stored in ``request.user`` and ``None``
    is returned so processing continues. Otherwise a 401 response is
    returned. The login endpoint is exempt.
    """
    # Skip auth for login endpoint
    if request.path == "/auth/login":
        return None

    # The auth scheme name is case-insensitive, so compare it lower-cased.
    scheme, _, token = request.headers.get("Authorization", "").partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return Response.unauthorized("Missing or invalid Authorization header")

    payload = jwt_auth.verify_token(token)
    # Only access tokens may authenticate API calls. Without the type check a
    # long-lived refresh token would be accepted as well.
    if not payload or payload.get("type") != "access":
        return Response.unauthorized("Invalid or expired token")

    request.user = payload
    return None
def rate_limit_middleware(request: Request) -> Response | None:
    """Reject the request with 429 when the caller exceeded its rate limit.

    The limiter key is the authenticated user's ``sub`` claim when present,
    otherwise the first address in ``X-Forwarded-For``, otherwise
    ``"anonymous"``. Returns ``None`` when the request may proceed.
    """
    key = request.user.get("sub") if request.user else None
    if not key:
        # X-Forwarded-For may hold "client, proxy1, proxy2"; keying on the
        # whole string would give every proxy chain its own bucket.
        # NOTE: the header is client-controlled unless a trusted proxy sets it.
        forwarded = request.headers.get("X-Forwarded-For", "")
        key = forwarded.split(",")[0].strip() or "anonymous"

    allowed, info = rate_limiter.is_allowed(key)
    if allowed:
        return None

    # A naive reset_at is read as UTC; calling .timestamp() on it directly
    # would interpret it as local time and yield the wrong epoch.
    reset_at = info.reset_at
    if reset_at.tzinfo is None:
        reset_at = reset_at.replace(tzinfo=timezone.utc)

    retry_after = info.reset_in_seconds
    response = Response(
        {"success": False, "error": "Rate limit exceeded", "retry_after": retry_after},
        429
    )
    response.headers["Retry-After"] = str(retry_after)
    response.headers["X-RateLimit-Limit"] = str(info.limit)
    response.headers["X-RateLimit-Remaining"] = str(info.remaining)
    response.headers["X-RateLimit-Reset"] = str(int(reset_at.timestamp()))
    return response
# Routes
@api.post("/auth/login")
def login(request: Request) -> Response:
    """Exchange an email/password pair for an access and a refresh token.

    Responds 400 when either credential is missing and 401 when no user has
    the given email.
    """
    credentials = request.body
    email, password = credentials.get("email"), credentials.get("password")
    if not (email and password):
        return Response.bad_request("Email and password required")

    # NOTE: the password is only checked for presence here; a real
    # application must verify it against a stored password hash.
    for candidate in users_db.values():
        if candidate["email"] == email:
            user = candidate
            break
    else:
        return Response.unauthorized("Invalid credentials")

    subject = str(user["id"])
    tokens = {
        "access_token": jwt_auth.create_access_token(subject, name=user["name"]),
        "refresh_token": jwt_auth.create_refresh_token(subject),
        "token_type": "bearer",
    }
    return Response.ok(tokens)
@api.get("/users")
def list_users(request: Request) -> Response:
    """List all users with pagination.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: Page size (default 10).

    Responds 400 with the validation errors when either parameter is not an
    integer or is smaller than 1, instead of failing with a 500.
    """
    validator = Validator()
    page = validator.integer(request.query, "page", min_val=1)
    per_page = validator.integer(request.query, "per_page", min_val=1)
    if not validator.is_valid():
        return Response.bad_request("Validation failed", validator.get_errors())

    # Absent parameters fall back to the defaults.
    page = 1 if page is None else page
    per_page = 10 if per_page is None else per_page

    users = list(users_db.values())
    start = (page - 1) * per_page
    return Response.ok(PaginatedResponse(
        items=users[start:start + per_page],
        total=len(users),
        page=page,
        per_page=per_page,
    ).to_dict())
@api.get("/users/{user_id}")
def get_user(request: Request, user_id: str) -> Response:
    """Get a specific user.

    Responds 404 when the id is unknown or is not an integer at all; a
    malformed id cannot name an existing user and must not cause a 500.
    """
    try:
        user = users_db.get(int(user_id))
    except ValueError:
        user = None
    if not user:
        return Response.not_found(f"User {user_id} not found")
    return Response.ok(user)
@api.post("/users")
def create_user(request: Request) -> Response:
    """Create a new user from ``name`` and ``email`` in the request body.

    Responds 400 when validation fails or the email is already taken, and
    201 with the stored user otherwise.
    """
    global next_id

    # Validate: both fields are mandatory, name must be a string, and email
    # must be a string that looks like an address.
    validator = Validator()
    name = validator.required(request.body, "name")
    validator.string(request.body, "name")
    email = validator.email(request.body, "email")
    validator.required(request.body, "email")
    if not validator.is_valid():
        return Response.bad_request("Validation failed", validator.get_errors())

    # Check email uniqueness
    if any(existing["email"] == email for existing in users_db.values()):
        return Response.bad_request("Email already exists")

    # Create user
    user = {
        "id": next_id,
        "name": name,
        "email": email,
    }
    users_db[next_id] = user
    next_id += 1
    return Response.created(user)
@api.put("/users/{user_id}")
def update_user(request: Request, user_id: str) -> Response:
    """Update the ``name`` and/or ``email`` of an existing user.

    Only fields present in the request body are changed. Responds 404 when
    the id is unknown or not an integer, and 400 when a supplied field is
    empty, has the wrong type, is not a plausible email, or the email
    already belongs to another user.
    """
    try:
        uid = int(user_id)
    except ValueError:
        return Response.not_found(f"User {user_id} not found")
    user = users_db.get(uid)
    if not user:
        return Response.not_found(f"User {user_id} not found")

    # Validate only what the client actually sent, applying the same rules
    # used when creating a user.
    validator = Validator()
    if "name" in request.body:
        validator.required(request.body, "name")
        validator.string(request.body, "name")
    if "email" in request.body:
        validator.required(request.body, "email")
        validator.email(request.body, "email")
    if not validator.is_valid():
        return Response.bad_request("Validation failed", validator.get_errors())

    # Keep emails unique across users (the user's own record is excluded).
    if "email" in request.body and any(
        other["email"] == request.body["email"]
        for other_id, other in users_db.items()
        if other_id != uid
    ):
        return Response.bad_request("Email already exists")

    # Update fields
    if "name" in request.body:
        user["name"] = request.body["name"]
    if "email" in request.body:
        user["email"] = request.body["email"]
    return Response.ok(user)
@api.delete("/users/{user_id}")
def delete_user(request: Request, user_id: str) -> Response:
    """Delete a user.

    Responds 204 on success and 404 when the id is unknown or is not an
    integer.
    """
    try:
        uid = int(user_id)
    except ValueError:
        return Response.not_found(f"User {user_id} not found")
    # pop() checks for existence and removes in a single step.
    if users_db.pop(uid, None) is None:
        return Response.not_found(f"User {user_id} not found")
    return Response.no_content()
# =============================================================================
# DEMONSTRATIONS
# =============================================================================
if __name__ == "__main__":
    _RULE = "=" * 60

    def _banner(title: str, *, first: bool = False) -> None:
        """Print a section heading; all but the first get a blank line above."""
        print(_RULE if first else "\n" + _RULE)
        print(title)
        print(_RULE)

    # --- Validation: collect several errors from one bad payload ---
    _banner("API VALIDATION", first=True)
    validator = Validator()
    sample = {"name": "Jo", "email": "invalid", "age": "abc"}
    validator.required(sample, "name")
    validator.string(sample, "name", min_len=3)
    validator.email(sample, "email")
    validator.integer(sample, "age")
    print(f"Valid: {validator.is_valid()}")
    print(f"Errors: {json.dumps(validator.get_errors(), indent=2)}")

    # --- JWT: issue a token and read it back ---
    _banner("JWT AUTHENTICATION")
    token = jwt_auth.create_access_token("user123", role="admin")
    print(f"Token: {token[:50]}...")
    payload = jwt_auth.verify_token(token)
    print(f"Payload: {payload}")

    # --- Rate limiting: five consecutive requests for one key ---
    _banner("RATE LIMITING")
    for i in range(5):
        allowed, info = rate_limiter.is_allowed("test-user")
        print(f"Request {i+1}: allowed={allowed}, remaining={info.remaining}")

    # --- End-to-end requests through the API ---
    _banner("API REQUESTS")

    # Login
    login_request = Request(
        "POST",
        "/auth/login",
        body={"email": "alice@example.com", "password": "test"},
    )
    response = api.handle(login_request)
    print(f"Login: {response.status} - {response.to_json()[:100]}...")

    # Get users (would need auth in real scenario)
    listing_request = Request("GET", "/users", query={"page": "1", "per_page": "10"})
    listing_request.user = {"sub": "1"}  # Mock auth
    response = api.handle(listing_request)
    print(f"List users: {response.status}")

    # Get single user
    detail_request = Request("GET", "/users/1")
    detail_request.user = {"sub": "1"}
    response = api.handle(detail_request)
    print(f"Get user 1: {response.status} - {response.to_json()}")