Python Automation and Scripting
This module covers automating tasks with Python: file system and process automation, task scheduling, web scraping, email, and GUI automation.
Table of Contents
- Introduction to Automation
- File System Automation
- System and Process Automation
- Task Scheduling
- Web Scraping
- Email Automation
- GUI Automation
- Best Practices
Introduction to Automation
Python excels at automating repetitive tasks, saving time and reducing errors.
Common Automation Use Cases
- File Management: Organize, rename, and back up files
- Data Processing: Parse, transform, and validate data
- Web Scraping: Extract data from websites
- Report Generation: Create automated reports
- Email Automation: Send notifications and reminders
- System Administration: Monitor and maintain systems
- Testing: Automate software testing
File System Automation
Working with Paths
from pathlib import Path
import os
import shutil
# Creating paths
home = Path.home()
current = Path.cwd()
file_path = Path('/home/user/documents/file.txt')
# Path properties
print(file_path.name) # file.txt
print(file_path.stem) # file
print(file_path.suffix) # .txt
print(file_path.parent) # /home/user/documents
print(file_path.exists()) # True/False
# Building paths
config = home / '.config' / 'myapp' / 'settings.ini'
# Globbing (pattern matching)
py_files = Path('.').glob('**/*.py') # Recursive
txt_files = Path('.').glob('*.txt') # Current dir only
File Operations
from pathlib import Path
import shutil
# Create directory
Path('output').mkdir(exist_ok=True)
Path('deep/nested/dir').mkdir(parents=True, exist_ok=True)
# Copy files
shutil.copy('source.txt', 'dest.txt') # Copy file
shutil.copy2('source.txt', 'dest.txt') # Preserve metadata
shutil.copytree('source_dir', 'dest_dir') # Copy directory
# Move/Rename
shutil.move('old_name.txt', 'new_name.txt')
Path('file.txt').rename('renamed.txt')
# Delete
Path('file.txt').unlink() # Delete file
Path('empty_dir').rmdir() # Delete empty dir
shutil.rmtree('directory') # Delete dir with contents
# Read/Write
content = Path('file.txt').read_text()
Path('file.txt').write_text('Hello World')
bytes_content = Path('image.png').read_bytes()
Path('copy.png').write_bytes(bytes_content)
Batch File Operations
from pathlib import Path
import shutil
from datetime import datetime
def organize_files_by_extension(source_dir: str):
"""Organize files into folders by extension."""
source = Path(source_dir)
for file in source.iterdir():
if file.is_file():
            ext = file.suffix.lower()
            dest_dir = source / (ext[1:] if ext else 'no_extension')  # strip the leading dot
dest_dir.mkdir(exist_ok=True)
shutil.move(str(file), str(dest_dir / file.name))
def batch_rename(directory: str, pattern: str, replacement: str):
"""Rename files matching a pattern."""
for file in Path(directory).iterdir():
if pattern in file.name:
new_name = file.name.replace(pattern, replacement)
file.rename(file.parent / new_name)
def backup_directory(source: str, backup_base: str):
"""Create timestamped backup of directory."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
source_path = Path(source)
backup_path = Path(backup_base) / f"{source_path.name}_{timestamp}"
shutil.copytree(source, backup_path)
return backup_path
def find_duplicate_files(directory: str):
"""Find files with same name in subdirectories."""
from collections import defaultdict
files_by_name = defaultdict(list)
for file in Path(directory).rglob('*'):
if file.is_file():
files_by_name[file.name].append(file)
duplicates = {
name: paths for name, paths in files_by_name.items()
if len(paths) > 1
}
return duplicates
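A quick usage sketch for the helpers above; the directory names are placeholders:
# Usage:
# organize_files_by_extension('downloads')
# batch_rename('reports', 'draft_', 'final_')
# backup_path = backup_directory('project', 'backups')
# duplicates = find_duplicate_files('project')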
Watching for File Changes
import time
from pathlib import Path
from typing import Callable
def watch_directory(
path: str,
callback: Callable[[Path, str], None],
interval: float = 1.0
):
"""
Watch directory for changes.
callback receives (file_path, event_type) where event_type is:
'created', 'modified', 'deleted'
"""
watched = Path(path)
known_files = {}
# Initial scan
for file in watched.rglob('*'):
if file.is_file():
known_files[file] = file.stat().st_mtime
while True:
current_files = {}
for file in watched.rglob('*'):
if file.is_file():
mtime = file.stat().st_mtime
current_files[file] = mtime
if file not in known_files:
callback(file, 'created')
elif known_files[file] != mtime:
callback(file, 'modified')
# Check for deleted files
for file in known_files:
if file not in current_files:
callback(file, 'deleted')
known_files = current_files
time.sleep(interval)
# Usage:
# def on_change(path, event):
# print(f"{event}: {path}")
# watch_directory('./watched_folder', on_change)
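The polling loop above needs no third-party packages, but it rescans the whole tree every interval. For event-driven watching there is the watchdog library (also listed under Common Automation Patterns below); a minimal sketch, with the handler name and folder as placeholders:
import time
from watchdog.observers import Observer            # pip install watchdog
from watchdog.events import FileSystemEventHandler

class PrintChanges(FileSystemEventHandler):
    """Report create/modify/delete events as they happen."""
    def on_created(self, event):
        print(f"created: {event.src_path}")
    def on_modified(self, event):
        print(f"modified: {event.src_path}")
    def on_deleted(self, event):
        print(f"deleted: {event.src_path}")

# Usage:
# observer = Observer()
# observer.schedule(PrintChanges(), './watched_folder', recursive=True)
# observer.start()
# try:
#     while True:
#         time.sleep(1)
# except KeyboardInterrupt:
#     observer.stop()
# observer.join()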
System and Process Automation
Running System Commands
import subprocess
import os
# Simple command
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.returncode)
# With shell=True (be careful with user input!)
result = subprocess.run('echo "Hello World"', shell=True, capture_output=True, text=True)
# Check for errors
result = subprocess.run(['python', 'script.py'], check=True) # Raises on error
# Get live output
process = subprocess.Popen(
['ping', '-c', '4', 'google.com'],
stdout=subprocess.PIPE,
text=True
)
for line in process.stdout:
print(line, end='')
process.wait()
# Environment variables
env = os.environ.copy()
env['MY_VAR'] = 'value'
subprocess.run(['script.sh'], env=env)
System Information
import os
import platform
import psutil # pip install psutil
# Platform info
print(f"System: {platform.system()}")
print(f"Release: {platform.release()}")
print(f"Machine: {platform.machine()}")
print(f"Python: {platform.python_version()}")
# Environment
print(f"User: {os.getenv('USER')}")
print(f"Home: {os.getenv('HOME')}")
print(f"PATH: {os.getenv('PATH')}")
# System resources (requires psutil)
print(f"CPU cores: {psutil.cpu_count()}")
print(f"CPU usage: {psutil.cpu_percent()}%")
print(f"Memory: {psutil.virtual_memory().percent}% used")
print(f"Disk: {psutil.disk_usage('/').percent}% used")
# Running processes
for proc in psutil.process_iter(['pid', 'name', 'status']):
print(proc.info)
Process Management
import subprocess
import signal
import os
class ProcessManager:
"""Manage background processes."""
def __init__(self):
self.processes = {}
def start(self, name: str, command: list):
"""Start a background process."""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.processes[name] = process
return process.pid
def stop(self, name: str, timeout: float = 5.0):
"""Stop a process gracefully."""
if name in self.processes:
proc = self.processes[name]
proc.terminate()
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
del self.processes[name]
def is_running(self, name: str) -> bool:
"""Check if process is running."""
if name in self.processes:
return self.processes[name].poll() is None
return False
def stop_all(self):
"""Stop all managed processes."""
for name in list(self.processes.keys()):
self.stop(name)
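A usage sketch for the class above; the process name and command are placeholders:
# Usage:
# manager = ProcessManager()
# manager.start('worker', ['python', 'worker.py'])
# print(manager.is_running('worker'))   # True while the script is alive
# manager.stop('worker')
# manager.stop_all()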
Task Scheduling
Using schedule Library
import schedule
import time
from datetime import datetime
# pip install schedule
def job():
print(f"Running job at {datetime.now()}")
def morning_report():
print("Generating morning report...")
def hourly_check():
print("Performing hourly health check...")
# Schedule jobs
schedule.every(10).seconds.do(job)
schedule.every().minute.do(job)
schedule.every().hour.do(hourly_check)
schedule.every().day.at("09:00").do(morning_report)
schedule.every().monday.do(lambda: print("Start of week!"))
schedule.every().wednesday.at("13:15").do(job)
# Run scheduler
while True:
schedule.run_pending()
time.sleep(1)
System Cron Jobs
from crontab import CronTab # pip install python-crontab
# Access user's crontab
cron = CronTab(user=True)
# Create a new job
job = cron.new(command='python /path/to/script.py')
# Schedule: minute, hour, day of month, month, day of week
job.setall('0 9 * * *') # Every day at 9:00 AM
# Or use convenience methods
job.minute.every(30) # Every 30 minutes
job.hour.on(9, 17) # At 9 AM and 5 PM
job.dow.on('MON', 'FRI') # Monday and Friday
# Enable/disable
job.enable()
job.enable(False)
# Write to crontab
cron.write()
# List all jobs
for job in cron:
print(job)
# Remove job
cron.remove(job)
cron.write()
Background Task Runner
import threading
import queue
import time
from typing import Callable, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class Task:
func: Callable
args: tuple = ()
kwargs: dict = None
scheduled_time: datetime = None
class TaskRunner:
"""Simple background task runner."""
def __init__(self, num_workers: int = 2):
self.task_queue = queue.Queue()
self.workers = []
self.running = True
for _ in range(num_workers):
worker = threading.Thread(target=self._worker)
worker.daemon = True
worker.start()
self.workers.append(worker)
def _worker(self):
"""Worker thread that processes tasks."""
while self.running:
try:
task = self.task_queue.get(timeout=1)
# Check if scheduled for later
if task.scheduled_time and datetime.now() < task.scheduled_time:
self.task_queue.put(task)
time.sleep(0.1)
continue
# Execute task
try:
kwargs = task.kwargs or {}
task.func(*task.args, **kwargs)
except Exception as e:
print(f"Task error: {e}")
finally:
self.task_queue.task_done()
except queue.Empty:
continue
def submit(self, func: Callable, *args, **kwargs):
"""Submit a task for immediate execution."""
self.task_queue.put(Task(func, args, kwargs))
def schedule(self, func: Callable, delay_seconds: float, *args, **kwargs):
"""Schedule a task for future execution."""
scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)
self.task_queue.put(Task(func, args, kwargs, scheduled_time))
    def shutdown(self, wait: bool = True):
        """Shut down the task runner, letting queued tasks finish first."""
        if wait:
            self.task_queue.join()
        self.running = False
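A usage sketch for the runner above; print stands in for real work:
# Usage:
# runner = TaskRunner(num_workers=2)
# runner.submit(print, "runs immediately")
# runner.schedule(print, 5.0, "runs about five seconds later")
# time.sleep(6)               # give the delayed task time to fire
# runner.shutdown(wait=True)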
Web Scraping
Using Requests and BeautifulSoup
import requests
from bs4 import BeautifulSoup # pip install beautifulsoup4
# Fetch page
response = requests.get('https://example.com')
soup = BeautifulSoup(response.text, 'html.parser')
# Find elements
title = soup.find('title').text
all_links = soup.find_all('a')
divs_with_class = soup.find_all('div', class_='content')
# CSS selectors
items = soup.select('div.item > p')
nav_links = soup.select('nav a[href]')
# Extract data
for link in all_links:
href = link.get('href')
text = link.text.strip()
print(f"{text}: {href}")
# Extract specific attributes
images = soup.find_all('img')
for img in images:
src = img.get('src')
alt = img.get('alt', 'No alt text')
print(f"{alt}: {src}")
Complete Web Scraper
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
from typing import List, Dict
import json
class WebScraper:
"""Configurable web scraper with politeness features."""
def __init__(
self,
delay: float = 1.0,
headers: dict = None,
timeout: float = 10.0
):
self.delay = delay
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update(headers or {
'User-Agent': 'Mozilla/5.0 (compatible; PythonBot/1.0)'
})
def fetch(self, url: str) -> BeautifulSoup:
"""Fetch and parse a page."""
time.sleep(self.delay) # Be polite
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
return BeautifulSoup(response.text, 'html.parser')
def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
"""Extract all links from page."""
links = []
for a in soup.find_all('a', href=True):
href = a['href']
absolute_url = urljoin(base_url, href)
links.append(absolute_url)
return links
def extract_data(
self,
soup: BeautifulSoup,
selectors: Dict[str, str]
) -> Dict[str, str]:
"""Extract data using CSS selectors."""
data = {}
for key, selector in selectors.items():
element = soup.select_one(selector)
data[key] = element.text.strip() if element else None
return data
def crawl(
self,
start_url: str,
max_pages: int = 10,
same_domain: bool = True
) -> List[Dict]:
"""Crawl website and collect data."""
visited = set()
to_visit = [start_url]
results = []
start_domain = urlparse(start_url).netloc
while to_visit and len(visited) < max_pages:
url = to_visit.pop(0)
if url in visited:
continue
try:
soup = self.fetch(url)
visited.add(url)
# Store page info
results.append({
'url': url,
'title': soup.title.text if soup.title else '',
'links': len(soup.find_all('a'))
})
# Get new links
for link in self.extract_links(soup, url):
link_domain = urlparse(link).netloc
if same_domain and link_domain != start_domain:
continue
if link not in visited:
to_visit.append(link)
except Exception as e:
print(f"Error fetching {url}: {e}")
return results
# Usage
# scraper = WebScraper(delay=2.0)
# data = scraper.crawl('https://example.com', max_pages=5)
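fetch and extract_data can also be combined for single pages; the selectors here are placeholders:
# soup = scraper.fetch('https://example.com')
# details = scraper.extract_data(soup, {'title': 'h1', 'intro': 'p.lead'})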
Handling JavaScript-Rendered Pages
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
# pip install selenium beautifulsoup4
# Selenium 4.6+ downloads a matching driver automatically; older versions need
# ChromeDriver on PATH or the webdriver-manager package
def scrape_dynamic_page(url: str):
"""Scrape JavaScript-rendered content."""
# Setup headless browser
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
# Wait for dynamic content
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "dynamic-content"))
)
# Get rendered HTML
html = driver.page_source
soup = BeautifulSoup(html, 'html.parser')
# Extract data
items = soup.select('.dynamic-content .item')
return [item.text for item in items]
finally:
driver.quit()
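Playwright (see Common Automation Patterns below) is a common alternative that manages its own browsers; a minimal sketch of the same idea, with the CSS selectors assumed to match the target page:
from playwright.sync_api import sync_playwright  # pip install playwright && playwright install

def scrape_dynamic_page_playwright(url: str):
    """Same idea using Playwright's synchronous API."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url)
        page.wait_for_selector('.dynamic-content')             # wait for JS-rendered content
        items = page.query_selector_all('.dynamic-content .item')
        texts = [item.inner_text() for item in items]
        browser.close()
        return texts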
Email Automation
Sending Emails
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path
def send_email(
to: str,
subject: str,
body: str,
attachments: list = None,
html: bool = False
):
"""Send email with optional attachments."""
# Configuration (use environment variables in production!)
smtp_server = 'smtp.gmail.com'
smtp_port = 587
username = 'your_email@gmail.com'
password = 'your_app_password' # Use app-specific password
# Create message
msg = MIMEMultipart()
msg['From'] = username
msg['To'] = to
msg['Subject'] = subject
# Body
if html:
msg.attach(MIMEText(body, 'html'))
else:
msg.attach(MIMEText(body, 'plain'))
# Attachments
if attachments:
for filepath in attachments:
path = Path(filepath)
with open(path, 'rb') as f:
part = MIMEBase('application', 'octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename={path.name}'
)
msg.attach(part)
# Send
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(username, password)
server.send_message(msg)
# HTML email template
def send_report_email(to: str, data: dict):
"""Send formatted HTML report."""
html_body = f"""
<html>
<body>
<h1>Daily Report</h1>
<table border="1">
<tr>
<th>Metric</th>
<th>Value</th>
</tr>
{''.join(f'<tr><td>{k}</td><td>{v}</td></tr>' for k, v in data.items())}
</table>
</body>
</html>
"""
send_email(to, "Daily Report", html_body, html=True)
GUI Automation
Using PyAutoGUI
import pyautogui # pip install pyautogui
import time
# Safety: move mouse to corner to abort
pyautogui.FAILSAFE = True
# Screen info
width, height = pyautogui.size()
x, y = pyautogui.position()
# Mouse actions
pyautogui.moveTo(100, 100, duration=0.5)
pyautogui.click()
pyautogui.doubleClick()
pyautogui.rightClick()
pyautogui.scroll(3) # Scroll up 3 clicks
# Keyboard actions
pyautogui.write('Hello World', interval=0.1)
pyautogui.press('enter')
pyautogui.hotkey('ctrl', 'c')
pyautogui.hotkey('alt', 'tab')
# Screenshot
screenshot = pyautogui.screenshot()
screenshot.save('screen.png')
# Region screenshot
region = pyautogui.screenshot(region=(0, 0, 300, 400))
# Locate image on screen
location = pyautogui.locateOnScreen('button.png')
if location:
pyautogui.click(location)
Automating Applications
import pyautogui
import time
import subprocess
class AppAutomator:
"""Automate desktop applications."""
def __init__(self, app_path: str):
self.app_path = app_path
self.delay = 0.5
def start_app(self):
"""Launch the application."""
subprocess.Popen([self.app_path])
time.sleep(2) # Wait for app to load
def wait_for_image(self, image: str, timeout: float = 10) -> tuple:
"""Wait for image to appear on screen."""
start = time.time()
while time.time() - start < timeout:
            location = pyautogui.locateOnScreen(image, confidence=0.9)  # confidence= requires opencv-python
if location:
return pyautogui.center(location)
time.sleep(0.5)
raise TimeoutError(f"Image {image} not found")
def click_image(self, image: str):
"""Click on image when found."""
center = self.wait_for_image(image)
pyautogui.click(center)
time.sleep(self.delay)
def type_text(self, text: str):
"""Type text with natural delay."""
pyautogui.write(text, interval=0.05)
def press_key(self, key: str):
"""Press a key."""
pyautogui.press(key)
time.sleep(self.delay)
def hotkey(self, *keys):
"""Press key combination."""
pyautogui.hotkey(*keys)
time.sleep(self.delay)
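A usage sketch for the class above; the application path and text are placeholders:
# Usage:
# app = AppAutomator('/usr/bin/gedit')
# app.start_app()
# app.type_text('Automated note')
# app.hotkey('ctrl', 's')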
Best Practices
Error Handling and Logging
import logging
import time
import traceback
from functools import wraps
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('automation.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def retry(max_attempts: int = 3, delay: float = 1.0):
"""Decorator to retry failed operations."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(
f"Attempt {attempt + 1} failed: {e}"
)
time.sleep(delay)
raise last_exception
return wrapper
return decorator
@retry(max_attempts=3)
def unstable_operation():
"""Operation that might fail."""
pass
Configuration Management
import os
from pathlib import Path
import json
import yaml # pip install pyyaml
class Config:
"""Configuration manager for automation scripts."""
def __init__(self, config_path: str = None):
self.config = {}
if config_path:
self.load(config_path)
else:
self.load_from_env()
def load(self, path: str):
"""Load config from file."""
path = Path(path)
if path.suffix == '.json':
with open(path) as f:
self.config = json.load(f)
elif path.suffix in ['.yml', '.yaml']:
with open(path) as f:
self.config = yaml.safe_load(f)
def load_from_env(self):
"""Load config from environment variables."""
self.config = {
'smtp_server': os.getenv('SMTP_SERVER'),
'smtp_user': os.getenv('SMTP_USER'),
'api_key': os.getenv('API_KEY'),
}
def get(self, key: str, default=None):
"""Get config value."""
return self.config.get(key, default)
# Example config.yaml:
"""
smtp:
server: smtp.gmail.com
port: 587
user: ${SMTP_USER}
scraper:
delay: 2.0
max_pages: 100
"""
Safe Shutdown
import signal
import sys
import atexit
import logging

logger = logging.getLogger(__name__)
class GracefulShutdown:
"""Handle graceful shutdown of automation scripts."""
def __init__(self):
self.shutdown_requested = False
self.cleanup_handlers = []
# Register signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# Register cleanup at exit
atexit.register(self._cleanup)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
logger.info(f"Received signal {signum}, shutting down...")
self.shutdown_requested = True
def _cleanup(self):
"""Run cleanup handlers."""
for handler in self.cleanup_handlers:
try:
handler()
except Exception as e:
logger.error(f"Cleanup error: {e}")
def register_cleanup(self, handler):
"""Register a cleanup handler."""
self.cleanup_handlers.append(handler)
def should_exit(self) -> bool:
"""Check if shutdown was requested."""
return self.shutdown_requested
# Usage
shutdown = GracefulShutdown()
def cleanup_resources():
print("Cleaning up...")
shutdown.register_cleanup(cleanup_resources)
while not shutdown.should_exit():
# Do work
pass
Common Automation Patterns
| Pattern | Use Case | Library |
|---|---|---|
| File Watch | Monitor for changes | watchdog |
| Scheduling | Run tasks on schedule | schedule, APScheduler |
| Web Scraping | Extract web data | requests, BeautifulSoup |
| Browser Automation | JavaScript sites | Selenium, Playwright |
| GUI Automation | Desktop apps | PyAutoGUI |
| Notifications | Send alerts, reminders | smtplib, email |
| CLI | User interaction | click, argparse |
Next Steps
- Web APIs: Learn to work with REST APIs
- Async: Use asyncio for concurrent automation
- Docker: Containerize automation scripts
- CI/CD: Integrate with Jenkins, GitHub Actions
- Cloud: Deploy on AWS Lambda, Google Cloud Functions