Docs

automation

Python Automation and Scripting

This module covers automating tasks, scripting, scheduling, and web scraping with Python.

Table of Contents

  1. Introduction to Automation
  2. File System Automation
  3. System and Process Automation
  4. Task Scheduling
  5. Web Scraping
  6. Email Automation
  7. GUI Automation
  8. Best Practices

Introduction to Automation

Python excels at automating repetitive tasks, saving time and reducing errors.

Common Automation Use Cases

  • File Management: Organize, rename, backup files
  • Data Processing: Parse, transform, validate data
  • Web Scraping: Extract data from websites
  • Report Generation: Create automated reports
  • Email Automation: Send notifications, reminders
  • System Administration: Monitor, maintain systems
  • Testing: Automate software testing

File System Automation

Working with Paths

from pathlib import Path
import os
import shutil

# Three common starting points for a Path object.
home = Path.home()
current = Path.cwd()
file_path = Path('/home/user/documents/file.txt')

# Inspect the pieces of the path, then ask the filesystem whether it exists.
for component in (
    file_path.name,      # file.txt
    file_path.stem,      # file
    file_path.suffix,    # .txt
    file_path.parent,    # /home/user/documents
    file_path.exists(),  # True/False
):
    print(component)

# joinpath() is the method spelling of the `/` operator.
config = home.joinpath('.config', 'myapp', 'settings.ini')

# Pattern matching: glob() hands back generators of matching paths.
py_files = Path('.').glob('**/*.py')  # Recursive
txt_files = Path('.').glob('*.txt')   # Current dir only

File Operations

from pathlib import Path
import shutil

# Create directory
# exist_ok=True suppresses the error when the directory already exists.
Path('output').mkdir(exist_ok=True)
# parents=True additionally creates any missing intermediate directories.
Path('deep/nested/dir').mkdir(parents=True, exist_ok=True)

# Copy files
shutil.copy('source.txt', 'dest.txt')           # Copy file
shutil.copy2('source.txt', 'dest.txt')          # Preserve metadata
shutil.copytree('source_dir', 'dest_dir')       # Copy directory

# Move/Rename
shutil.move('old_name.txt', 'new_name.txt')
Path('file.txt').rename('renamed.txt')

# Delete
# NOTE(review): 'file.txt' was renamed two lines up, so executed top to bottom
# this unlink() would fail; the calls read as independent examples rather than
# one runnable script.
Path('file.txt').unlink()                       # Delete file
Path('empty_dir').rmdir()                       # Delete empty dir
shutil.rmtree('directory')                      # Delete dir with contents

# Read/Write
# read_text()/write_text() open, transfer and close the file in a single call.
content = Path('file.txt').read_text()
Path('file.txt').write_text('Hello World')

# The *_bytes variants do the same for binary data; together they copy a file.
bytes_content = Path('image.png').read_bytes()
Path('copy.png').write_bytes(bytes_content)

Batch File Operations

from pathlib import Path
import shutil
from datetime import datetime

def organize_files_by_extension(source_dir: str):
    """Move every file in ``source_dir`` into a subfolder named after its extension.

    The folder name is the lower-cased extension without its leading dot
    (``report.TXT`` goes to ``txt/``). Files that have no extension are
    collected in ``no_extension/``. Only direct children are processed;
    existing subdirectories are left alone.

    Args:
        source_dir: Directory whose files should be sorted into subfolders.
    """
    source = Path(source_dir)

    # Take the listing up front: the loop below creates folders and moves
    # files inside the very directory being listed.
    files = [entry for entry in source.iterdir() if entry.is_file()]

    for file in files:
        # suffix carries the leading dot ('.txt'); strip it *before* applying
        # the fallback so the fallback name is not truncated as well.
        folder_name = file.suffix.lower().lstrip('.') or 'no_extension'
        dest_dir = source / folder_name
        dest_dir.mkdir(exist_ok=True)
        shutil.move(str(file), str(dest_dir / file.name))


