How Python Replaced 6 Hours of Manual Data Work Every Week
When I joined Mechanismic as a data intern, the team spent 6 hours every Monday manually pulling data from 500+ sources. I built a Python pipeline that reduced it to 12 minutes. Here's the architecture, the gotchas, and the code that made it happen.
The Monday Morning Nightmare
Picture this: every Monday at 7 AM, two analysts sit down with 47 browser tabs open. They're manually downloading CSVs, copying data into spreadsheets, fixing date formats, and merging everything into a master report. By 1 PM, they have a "weekly snapshot" that's already 6 hours stale.
This was happening at Mechanismic when I joined as a data analyst intern. The company tracked marketing performance across 500+ data sources — ad platforms, email providers, social channels, CRM exports. All manual. All painful.
I was given six weeks to fix it. I finished in four.
The pipeline I built reduced a 6-hour weekly manual process to 12 minutes of automated execution. It processed 500+ sources, handled 14 different data formats, and improved email campaign engagement by 60%.
The Architecture
The core challenge wasn't "how do I write Python." It was "how do I handle 500 sources that all lie about their data format."
Some gave clean CSVs. Some gave JSON with nested arrays. Some gave Excel files with merged cells and colour-coded headers (yes, really). One source gave data in a PDF that was actually a scanned image.
Here's the pipeline architecture I landed on:
```
Sources (500+) → Connector Layer → Normalize → Validate → Transform → Load
      ↓                ↓               ↓           ↓           ↓          ↓
  CSV/JSON/       pandas read        schema     business    pivot &    output:
  Excel/API       with fallback      mapping    rules       aggregate  master CSV
```
Stage 1: The Connector Layer
The hardest part was building connectors that degrade gracefully. Every source can fail independently, and one failure can't block 499 other sources.
```python
import logging
from pathlib import Path
from typing import Optional

import pandas as pd

logger = logging.getLogger(__name__)

def read_source(file_path: Path) -> Optional[pd.DataFrame]:
    """Read a data source with automatic format detection and fallback."""
    readers = {
        '.csv': pd.read_csv,
        '.xlsx': pd.read_excel,
        '.json': pd.read_json,
    }
    # Case-insensitive extension match; unknown extensions fall back to CSV
    reader = readers.get(file_path.suffix.lower(), pd.read_csv)
    try:
        df = reader(file_path)
        logger.info(f"Loaded {len(df)} rows from {file_path.name}")
        return df
    except Exception as e:
        logger.error(f"Failed to read {file_path.name}: {e}")
        return None
```
Nothing fancy. The key insight: each source failure is isolated. I wrapped every connector in a try/except, logged the failure, and moved on. The pipeline doesn't stop because one CSV is corrupt.
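The article doesn't show the orchestration loop itself, so here's a minimal sketch of that isolation pattern. The `collect_sources` name and the `reader` parameter are my own; the point is only that a `None` from any single source is recorded and skipped, never raised.

```python
from pathlib import Path
from typing import Callable, Optional

import pandas as pd

def collect_sources(
    paths: list[Path],
    reader: Callable[[Path], Optional[pd.DataFrame]],
) -> pd.DataFrame:
    """Run each source through its reader; a failure skips that source only."""
    frames: list[pd.DataFrame] = []
    failures: list[str] = []
    for path in paths:
        df = reader(path)                 # returns None instead of raising
        if df is None:
            failures.append(path.name)    # recorded and reported, never fatal
        else:
            frames.append(df)
    if failures:
        print(f"Skipped {len(failures)} failed source(s): {failures}")
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
```

With this shape, one corrupt file out of 500 costs you one line in the failure report, not the whole Monday run.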
Stage 2: Schema Normalization
500 sources means 500 different column names for the same thing. "Revenue" could be revenue, rev, total_revenue, sales, amount, gross, or my personal favourite: col_14.
I built a mapping layer:
```python
COLUMN_ALIASES = {
    'revenue': ['rev', 'total_revenue', 'sales', 'amount', 'gross', 'income'],
    'date': ['dt', 'timestamp', 'period', 'reporting_date', 'week_starting'],
    'channel': ['platform', 'source', 'medium', 'channel_name'],
    'campaign': ['campaign_name', 'ad_group', 'promo', 'initiative'],
}

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Map inconsistent column names to the standard schema."""
    reverse_map = {}
    for canonical, aliases in COLUMN_ALIASES.items():
        for alias in aliases:
            reverse_map[alias.lower()] = canonical
        reverse_map[canonical.lower()] = canonical
    df.columns = [
        reverse_map.get(col.lower().strip(), col.lower().strip())
        for col in df.columns
    ]
    return df
```
This single function eliminated 80% of the "why is this report broken" Slack messages.
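To see what the normalizer buys you, here it is run against a deliberately messy frame (the function and a trimmed copy of the alias map are reproduced so the snippet stands alone; the messy column names are invented for the demo):

```python
import pandas as pd

# Trimmed copy of the alias map, just for this demo
COLUMN_ALIASES = {
    'revenue': ['rev', 'total_revenue', 'sales'],
    'date': ['dt', 'timestamp'],
}

def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Map inconsistent column names to the standard schema."""
    reverse_map = {}
    for canonical, aliases in COLUMN_ALIASES.items():
        for alias in aliases:
            reverse_map[alias.lower()] = canonical
        reverse_map[canonical.lower()] = canonical
    df.columns = [reverse_map.get(c.lower().strip(), c.lower().strip())
                  for c in df.columns]
    return df

messy = pd.DataFrame({' Total_Revenue ': [100], 'TIMESTAMP': ['2024-01-01']})
clean = normalize_columns(messy)
print(list(clean.columns))  # ['revenue', 'date']
```

Whitespace, casing, and aliases all collapse to the canonical names in one pass, which is why the mapping lives in exactly one place.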
Stage 3: Validation Rules
Before any data hits the master report, it passes through business rules:
```python
# Placeholder whitelist — substitute your own channel taxonomy here
VALID_CHANNELS = {'email', 'social', 'search', 'display', 'affiliate'}

VALIDATION_RULES = {
    'revenue': lambda s: (s >= 0) & (s < 1_000_000),
    'date': lambda s: pd.to_datetime(s, errors='coerce').notna(),
    'channel': lambda s: s.str.lower().isin(VALID_CHANNELS),
}

def validate(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Return clean data and a list of issues found."""
    issues = []
    for col, rule in VALIDATION_RULES.items():
        if col not in df.columns:
            issues.append(f"Missing column: {col}")
            continue
        violations = ~rule(df[col])
        if violations.any():
            issues.append(f"{col}: {violations.sum()} invalid values")
            df = df[~violations]
    return df, issues
```
The validation catches problems before they propagate. An email campaign showing $2M revenue from a $500 budget? Flagged and excluded. A date column with "TBD" as a value? Flagged and excluded.
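Here are those two failure modes run through the validator (the function is reproduced with the channel rule omitted so the snippet runs standalone; the row values are made up for illustration):

```python
import pandas as pd

VALIDATION_RULES = {
    'revenue': lambda s: (s >= 0) & (s < 1_000_000),
    'date': lambda s: pd.to_datetime(s, errors='coerce').notna(),
}

def validate(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Return clean data and a list of issues found."""
    issues = []
    for col, rule in VALIDATION_RULES.items():
        if col not in df.columns:
            issues.append(f"Missing column: {col}")
            continue
        violations = ~rule(df[col])
        if violations.any():
            issues.append(f"{col}: {violations.sum()} invalid values")
            df = df[~violations]
    return df, issues

raw = pd.DataFrame({
    'revenue': [120.0, 2_000_000.0, 95.0],        # the $2M outlier
    'date': ['2024-01-01', '2024-01-01', 'TBD'],  # and the 'TBD' date
})
clean, issues = validate(raw)
print(len(clean), issues)  # 1 ['revenue: 1 invalid values', 'date: 1 invalid values']
```

Both bad rows are dropped before the transform stage, and the issues list becomes the audit trail for what was excluded and why.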
The Results
| Metric | Before | After |
|--------|--------|-------|
| Weekly processing time | 6 hours | 12 minutes |
| Data sources handled | ~120 (manual limit) | 500+ (automated) |
| Error rate | ~8% of reports had issues | Under 0.5% |
| Time to detect bad data | Days (or never) | Real-time |
| Email campaign engagement | Baseline | +60% |
That 60% email engagement lift came from a side effect: the pipeline surfaced data that showed which subject lines actually worked. When you process 500 sources every week instead of 120, patterns emerge that manual work hides.
Lessons for Your Own Pipeline
1. Failure is a feature, not a bug. Your pipeline should handle every source failing independently. If one broken CSV kills your entire Monday report, you've built a fragile system.
2. Normalize early. Column name inconsistencies cause more bugs than actual data errors. Solve the mapping problem once in a central place.
3. Validate before you transform. Bad data in, bad analysis out. Catch it at the gate.
4. Log everything. When your pipeline runs at 7 AM and someone asks "why is this number different" at 2 PM, you need to trace back exactly what happened.
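The article doesn't show its logging setup, so here is one minimal way to do point 4 (the `setup_pipeline_logging` name, the format string, and the `ads_export.csv` filename are all illustrative, not the original config):

```python
import logging

def setup_pipeline_logging(log_file: str = 'pipeline.log') -> None:
    """Timestamp every log line so any output number can be traced to a stage."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
        handlers=[
            logging.FileHandler(log_file),  # permanent record for the 2 PM question
            logging.StreamHandler(),        # live output during the 7 AM run
        ],
        force=True,  # replace any handlers configured elsewhere
    )

setup_pipeline_logging()
logging.getLogger('connector').info('Loaded 1423 rows from ads_export.csv')
```

Per-stage logger names (`connector`, `validate`, and so on) mean the log file reads as a timeline of the run, which is exactly what you need when tracing a number backwards.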
The code isn't complex. The architecture decisions are. If you're still doing manual data work every Monday morning, the ROI on building a pipeline is measured in weeks, not months.
How many hours does your team spend on manual data tasks every week? I bet the number would shock your manager.