Docs
README
Python Security
This module covers security best practices, hashing, encryption, and secure coding in Python.
Table of Contents
- •Security Fundamentals
- •Password Hashing
- •Encryption
- •Secure Random Generation
- •Input Validation and Sanitization
- •SQL Injection Prevention
- •XSS Prevention
- •Secure File Handling
- •Secrets Management
- •Common Vulnerabilities
Security Fundamentals
Core Security Principles
| Principle | Description |
|---|---|
| Confidentiality | Data accessible only to authorized users |
| Integrity | Data cannot be tampered with |
| Availability | Systems are accessible when needed |
| Defense in Depth | Multiple layers of security |
| Least Privilege | Minimal access rights needed |
| Fail Secure | Default to secure state on failure |
Threat Model
Always consider:
- •Who might attack (threat actors)
- •What they want (assets at risk)
- •How they might attack (attack vectors)
- •What controls can prevent it
Password Hashing
Never store passwords in plain text!
Using bcrypt (Recommended)
import bcrypt
# Hash a password
def hash_password(password: str) -> bytes:
"""Hash password using bcrypt."""
salt = bcrypt.gensalt(rounds=12) # Higher rounds = more secure
return bcrypt.hashpw(password.encode(), salt)
# Verify a password
def verify_password(password: str, hashed: bytes) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode(), hashed)
# Usage
password = "mysecretpassword"
hashed = hash_password(password)
print(f"Hash: {hashed}")
print(f"Valid: {verify_password(password, hashed)}")
print(f"Invalid: {verify_password('wrongpassword', hashed)}")
Using Argon2 (Modern Alternative)
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
ph = PasswordHasher(
time_cost=3, # Number of iterations
memory_cost=65536, # Memory usage in KB
parallelism=4 # Number of threads
)
# Hash
hashed = ph.hash("mypassword")
# Verify
try:
ph.verify(hashed, "mypassword")
print("Password valid!")
except VerifyMismatchError:
print("Invalid password")
# Check if rehash needed (e.g., parameters changed)
if ph.check_needs_rehash(hashed):
new_hash = ph.hash("mypassword")
Using hashlib (Built-in, for non-passwords)
import hashlib
# SHA-256 hash (NOT for passwords, good for data integrity)
def sha256_hash(data: str) -> str:
return hashlib.sha256(data.encode()).hexdigest()
# PBKDF2 (acceptable for passwords if bcrypt unavailable)
def pbkdf2_hash(password: str, salt: bytes, iterations: int = 100000) -> bytes:
return hashlib.pbkdf2_hmac('sha256', password.encode(), salt, iterations)
Encryption
Symmetric Encryption (AES)
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os
# Simple encryption with Fernet
def fernet_encrypt_decrypt():
# Generate key (store securely!)
key = Fernet.generate_key()
f = Fernet(key)
# Encrypt
message = b"Secret message"
encrypted = f.encrypt(message)
# Decrypt
decrypted = f.decrypt(encrypted)
print(f"Original: {message}")
print(f"Encrypted: {encrypted}")
print(f"Decrypted: {decrypted}")
# AES encryption (more control)
def aes_encrypt(plaintext: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
"""
Encrypt using AES-GCM.
Returns (ciphertext, tag, iv).
"""
iv = os.urandom(12) # 96-bit IV for GCM
cipher = Cipher(
algorithms.AES(key),
modes.GCM(iv),
backend=default_backend()
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext) + encryptor.finalize()
return ciphertext, encryptor.tag, iv
def aes_decrypt(ciphertext: bytes, tag: bytes, iv: bytes, key: bytes) -> bytes:
"""Decrypt AES-GCM encrypted data."""
cipher = Cipher(
algorithms.AES(key),
modes.GCM(iv, tag),
backend=default_backend()
)
decryptor = cipher.decryptor()
return decryptor.update(ciphertext) + decryptor.finalize()
# Usage
key = os.urandom(32) # 256-bit key
plaintext = b"Secret data"
ciphertext, tag, iv = aes_encrypt(plaintext, key)
decrypted = aes_decrypt(ciphertext, tag, iv, key)
Asymmetric Encryption (RSA)
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
# Generate key pair
def generate_rsa_keypair():
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
public_key = private_key.public_key()
return private_key, public_key
# Encrypt with public key
def rsa_encrypt(public_key, plaintext: bytes) -> bytes:
return public_key.encrypt(
plaintext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Decrypt with private key
def rsa_decrypt(private_key, ciphertext: bytes) -> bytes:
return private_key.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Digital signatures
def sign_message(private_key, message: bytes) -> bytes:
return private_key.sign(
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
def verify_signature(public_key, message: bytes, signature: bytes) -> bool:
try:
public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except Exception:
return False
Secure Random Generation
import secrets
import os
# Secure random bytes
secure_bytes = secrets.token_bytes(32)
# Secure random hex string
secure_hex = secrets.token_hex(32) # 64 characters
# Secure URL-safe token
secure_token = secrets.token_urlsafe(32) # ~43 characters
# Secure random integer
secure_int = secrets.randbelow(1000) # 0 to 999
# Secure choice
secure_choice = secrets.choice(['a', 'b', 'c'])
# Secure comparison (constant-time)
secrets.compare_digest("string1", "string2")
# Generate secure password
import string
def generate_password(length: int = 16) -> str:
"""Generate a secure random password."""
alphabet = string.ascii_letters + string.digits + string.punctuation
# Ensure at least one of each type
password = [
secrets.choice(string.ascii_lowercase),
secrets.choice(string.ascii_uppercase),
secrets.choice(string.digits),
secrets.choice(string.punctuation),
]
password += [secrets.choice(alphabet) for _ in range(length - 4)]
# Shuffle
shuffled = list(password)
secrets.SystemRandom().shuffle(shuffled)
return ''.join(shuffled)
# API key generation
def generate_api_key() -> str:
"""Generate a secure API key."""
return secrets.token_urlsafe(32)
# Session token
def generate_session_token() -> str:
"""Generate secure session token."""
return secrets.token_hex(32)
Input Validation and Sanitization
Validation Patterns
import re
from typing import Optional
# Email validation
def validate_email(email: str) -> bool:
"""Validate email format."""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
# Username validation
def validate_username(username: str) -> bool:
"""Validate username (alphanumeric, 3-20 chars)."""
pattern = r'^[a-zA-Z0-9_]{3,20}$'
return bool(re.match(pattern, username))
# Password strength validation
def validate_password_strength(password: str) -> tuple[bool, list[str]]:
"""
Check password strength.
Returns (is_valid, list of issues).
"""
issues = []
if len(password) < 8:
issues.append("Must be at least 8 characters")
if not re.search(r'[a-z]', password):
issues.append("Must contain lowercase letter")
if not re.search(r'[A-Z]', password):
issues.append("Must contain uppercase letter")
if not re.search(r'\d', password):
issues.append("Must contain digit")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
issues.append("Must contain special character")
return len(issues) == 0, issues
# Sanitize string (remove dangerous characters)
def sanitize_string(s: str, allow_chars: str = "") -> str:
"""Remove potentially dangerous characters."""
allowed = set(string.ascii_letters + string.digits + " " + allow_chars)
return ''.join(c for c in s if c in allowed)
# Path traversal prevention
def safe_join_path(base: str, *paths) -> Optional[str]:
"""Safely join paths, preventing directory traversal."""
from pathlib import Path
base_path = Path(base).resolve()
target_path = base_path.joinpath(*paths).resolve()
# Check if result is still under base
if base_path in target_path.parents or target_path == base_path:
return str(target_path)
return None # Traversal attempt detected
Type Validation
from typing import Any, get_type_hints
from dataclasses import dataclass
def validate_type(value: Any, expected_type: type) -> bool:
"""Validate value matches expected type."""
if expected_type in (int, float, str, bool):
return isinstance(value, expected_type)
# Handle Optional
origin = getattr(expected_type, '__origin__', None)
if origin is type(None):
return value is None
return isinstance(value, expected_type)
@dataclass
class UserInput:
"""Validated user input."""
username: str
age: int
email: str
def __post_init__(self):
if not validate_username(self.username):
raise ValueError("Invalid username")
if not 0 <= self.age <= 150:
raise ValueError("Invalid age")
if not validate_email(self.email):
raise ValueError("Invalid email")
SQL Injection Prevention
Always Use Parameterized Queries
import sqlite3
# ❌ DANGEROUS - Never do this!
def unsafe_query(username):
query = f"SELECT * FROM users WHERE username = '{username}'"
# Attacker input: ' OR '1'='1
# Results in: SELECT * FROM users WHERE username = '' OR '1'='1'
# ✅ SAFE - Use parameterized queries
def safe_query_sqlite(username):
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
# Parameters are escaped automatically
cursor.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
return cursor.fetchall()
# ✅ SAFE - With SQLAlchemy
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
def safe_query_sqlalchemy(username):
engine = create_engine('sqlite:///database.db')
with Session(engine) as session:
result = session.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": username}
)
return result.fetchall()
# ✅ SAFE - With ORM
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String)
def safe_query_orm(session, username):
# ORM queries are automatically parameterized
return session.query(User).filter(User.username == username).first()
XSS Prevention
HTML Escaping
import html
from markupsafe import escape, Markup
# Basic HTML escaping
def escape_html(text: str) -> str:
"""Escape HTML special characters."""
return html.escape(text)
# Example
user_input = '<script>alert("XSS")</script>'
safe_output = escape_html(user_input)
# Output: <script>alert("XSS")</script>
# Using MarkupSafe (Flask default)
user_input = '<script>alert("XSS")</script>'
safe = escape(user_input) # Escaped
# To mark as safe (only for trusted content):
trusted = Markup('<strong>Bold</strong>')
# Content Security Policy header
def get_csp_header() -> str:
"""Generate Content Security Policy header."""
policies = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline'", # Avoid unsafe-inline if possible
"style-src 'self' 'unsafe-inline'",
"img-src 'self' data: https:",
"font-src 'self'",
"frame-ancestors 'none'",
"base-uri 'self'",
"form-action 'self'",
]
return "; ".join(policies)
Safe Template Rendering
# Flask/Jinja2 - Auto-escaping is ON by default
from flask import Flask, render_template
app = Flask(__name__)
# In template:
# {{ user_input }} <- Automatically escaped
# {{ user_input | safe }} <- NOT escaped (use carefully!)
# Safe way to include user content
@app.route('/profile')
def profile():
user_bio = get_user_bio() # Might contain HTML
return render_template('profile.html', bio=user_bio)
# bio will be automatically escaped in template
Secure File Handling
Safe File Uploads
import os
import uuid
from pathlib import Path
from werkzeug.utils import secure_filename
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB
UPLOAD_FOLDER = '/var/uploads'
def allowed_file(filename: str) -> bool:
"""Check if file extension is allowed."""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def secure_save_file(file, upload_folder: str) -> str:
"""Securely save uploaded file."""
if not file or not file.filename:
raise ValueError("No file provided")
if not allowed_file(file.filename):
raise ValueError("File type not allowed")
# Check file size
file.seek(0, 2) # Seek to end
size = file.tell()
file.seek(0) # Reset
if size > MAX_FILE_SIZE:
raise ValueError("File too large")
# Secure the filename
filename = secure_filename(file.filename)
# Generate unique filename to prevent overwriting
unique_name = f"{uuid.uuid4().hex}_{filename}"
# Ensure upload folder exists and is within allowed path
folder = Path(upload_folder).resolve()
if not str(folder).startswith(UPLOAD_FOLDER):
raise ValueError("Invalid upload folder")
folder.mkdir(parents=True, exist_ok=True)
filepath = folder / unique_name
file.save(str(filepath))
return unique_name
def validate_file_content(filepath: str, expected_type: str) -> bool:
"""Validate file content matches expected type."""
import magic # pip install python-magic
mime = magic.Magic(mime=True)
detected = mime.from_file(filepath)
valid_mimes = {
'image': ['image/jpeg', 'image/png', 'image/gif'],
'pdf': ['application/pdf'],
}
return detected in valid_mimes.get(expected_type, [])
Secrets Management
Using Environment Variables
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class Config:
"""Application configuration from environment."""
database_url: str
secret_key: str
api_key: str
debug: bool = False
@classmethod
def from_env(cls) -> 'Config':
"""Load config from environment variables."""
def require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise ValueError(f"Missing required environment variable: {name}")
return value
return cls(
database_url=require_env('DATABASE_URL'),
secret_key=require_env('SECRET_KEY'),
api_key=require_env('API_KEY'),
debug=os.getenv('DEBUG', 'false').lower() == 'true'
)
# Using python-dotenv for local development
from dotenv import load_dotenv
load_dotenv() # Load from .env file
config = Config.from_env()
# .env file (NEVER commit to version control!)
"""
DATABASE_URL=postgresql://user:pass@localhost/db
SECRET_KEY=your-secret-key-here
API_KEY=your-api-key-here
DEBUG=false
"""
Secret Rotation
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List
@dataclass
class SecretVersion:
"""Versioned secret."""
value: str
created_at: datetime
expires_at: datetime
version: int
class SecretManager:
"""Manage secrets with rotation support."""
def __init__(self, rotation_period_days: int = 90):
self.secrets: List[SecretVersion] = []
self.rotation_period = timedelta(days=rotation_period_days)
def add_secret(self, value: str):
"""Add a new secret version."""
now = datetime.now()
version = len(self.secrets) + 1
self.secrets.append(SecretVersion(
value=value,
created_at=now,
expires_at=now + self.rotation_period,
version=version
))
def get_current(self) -> str:
"""Get the current (latest) secret."""
if not self.secrets:
raise ValueError("No secrets configured")
return self.secrets[-1].value
def verify_any(self, value: str) -> bool:
"""Verify against any valid (non-expired) secret."""
now = datetime.now()
return any(
s.value == value and s.expires_at > now
for s in self.secrets
)
def needs_rotation(self) -> bool:
"""Check if secret needs rotation."""
if not self.secrets:
return True
latest = self.secrets[-1]
# Rotate when 75% of period has passed
threshold = latest.created_at + (self.rotation_period * 0.75)
return datetime.now() > threshold
Common Vulnerabilities
OWASP Top 10 Prevention
| Vulnerability | Prevention |
|---|---|
| Injection | Parameterized queries, input validation |
| Broken Auth | Strong passwords, MFA, secure sessions |
| Sensitive Data | Encryption, secure transmission |
| XXE | Disable external entities in XML parsers |
| Broken Access | Role-based access, verify ownership |
| Misconfig | Secure defaults, least privilege |
| XSS | Output encoding, CSP headers |
| Insecure Deserialize | Don't deserialize untrusted data |
| Vulnerable Components | Keep dependencies updated |
| Insufficient Logging | Log security events, monitor |
Secure Coding Checklist
# 1. Never trust user input
user_input = sanitize(request.data)
# 2. Use parameterized queries
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
# 3. Escape output
return f"Hello, {html.escape(username)}"
# 4. Use secure random
token = secrets.token_urlsafe(32)
# 5. Hash passwords properly
hashed = bcrypt.hashpw(password, bcrypt.gensalt())
# 6. Use HTTPS
# Configure in your server/load balancer
# 7. Set secure headers
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Content-Security-Policy'] = "default-src 'self'"
# 8. Handle errors securely
try:
risky_operation()
except Exception as e:
logging.error(f"Error: {e}")
return generic_error_response() # Don't expose details
# 9. Validate file uploads
if not allowed_extension(filename):
raise ValueError("Invalid file type")
# 10. Keep secrets out of code
api_key = os.getenv('API_KEY')
Security Testing
# Use bandit for static analysis
# pip install bandit
# bandit -r your_project/
# Use safety for dependency checking
# pip install safety
# safety check
# Common security tests
def test_sql_injection():
"""Test SQL injection prevention."""
malicious_input = "'; DROP TABLE users; --"
# Should not cause SQL error or data loss
result = search_users(malicious_input)
assert result == [] # No results, no crash
def test_xss_prevention():
"""Test XSS prevention."""
malicious_input = '<script>alert("XSS")</script>'
output = render_user_content(malicious_input)
assert '<script>' not in output
assert '<script>' in output # Escaped
def test_path_traversal():
"""Test path traversal prevention."""
malicious_path = '../../../etc/passwd'
with pytest.raises(ValueError):
get_file_content(malicious_path)
Next Steps
- •Security Audits: Regular code review for security
- •Penetration Testing: Test your applications
- •Security Training: Stay updated on threats
- •Incident Response: Plan for security breaches
- •Compliance: GDPR, SOC2, PCI-DSS as needed