def batch_rename(directory: str, pattern: str, replacement: str):
    """Rename every entry in ``directory`` whose name contains ``pattern``.

    Each occurrence of ``pattern`` in the entry's name is replaced with
    ``replacement``. Matching is a plain substring test, not a glob or regex,
    and it applies to every direct child of ``directory``.

    Args:
        directory: Directory whose entries are examined (not recursive).
        pattern: Substring to look for in each name.
        replacement: Text substituted for every occurrence of ``pattern``.
    """
    # Materialise the listing before renaming anything: whether entries that
    # appear during iteration are yielded is unspecified, so a lazily iterated
    # directory could hand back an already-renamed entry a second time.
    entries = list(Path(directory).iterdir())

    for file in entries:
        if pattern in file.name:
            new_name = file.name.replace(pattern, replacement)
            file.rename(file.parent / new_name)


def backup_directory(source: str, backup_base: str):
    """Copy ``source`` into ``backup_base`` under a timestamped folder name.

    The copy is named ``<source folder name>_<YYYYmmdd_HHMMSS>`` using the
    current local time, and its path is returned.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    origin = Path(source)
    destination = Path(backup_base).joinpath(f"{origin.name}_{stamp}")

    shutil.copytree(source, destination)
    return destination


def find_duplicate_files(directory: str):
    """Group files under ``directory`` (searched recursively) by bare file name.

    Returns a dict mapping each file name that occurs more than once to the
    list of paths carrying that name. Only names are compared, not contents.
    """
    from collections import defaultdict

    grouped = defaultdict(list)
    for candidate in filter(Path.is_file, Path(directory).rglob('*')):
        grouped[candidate.name].append(candidate)

    # Keep only the names shared by at least two paths.
    return {name: hits for name, hits in grouped.items() if len(hits) >= 2}

Watching for File Changes

import time
from pathlib import Path
from typing import Callable

def watch_directory(
    path: str,
    callback: Callable[[Path, str], None],
    interval: float = 1.0
):
    """
    Watch a directory tree for changes by polling.

    Every ``interval`` seconds the tree under ``path`` is re-scanned and
    compared with the previous scan, using each file's modification time.

    callback receives (file_path, event_type) where event_type is:
    'created', 'modified', 'deleted'

    This function loops forever; it only stops when ``callback`` (or the
    scan itself) raises, in which case the exception propagates to the caller.
    """
    watched = Path(path)

    def snapshot() -> dict:
        """Map every file currently under ``watched`` to its st_mtime."""
        found = {}
        for file in watched.rglob('*'):
            try:
                if file.is_file():
                    found[file] = file.stat().st_mtime
            except OSError:
                # The file disappeared (or became unreadable) between being
                # listed and being stat'ed. Leave it out so that it is
                # reported as 'deleted' instead of crashing the watcher.
                continue
        return found

    # Initial scan: files already present are the baseline, not 'created'.
    known_files = snapshot()

    while True:
        current_files = snapshot()

        for file, mtime in current_files.items():
            if file not in known_files:
                callback(file, 'created')
            elif known_files[file] != mtime:
                callback(file, 'modified')

        # Check for deleted files
        for file in known_files:
            if file not in current_files:
                callback(file, 'deleted')

        known_files = current_files
        time.sleep(interval)


# Usage:
# def on_change(path, event):
#     print(f"{event}: {path}")
# watch_directory('./watched_folder', on_change)

System and Process Automation

Running System Commands

import subprocess
import os

# Simple command
# Passing the command as a list runs it without a shell. capture_output=True
# collects stdout/stderr on the result object and text=True decodes them to str.
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.returncode)

# With shell=True (be careful with user input!)
# The whole string is handed to the shell, so anything interpolated into it is
# interpreted as shell syntax.
result = subprocess.run('echo "Hello World"', shell=True, capture_output=True, text=True)

# Check for errors
# check=True turns a non-zero exit status into a CalledProcessError.
result = subprocess.run(['python', 'script.py'], check=True)  # Raises on error

# Get live output
# Popen returns immediately; stdout is exposed as a pipe that can be read
# while the child is still running.
process = subprocess.Popen(
    ['ping', '-c', '4', 'google.com'],
    stdout=subprocess.PIPE,
    text=True
)

# Iterating the pipe yields lines as they arrive; each line already ends with
# a newline, hence end=''.
for line in process.stdout:
    print(line, end='')

# Block until the child exits.
process.wait()


# Environment variables
# Work on a copy so the variable is visible to the child only, not to this
# process.
env = os.environ.copy()
env['MY_VAR'] = 'value'
subprocess.run(['script.sh'], env=env)

System Information

import os
import platform
import psutil  # pip install psutil

# Platform info
print(f"System: {platform.system()}")
print(f"Release: {platform.release()}")
print(f"Machine: {platform.machine()}")
print(f"Python: {platform.python_version()}")

# Environment
# os.getenv returns None when a variable is not set, so these print "None"
# on systems that do not define USER/HOME.
print(f"User: {os.getenv('USER')}")
print(f"Home: {os.getenv('HOME')}")
print(f"PATH: {os.getenv('PATH')}")

# System resources (requires psutil)
print(f"CPU cores: {psutil.cpu_count()}")
print(f"CPU usage: {psutil.cpu_percent()}%")
print(f"Memory: {psutil.virtual_memory().percent}% used")
# NOTE(review): '/' is the root filesystem on Unix-like systems; confirm the
# intended path on other platforms.
print(f"Disk: {psutil.disk_usage('/').percent}% used")

# Running processes
# The list names the attributes requested for each process; they are read
# back through proc.info.
for proc in psutil.process_iter(['pid', 'name', 'status']):
    print(proc.info)

Process Management

import subprocess
import signal
import os

class ProcessManager:
    """Start, track and stop named background processes.

    Every process is registered under a caller-chosen name in
    ``self.processes`` and stays there until it is stopped through
    :meth:`stop` or :meth:`stop_all`.
    """

    def __init__(self):
        # name -> subprocess.Popen for every process started and not yet stopped
        self.processes = {}

    def start(self, name: str, command: list):
        """Start ``command`` in the background and register it as ``name``.

        stdout and stderr are connected to pipes that stay available on the
        Popen object in ``self.processes[name]``. Starting a second process
        under a name that is already registered replaces the registry entry
        without stopping the earlier process.

        Returns:
            The PID of the new process.
        """
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        self.processes[name] = process
        return process.pid

    def stop(self, name: str, timeout: float = 5.0):
        """Stop the process registered as ``name``; unknown names are ignored.

        The process is asked to terminate first. If it has not exited within
        ``timeout`` seconds it is killed. Either way it is removed from the
        registry.
        """
        proc = self.processes.get(name)
        if proc is None:
            return

        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()
        finally:
            # Release the pipe file descriptors created in start().
            for stream in (proc.stdout, proc.stderr):
                if stream is not None:
                    stream.close()
        del self.processes[name]

    def is_running(self, name: str) -> bool:
        """Return True if ``name`` is registered and its process has not exited."""
        proc = self.processes.get(name)
        # poll() returns None while the child is still alive.
        return proc is not None and proc.poll() is None

    def stop_all(self):
        """Stop all managed processes."""
        # Iterate over a copy of the names: stop() removes entries as it goes.
        for name in list(self.processes):
            self.stop(name)

Task Scheduling

Using schedule Library

import schedule
import time
from datetime import datetime

# pip install schedule

def job():
    """Print the current time; used as the demo job for most schedules below."""
    print(f"Running job at {datetime.now()}")

def morning_report():
    """Placeholder for the daily report job."""
    print("Generating morning report...")

def hourly_check():
    """Placeholder for the hourly health-check job."""
    print("Performing hourly health check...")

# Schedule jobs
# Each line registers one recurring job; nothing runs until run_pending() is
# called in the loop below.
schedule.every(10).seconds.do(job)
schedule.every().minute.do(job)
schedule.every().hour.do(hourly_check)
schedule.every().day.at("09:00").do(morning_report)
schedule.every().monday.do(lambda: print("Start of week!"))
schedule.every().wednesday.at("13:15").do(job)

# Run scheduler
# The library does not run jobs on its own: this loop polls once per second
# and blocks the script forever.
while True:
    schedule.run_pending()
    time.sleep(1)

System Cron Jobs

from crontab import CronTab  # pip install python-crontab

# Access user's crontab
cron = CronTab(user=True)

# Create a new job
job = cron.new(command='python /path/to/script.py')

# Schedule: minute, hour, day of month, month, day of week
job.setall('0 9 * * *')  # Every day at 9:00 AM

# Or use convenience methods
# NOTE(review): these are applied to the same job that setall() configured
# above, so they presumably adjust that schedule field by field -- confirm
# against the python-crontab documentation.
job.minute.every(30)     # Every 30 minutes
job.hour.on(9, 17)       # At 9 AM and 5 PM
job.dow.on('MON', 'FRI') # Monday and Friday

# Enable/disable
job.enable()
job.enable(False)

# Write to crontab
# Nothing above is persisted until write() is called.
cron.write()

# List all jobs
# NOTE: this loop rebinds the name `job`, so after it finishes `job` refers to
# the last entry that was iterated.
for job in cron:
    print(job)

# Remove job
# Removes whatever `job` is bound to after the loop above, then persists the
# change.
cron.remove(job)
cron.write()

Background Task Runner

import threading
import queue
import time
from typing import Callable, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Task:
    """One unit of work for :class:`TaskRunner`."""

    func: Callable                  # callable to invoke
    args: tuple = ()                # positional arguments for func
    kwargs: dict = None             # keyword arguments for func (None means none)
    scheduled_time: datetime = None  # earliest time to run; None means immediately


class TaskRunner:
    """Simple background task runner.

    A fixed pool of daemon threads pulls :class:`Task` objects from a shared
    queue and executes them. Tasks may be submitted for immediate execution
    or scheduled to run after a delay.
    """

    def __init__(self, num_workers: int = 2):
        self.task_queue = queue.Queue()
        self.workers = []
        self.running = True

        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        """Worker thread that processes tasks until ``running`` is cleared."""
        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly.
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue

            # Check if scheduled for later
            if task.scheduled_time and datetime.now() < task.scheduled_time:
                # Put the task back first, then balance the get() above.
                # put() raises the queue's unfinished-task count, so without
                # the matching task_done() the count never returns to zero and
                # join() blocks forever. Doing put() first keeps the count
                # above zero, so join() cannot return while the task waits.
                self.task_queue.put(task)
                self.task_queue.task_done()
                time.sleep(0.1)
                continue

            # Execute task
            try:
                kwargs = task.kwargs or {}
                task.func(*task.args, **kwargs)
            except Exception as e:
                # A failing task must not take the worker thread down.
                print(f"Task error: {e}")
            finally:
                self.task_queue.task_done()

    def submit(self, func: Callable, *args, **kwargs):
        """Submit a task for immediate execution."""
        self.task_queue.put(Task(func, args, kwargs))

    def schedule(self, func: Callable, delay_seconds: float, *args, **kwargs):
        """Schedule a task to run no earlier than ``delay_seconds`` from now."""
        # Imported here because the surrounding module only imports `datetime`.
        from datetime import timedelta

        scheduled_time = datetime.now() + timedelta(seconds=delay_seconds)
        self.task_queue.put(Task(func, args, kwargs, scheduled_time))

    def shutdown(self, wait: bool = True):
        """Shutdown the task runner.

        With ``wait=True`` this blocks until every queued task, including
        scheduled ones, has run. With ``wait=False`` the workers are told to
        stop right away and tasks still in the queue are abandoned.
        """
        # Drain the queue *before* clearing the flag: once `running` is False
        # the workers leave their loop and pending tasks would never finish.
        if wait:
            self.task_queue.join()
        self.running = False

Web Scraping

Using Requests and BeautifulSoup

import requests
from bs4 import BeautifulSoup  # pip install beautifulsoup4

# Fetch page
# Without a timeout a stalled server would block the script indefinitely, and
# raise_for_status() stops an error page from being parsed as if it were the
# real content.
response = requests.get('https://example.com', timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')

# Find elements
# Not every document has a <title>; fall back to an empty string instead of
# failing on None.
title = soup.title.text if soup.title else ''
all_links = soup.find_all('a')
divs_with_class = soup.find_all('div', class_='content')

# CSS selectors
items = soup.select('div.item > p')
nav_links = soup.select('nav a[href]')

# Extract data
for link in all_links:
    href = link.get('href')  # None when the <a> tag has no href attribute
    text = link.text.strip()
    print(f"{text}: {href}")

# Extract specific attributes
images = soup.find_all('img')
for img in images:
    src = img.get('src')
    alt = img.get('alt', 'No alt text')
    print(f"{alt}: {src}")

Complete Web Scraper

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time
from typing import List, Dict
import json

class WebScraper:
    """Configurable web scraper with politeness features.

    All requests go through one ``requests.Session`` and are preceded by a
    fixed delay so the target site is not flooded.
    """

    def __init__(
        self,
        delay: float = 1.0,
        headers: dict = None,
        timeout: float = 10.0
    ):
        """
        Args:
            delay: Seconds to sleep before every request.
            headers: Headers added to the session; a default User-Agent is
                used when omitted.
            timeout: Per-request timeout in seconds.
        """
        self.delay = delay
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(headers or {
            'User-Agent': 'Mozilla/5.0 (compatible; PythonBot/1.0)'
        })

    def fetch(self, url: str) -> BeautifulSoup:
        """Fetch and parse a page.

        Raises whatever the session raises for connection problems, and an
        HTTP error for 4xx/5xx responses (via ``raise_for_status``).
        """
        time.sleep(self.delay)  # Be polite

        response = self.session.get(url, timeout=self.timeout)
        response.raise_for_status()

        return BeautifulSoup(response.text, 'html.parser')

    def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Return every ``<a href>`` on the page as an absolute URL."""
        # urljoin resolves relative hrefs against the page they were found on.
        return [
            urljoin(base_url, a['href'])
            for a in soup.find_all('a', href=True)
        ]

    def extract_data(
        self,
        soup: BeautifulSoup,
        selectors: Dict[str, str]
    ) -> Dict[str, str]:
        """Extract data using CSS selectors.

        ``selectors`` maps an output key to a CSS selector. Each value in the
        result is the stripped text of the first match, or None when nothing
        matches.
        """
        data = {}
        for key, selector in selectors.items():
            element = soup.select_one(selector)
            data[key] = element.text.strip() if element else None
        return data

    def crawl(
        self,
        start_url: str,
        max_pages: int = 10,
        same_domain: bool = True
    ) -> List[Dict]:
        """Crawl breadth-first from ``start_url`` and collect page summaries.

        Args:
            start_url: First page to fetch.
            max_pages: Stop once this many pages were fetched successfully.
            same_domain: When True, only follow links whose host matches the
                host of ``start_url``.

        Returns:
            One dict per fetched page with the keys 'url', 'title' and
            'links' (the number of ``<a>`` tags on the page).
        """
        # Local imports: the surrounding module does not import these names.
        from collections import deque
        from urllib.parse import urldefrag

        # '#fragment' only addresses a position inside a page, so URLs that
        # differ only by fragment are the same document.
        start_url = urldefrag(start_url).url

        visited = set()
        queued = {start_url}           # every URL ever put on the queue
        to_visit = deque([start_url])  # deque gives O(1) pops from the left
        results = []

        start_domain = urlparse(start_url).netloc

        while to_visit and len(visited) < max_pages:
            url = to_visit.popleft()

            try:
                soup = self.fetch(url)
                visited.add(url)

                # Store page info
                results.append({
                    'url': url,
                    'title': soup.title.text if soup.title else '',
                    'links': len(soup.find_all('a'))
                })

                # Get new links
                for link in self.extract_links(soup, url):
                    link = urldefrag(link).url
                    if same_domain and urlparse(link).netloc != start_domain:
                        continue
                    # `queued` also contains URLs that failed, so a broken
                    # page is not retried each time another page links to it.
                    if link not in queued:
                        queued.add(link)
                        to_visit.append(link)

            except Exception as e:
                # Best effort: one bad page must not abort the whole crawl.
                print(f"Error fetching {url}: {e}")

        return results


# Usage
# scraper = WebScraper(delay=2.0)
# data = scraper.crawl('https://example.com', max_pages=5)

Handling JavaScript-Rendered Pages

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# pip install selenium
# Download ChromeDriver or use webdriver-manager

def scrape_dynamic_page(url: str):
    """Load ``url`` in headless Chrome and return the text of the rendered items.

    The browser is given up to 10 seconds for an element with class
    ``dynamic-content`` to appear; the rendered HTML is then parsed and the
    text of every ``.dynamic-content .item`` element is returned as a list.
    The browser is always closed, even when loading or waiting fails.
    """
    # Browser configuration: no visible window, plus two Chrome flags.
    options = Options()
    for flag in ('--headless', '--no-sandbox', '--disable-dev-shm-usage'):
        options.add_argument(flag)

    driver = webdriver.Chrome(options=options)

    try:
        driver.get(url)

        # Block until the script-generated container exists in the DOM.
        content_present = EC.presence_of_element_located(
            (By.CLASS_NAME, "dynamic-content")
        )
        WebDriverWait(driver, 10).until(content_present)

        # Parse the DOM as rendered by the browser.
        rendered = BeautifulSoup(driver.page_source, 'html.parser')
        return [node.text for node in rendered.select('.dynamic-content .item')]

    finally:
        driver.quit()

Email Automation

Sending Emails

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from pathlib import Path

def send_email(
    to: str,
    subject: str,
    body: str,
    attachments: list = None,
    html: bool = False
):
    """Send an email over SMTP (STARTTLS) with optional attachments.

    Connection settings are read from the environment variables
    ``SMTP_SERVER``, ``SMTP_PORT``, ``SMTP_USER`` and ``SMTP_PASSWORD``.
    Variables that are not set fall back to placeholder values.

    Args:
        to: Recipient address.
        subject: Subject line.
        body: Message body.
        attachments: Optional list of file paths to attach.
        html: Send ``body`` as HTML instead of plain text.

    Raises:
        OSError: If an attachment cannot be read.
        smtplib.SMTPException: If connecting, logging in or sending fails.
    """
    # Imported here because the surrounding module does not import os.
    import os

    # Configuration comes from the environment so real credentials never have
    # to be written into the source file.
    smtp_server = os.getenv('SMTP_SERVER', 'smtp.gmail.com')
    smtp_port = int(os.getenv('SMTP_PORT', '587'))
    username = os.getenv('SMTP_USER', 'your_email@gmail.com')
    password = os.getenv('SMTP_PASSWORD', 'your_app_password')  # Use app-specific password

    # Create message
    msg = MIMEMultipart()
    msg['From'] = username
    msg['To'] = to
    msg['Subject'] = subject

    # Body
    msg.attach(MIMEText(body, 'html' if html else 'plain'))

    # Attachments
    for filepath in attachments or []:
        path = Path(filepath)
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(path.read_bytes())
        encoders.encode_base64(part)
        # Passing filename= as a parameter lets the email package quote and
        # encode it, so names with spaces or non-ASCII characters survive.
        part.add_header('Content-Disposition', 'attachment', filename=path.name)
        msg.attach(part)

    # Send
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(username, password)
        server.send_message(msg)


# HTML email template
def send_report_email(to: str, data: dict):
    """Send ``data`` as a two-column HTML table titled "Daily Report".

    Args:
        to: Recipient address.
        data: Mapping of metric name to value; one table row is written per
            item, in the mapping's iteration order.
    """
    # Imported here because the surrounding module does not import html.
    from html import escape

    # Escape both columns: a value such as "<5 ms" or "a & b" would otherwise
    # be read as markup and corrupt the table.
    rows = ''.join(
        f'<tr><td>{escape(str(k))}</td><td>{escape(str(v))}</td></tr>'
        for k, v in data.items()
    )

    html_body = f"""
    <html>
    <body>
        <h1>Daily Report</h1>
        <table border="1">
            <tr>
                <th>Metric</th>
                <th>Value</th>
            </tr>
            {rows}
        </table>
    </body>
    </html>
    """

    send_email(to, "Daily Report", html_body, html=True)

GUI Automation

Using PyAutoGUI

import pyautogui  # pip install pyautogui
import time

# Safety: move mouse to corner to abort
pyautogui.FAILSAFE = True

# Screen info
width, height = pyautogui.size()   # screen resolution in pixels
x, y = pyautogui.position()        # current mouse coordinates

# Mouse actions
# These act on the real pointer of the machine running the script.
pyautogui.moveTo(100, 100, duration=0.5)
pyautogui.click()
pyautogui.doubleClick()
pyautogui.rightClick()
pyautogui.scroll(3)  # Scroll up 3 clicks

# Keyboard actions
# Keystrokes go to whichever window currently has focus.
pyautogui.write('Hello World', interval=0.1)
pyautogui.press('enter')
pyautogui.hotkey('ctrl', 'c')
pyautogui.hotkey('alt', 'tab')

# Screenshot
screenshot = pyautogui.screenshot()
screenshot.save('screen.png')

# Region screenshot
# NOTE(review): the tuple is presumably (left, top, width, height) -- confirm
# against the PyAutoGUI documentation.
region = pyautogui.screenshot(region=(0, 0, 300, 400))

# Locate image on screen
# NOTE(review): the `if location` check assumes a miss returns None; some
# PyAutoGUI versions raise ImageNotFoundException instead -- confirm for the
# installed version.
location = pyautogui.locateOnScreen('button.png')
if location:
    pyautogui.click(location)

Automating Applications

import pyautogui
import time
import subprocess

class AppAutomator:
    """Automate desktop applications.

    Wraps a handful of PyAutoGUI calls and pauses for ``self.delay`` seconds
    after each click or key press so the application can react.
    """

    def __init__(self, app_path: str):
        self.app_path = app_path
        self.delay = 0.5  # seconds to pause after clicks and key presses

    def start_app(self):
        """Launch the application."""
        subprocess.Popen([self.app_path])
        time.sleep(2)  # Wait for app to load

    def wait_for_image(self, image: str, timeout: float = 10) -> tuple:
        """Wait for image to appear on screen.

        Polls the screen every 0.5 seconds until ``image`` is found.

        Returns:
            The centre point of the matched region.

        Raises:
            TimeoutError: If the image did not appear within ``timeout``
                seconds.
        """
        # monotonic() cannot jump when the system clock is adjusted.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                location = pyautogui.locateOnScreen(image, confidence=0.9)
            except pyautogui.ImageNotFoundException:
                # PyAutoGUI versions that raise on a miss would otherwise end
                # the wait on the very first poll; treat it as "not yet".
                location = None
            if location:
                return pyautogui.center(location)
            time.sleep(0.5)
        raise TimeoutError(f"Image {image} not found")

    def click_image(self, image: str):
        """Click on image when found."""
        center = self.wait_for_image(image)
        pyautogui.click(center)
        time.sleep(self.delay)

    def type_text(self, text: str):
        """Type text with natural delay."""
        pyautogui.write(text, interval=0.05)

    def press_key(self, key: str):
        """Press a key."""
        pyautogui.press(key)
        time.sleep(self.delay)

    def hotkey(self, *keys):
        """Press key combination."""
        pyautogui.hotkey(*keys)
        time.sleep(self.delay)

Best Practices

Error Handling and Logging

import logging
from functools import wraps
import traceback

# Setup logging
# Configures the root logger once: records at INFO level and above go both to
# the file 'automation.log' and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    # timestamp - logger name - level - message
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('automation.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator to retry failed operations.

    The wrapped function is called up to ``max_attempts`` times. Each failure
    is logged as a warning, and ``delay`` seconds pass between one attempt and
    the next. When the last attempt fails too, its exception propagates to
    the caller.

    Args:
        max_attempts: Total number of calls to make; must be at least 1.
        delay: Seconds to wait between attempts.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
    """
    # Imported here because the surrounding snippet does not import time.
    import time

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.warning(
                        f"Attempt {attempt} failed: {e}"
                    )
                    if attempt == max_attempts:
                        # Out of attempts: re-raise with the original
                        # traceback and without a pointless final sleep.
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator


@retry(max_attempts=3)
def unstable_operation():
    """Operation that might fail."""
    pass

Configuration Management

import os
from pathlib import Path
import json
import yaml  # pip install pyyaml

class Config:
    """Configuration manager for automation scripts.

    Settings are loaded either from a JSON/YAML file or, when no file is
    given, from a fixed set of environment variables.
    """

    def __init__(self, config_path: str = None):
        """Load from ``config_path`` if given, otherwise from the environment."""
        self.config = {}

        if config_path:
            self.load(config_path)
        else:
            self.load_from_env()

    def load(self, path: str):
        """Load config from file.

        The format is chosen by file extension (case-insensitive): ``.json``,
        ``.yml`` or ``.yaml``.

        Raises:
            ValueError: If the extension is not one of the supported formats.
            OSError: If the file cannot be opened.
        """
        path = Path(path)
        suffix = path.suffix.lower()

        if suffix == '.json':
            with open(path, encoding='utf-8') as f:
                loaded = json.load(f)
        elif suffix in ('.yml', '.yaml'):
            with open(path, encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        else:
            # Silently keeping an empty config would hide a mistyped path.
            raise ValueError(
                f"Unsupported config format '{path.suffix}': "
                "expected .json, .yml or .yaml"
            )

        # An empty YAML document parses to None; keep config a dict so get()
        # keeps working.
        self.config = {} if loaded is None else loaded

    def load_from_env(self):
        """Load config from environment variables.

        Values are None for variables that are not set.
        """
        self.config = {
            'smtp_server': os.getenv('SMTP_SERVER'),
            'smtp_user': os.getenv('SMTP_USER'),
            'api_key': os.getenv('API_KEY'),
        }

    def get(self, key: str, default=None):
        """Get config value, or ``default`` when ``key`` is absent."""
        return self.config.get(key, default)


# Example config.yaml:
"""
smtp:
  server: smtp.gmail.com
  port: 587
  user: ${SMTP_USER}

scraper:
  delay: 2.0
  max_pages: 100
"""

Safe Shutdown

import signal
import sys
import atexit

class GracefulShutdown:
    """Turn SIGINT/SIGTERM into a flag the main loop can poll.

    Creating an instance installs handlers for both signals and registers an
    exit hook that runs every cleanup callable added via register_cleanup().
    """

    def __init__(self):
        self.cleanup_handlers = []
        self.shutdown_requested = False

        # Route both termination signals to the same handler.
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self._signal_handler)

        # Run the cleanup callables when the interpreter exits.
        atexit.register(self._cleanup)

    def _signal_handler(self, signum, frame):
        """Log the received signal and raise the shutdown flag."""
        logger.info(f"Received signal {signum}, shutting down...")
        self.shutdown_requested = True

    def _cleanup(self):
        """Call every registered handler in registration order.

        A handler that raises is logged and does not stop the ones after it.
        """
        for cleanup in self.cleanup_handlers:
            try:
                cleanup()
            except Exception as e:
                logger.error(f"Cleanup error: {e}")

    def register_cleanup(self, handler):
        """Add a callable to run at exit."""
        self.cleanup_handlers.append(handler)

    def should_exit(self) -> bool:
        """Report whether a shutdown signal has been received."""
        return self.shutdown_requested


# Usage
# Creating the instance installs the signal handlers and the exit hook.
shutdown = GracefulShutdown()

def cleanup_resources():
    """Example cleanup handler; runs when the interpreter exits."""
    print("Cleaning up...")

shutdown.register_cleanup(cleanup_resources)

# Main loop: keeps going until SIGINT or SIGTERM sets the shutdown flag.
# NOTE: with only `pass` in the body this loop spins without pausing; real
# work (or a sleep) belongs here.
while not shutdown.should_exit():
    # Do work
    pass

Common Automation Patterns

Pattern | Use Case | Library
File Watch | Monitor for changes | watchdog
Scheduling | Run tasks on schedule | schedule, APScheduler
Web Scraping | Extract web data | requests, BeautifulSoup
Browser Automation | JavaScript sites | Selenium, Playwright
GUI Automation | Desktop apps | PyAutoGUI
Email | Notifications | smtplib, email
CLI | User interaction | click, argparse

Next Steps

  1. Web APIs: Learn to work with REST APIs
  2. Async: Use asyncio for concurrent automation
  3. Docker: Containerize automation scripts
  4. CI/CD: Integrate with Jenkins, GitHub Actions
  5. Cloud: Deploy on AWS Lambda, Google Cloud Functions
Automation - Python Tutorial | DeepML