Building ETL Pipelines with Web Scraping APIs
Learn how to build production-ready ETL pipelines using web scraping APIs. Covers extraction, transformation, loading, scheduling, and monitoring.
Every data-driven organization eventually faces the same challenge: the data you need lives on the web, scattered across thousands of pages, and you need it structured, clean, and flowing into your systems automatically. That is where ETL pipelines powered by web scraping come in.
ETL — Extract, Transform, Load — is the backbone of modern data engineering. When you combine it with a reliable web scraping API like FineData, you get a pipeline that can pull fresh data from any website, normalize it into a consistent schema, and push it into your database or warehouse on autopilot.
This guide walks through how to architect, build, and operate production ETL pipelines with web scraping at the extraction layer.
The Architecture at a Glance
A well-designed scraping ETL pipeline has three distinct stages:
- Extract — Fetch raw HTML or JSON from target websites using a scraping API
- Transform — Parse, clean, normalize, and deduplicate the raw data
- Load — Write the structured data into your storage layer (databases, warehouses, spreadsheets)
Each stage should be isolated, idempotent, and independently testable. Let’s look at each one in depth.
Stage 1: Extract — Fetching Data with FineData
The extraction layer is responsible for fetching raw content from target URLs. This is where most pipelines break, because websites change, block scrapers, or load content dynamically. A scraping API abstracts away those concerns.
Basic Extraction
Here is a straightforward extraction function that fetches a product listing page:
import requests
from typing import Optional

FINEDATA_API_KEY = "fd_your_api_key"
FINEDATA_URL = "https://api.finedata.ai/api/v1/scrape"

def extract_page(url: str, use_js: bool = False) -> Optional[str]:
    """Fetch a single page and return its HTML content."""
    response = requests.post(
        FINEDATA_URL,
        headers={
            "x-api-key": FINEDATA_API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "url": url,
            "use_js_render": use_js,
            "solve_captcha": False,
            "tls_profile": "chrome124",
            "timeout": 30
        }
    )
    if response.status_code == 200:
        return response.json().get("content")
    else:
        print(f"Extraction failed for {url}: {response.status_code}")
        return None
Batch Extraction
For pipelines that scrape hundreds or thousands of pages, batch extraction is critical. FineData’s batch endpoint lets you submit up to 100 URLs in one request:
def extract_batch(urls: list[str], use_js: bool = False) -> list[dict]:
    """Extract multiple pages in a single batch request."""
    response = requests.post(
        "https://api.finedata.ai/api/v1/batch",
        headers={
            "x-api-key": FINEDATA_API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "urls": urls,
            "use_js_render": use_js,
            "timeout": 60
        }
    )
    if response.status_code == 200:
        return response.json().get("results", [])
    return []
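Because the batch endpoint caps each request at 100 URLs, larger jobs need to be chunked client-side. A minimal sketch of the chunking step (`extract_batch` is the function defined above):

```python
def chunk_urls(urls: list[str], size: int = 100) -> list[list[str]]:
    """Split a URL list into chunks no larger than the batch limit."""
    return [urls[i:i + size] for i in range(0, len(urls), size)]

# Usage with extract_batch from above:
# results = []
# for chunk in chunk_urls(all_urls):
#     results.extend(extract_batch(chunk))
```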
Handling Dynamic Content
Many modern websites render content with JavaScript. E-commerce sites, SPAs, and dashboards often require a full browser render. Enable use_js_render for these targets — it costs more tokens but ensures you get the fully rendered DOM.
For pages behind anti-bot protection, add residential proxies:
payload = {
    "url": "https://protected-store.com/products",
    "use_js_render": True,
    "use_residential": True,
    "tls_profile": "chrome124",
    "timeout": 45
}
Retry Logic and Error Handling
Production pipelines must handle failures gracefully. Websites go down, rate limits kick in, and network errors happen. Always implement exponential backoff:
import time

def extract_with_retry(url: str, max_retries: int = 3) -> Optional[str]:
    """Extract a page with exponential backoff on failure."""
    for attempt in range(max_retries):
        result = extract_page(url, use_js=True)
        if result is not None:
            return result
        if attempt < max_retries - 1:  # no pointless sleep after the final attempt
            wait_time = 2 ** attempt  # 1s, then 2s, ...
            print(f"Retry {attempt + 1}/{max_retries} for {url}, waiting {wait_time}s")
            time.sleep(wait_time)
    print(f"All retries exhausted for {url}")
    return None
Stage 2: Transform — Parsing and Cleaning Data
Once you have raw HTML, you need to extract structured fields, normalize formats, and handle edge cases. This stage determines the quality of your final dataset.
Parsing HTML with BeautifulSoup
For most use cases, BeautifulSoup combined with CSS selectors gets the job done:
from bs4 import BeautifulSoup
from dataclasses import dataclass
from decimal import Decimal
import re

@dataclass
class Product:
    name: str
    price: Decimal
    currency: str
    sku: str
    in_stock: bool
    url: str

def transform_product_page(html: str, source_url: str) -> Product:
    """Parse a product page into a structured Product object."""
    soup = BeautifulSoup(html, "html.parser")
    name = soup.select_one("h1.product-title")
    price_el = soup.select_one(".price-current")
    sku_el = soup.select_one("[data-sku]")
    stock_el = soup.select_one(".stock-status")
    # Normalize price: remove currency symbols, commas, whitespace
    raw_price = price_el.get_text(strip=True) if price_el else "0"
    clean_price = re.sub(r"[^\d.]", "", raw_price)
    return Product(
        name=name.get_text(strip=True) if name else "Unknown",
        price=Decimal(clean_price) if clean_price else Decimal("0"),
        currency="USD",
        sku=sku_el["data-sku"] if sku_el else "",
        in_stock="in stock" in (stock_el.get_text().lower() if stock_el else ""),
        url=source_url
    )
Data Normalization
Real-world scraped data is messy. Prices come in different formats ($1,299.99, 1299.99 USD, €1.299,99). Dates vary wildly. Names have inconsistent casing. Your transform layer must normalize everything:
- Prices: Strip currency symbols, handle locale-specific decimal separators, convert to a standard currency
- Dates: Parse with dateutil.parser or similar, store in ISO 8601 format (UTC)
- Text: Strip extra whitespace, normalize Unicode, apply consistent casing
- URLs: Resolve relative URLs to absolute, remove tracking parameters
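A sketch of what the price and text rules might look like in practice. The separator heuristic here (treat whichever of `.` or `,` appears last as the decimal point) is one common approach, not a complete locale-aware parser:

```python
import re
import unicodedata
from decimal import Decimal

def normalize_price(raw: str) -> Decimal:
    """Parse a locale-formatted price string into a Decimal.

    Handles both '1,299.99' (comma thousands separator) and '1.299,99'
    (comma decimal separator) by treating whichever separator appears
    last as the decimal point.
    """
    digits = re.sub(r"[^\d.,]", "", raw)
    last_dot, last_comma = digits.rfind("."), digits.rfind(",")
    if last_comma > last_dot:
        # European style: dots are thousands separators, comma is decimal
        digits = digits.replace(".", "").replace(",", ".")
    else:
        digits = digits.replace(",", "")
    return Decimal(digits) if digits else Decimal("0")

def normalize_text(raw: str) -> str:
    """Collapse runs of whitespace and normalize Unicode to NFC form."""
    return re.sub(r"\s+", " ", unicodedata.normalize("NFC", raw)).strip()
```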
Deduplication
When scraping the same source regularly, you will inevitably encounter duplicate records. Deduplication strategies include:
- Hash-based: Compute a hash of key fields (name + SKU + price) and skip records that already exist
- Upsert: Use database upsert operations (e.g. PostgreSQL's INSERT ... ON CONFLICT ... DO UPDATE) to automatically handle duplicates
- Window-based: Track what was scraped in each run and only process new or changed records
import hashlib

def compute_record_hash(product: Product) -> str:
    """Generate a deterministic hash for deduplication."""
    key = f"{product.sku}:{product.name}:{product.price}"
    return hashlib.sha256(key.encode()).hexdigest()
Stage 3: Load — Storing Structured Data
The final stage writes clean, structured data into your storage systems. The right destination depends on your use case.
Loading into PostgreSQL
For transactional workloads and moderate data volumes, PostgreSQL is the go-to choice:
import psycopg2
from psycopg2.extras import execute_values

def load_products(products: list[Product], conn_string: str):
    """Load products into PostgreSQL with upsert."""
    conn = psycopg2.connect(conn_string)
    cursor = conn.cursor()
    values = [
        (p.sku, p.name, float(p.price), p.currency, p.in_stock, p.url)
        for p in products
    ]
    query = """
        INSERT INTO products (sku, name, price, currency, in_stock, url)
        VALUES %s
        ON CONFLICT (sku)
        DO UPDATE SET
            price = EXCLUDED.price,
            in_stock = EXCLUDED.in_stock,
            updated_at = NOW()
    """
    execute_values(cursor, query, values)
    conn.commit()
    cursor.close()
    conn.close()
Loading into a Data Warehouse
For analytics workloads with large data volumes, load into BigQuery, Snowflake, or Redshift. Most warehouses support bulk loading from CSV or Parquet files:
import pandas as pd

def load_to_warehouse(products: list[Product], output_path: str):
    """Convert products to Parquet for warehouse loading."""
    df = pd.DataFrame([vars(p) for p in products])
    df["price"] = df["price"].astype(float)
    df["scraped_at"] = pd.Timestamp.now(tz="UTC")
    df.to_parquet(output_path, index=False)
    # Upload to S3/GCS and trigger the warehouse COPY command
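The upload step in the final comment might look like the sketch below. The bucket name and date-partitioned key layout are placeholders, and the snippet assumes boto3 is installed with AWS credentials configured:

```python
import datetime

def warehouse_key(prefix: str, run_date: datetime.date) -> str:
    """Build a date-partitioned object key for the warehouse staging area."""
    return f"{prefix}/dt={run_date.isoformat()}/products.parquet"

def upload_for_copy(local_path: str, bucket: str = "my-etl-bucket") -> str:
    """Upload a Parquet file to S3 and return its URI for the COPY command."""
    import boto3  # assumed installed; credentials come from the environment
    key = warehouse_key("products", datetime.date.today())
    boto3.client("s3").upload_file(local_path, bucket, key)
    return f"s3://{bucket}/{key}"
```

After the upload, run your warehouse's bulk-load statement (Redshift `COPY`, Snowflake `COPY INTO`, or a BigQuery load job) against the returned URI.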
Loading into Google Sheets
For smaller datasets or stakeholder-facing reports, Google Sheets works surprisingly well:
import gspread
from google.oauth2.service_account import Credentials

SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]

def load_to_sheets(products: list[Product], spreadsheet_id: str):
    """Append products to a Google Sheet."""
    creds = Credentials.from_service_account_file(
        "service_account.json", scopes=SCOPES
    )
    gc = gspread.authorize(creds)
    sheet = gc.open_by_key(spreadsheet_id).sheet1
    rows = [[p.name, str(p.price), p.currency, p.sku, p.in_stock] for p in products]
    sheet.append_rows(rows)
Putting It All Together
Here is a complete pipeline that ties all three stages together:
def run_pipeline(urls: list[str], db_conn: str):
    """Execute the full ETL pipeline."""
    print(f"Starting pipeline for {len(urls)} URLs")

    # Extract
    raw_pages = []
    for url in urls:
        html = extract_with_retry(url)
        if html:
            raw_pages.append((url, html))
    print(f"Extracted {len(raw_pages)}/{len(urls)} pages")

    # Transform
    products = []
    seen_hashes = set()
    for url, html in raw_pages:
        try:
            product = transform_product_page(html, url)
            record_hash = compute_record_hash(product)
            if record_hash not in seen_hashes:
                products.append(product)
                seen_hashes.add(record_hash)
        except Exception as e:
            print(f"Transform error for {url}: {e}")
    print(f"Transformed {len(products)} unique products")

    # Load
    if products:
        load_products(products, db_conn)
        print(f"Loaded {len(products)} products to database")
Scheduling and Orchestration
A pipeline is only useful if it runs automatically.
Cron for Simple Pipelines
For straightforward, single-script pipelines, a cron job is the simplest option:
# Run the product scraper every day at 2 AM UTC
0 2 * * * /usr/bin/python3 /opt/pipelines/product_scraper.py >> /var/log/scraper.log 2>&1
Apache Airflow for Complex Workflows
When your pipeline has dependencies, branching logic, or needs retries with SLA monitoring, Apache Airflow is the standard:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "product_scraper",
    default_args=default_args,
    schedule_interval="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id="extract_pages",
        python_callable=extract_all_pages,
    )
    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_all_products,
    )
    load = PythonOperator(
        task_id="load_to_database",
        python_callable=load_products_to_db,
    )

    extract >> transform >> load
Monitoring and Alerting
Production pipelines fail. What matters is knowing when they fail and recovering quickly.
Key Metrics to Track
- Extraction success rate: What percentage of URLs returned valid content?
- Records processed: How many records made it through each stage?
- Pipeline duration: Is the pipeline slowing down over time?
- Data freshness: When was the last successful run?
- Token consumption: Are you staying within budget on API calls?
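A lightweight way to start tracking the first three metrics is a per-run container that the pipeline fills in and emits to your logger or metrics backend; a minimal sketch:

```python
import time
from dataclasses import dataclass, field

@dataclass
class RunMetrics:
    """Per-run pipeline metrics; emit at the end of each run."""
    urls_total: int = 0
    urls_extracted: int = 0
    records_loaded: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def success_rate(self) -> float:
        """Fraction of URLs that returned valid content."""
        return self.urls_extracted / self.urls_total if self.urls_total else 0.0

    @property
    def duration_s(self) -> float:
        """Seconds elapsed since the run started."""
        return time.monotonic() - self.started_at
```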
Idempotency
An idempotent pipeline can be re-run safely without creating duplicate data or side effects. This is essential for recovery. Techniques include:
- Use upserts instead of inserts
- Track pipeline runs with a unique run_id
- Use hash-based deduplication before loading
- Design transforms as pure functions with no side effects
- Store intermediate results so you can resume from any stage
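Tracking runs with a unique run_id can be as simple as a ledger table. A sketch using SQLite for illustration (in production the ledger would live in your pipeline database, with a schema of your choosing):

```python
import sqlite3
import uuid

def start_run(conn: sqlite3.Connection) -> str:
    """Record the start of a pipeline run and return its unique run_id."""
    run_id = str(uuid.uuid4())
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pipeline_runs ("
        "  run_id TEXT PRIMARY KEY,"
        "  status TEXT,"
        "  started_at TEXT DEFAULT CURRENT_TIMESTAMP"
        ")"
    )
    conn.execute(
        "INSERT INTO pipeline_runs (run_id, status) VALUES (?, 'running')",
        (run_id,),
    )
    conn.commit()
    return run_id

def finish_run(conn: sqlite3.Connection, run_id: str, status: str = "succeeded") -> None:
    """Mark a run as finished so re-runs and failures are traceable."""
    conn.execute(
        "UPDATE pipeline_runs SET status = ? WHERE run_id = ?", (status, run_id)
    )
    conn.commit()
```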
Common Pitfalls and How to Avoid Them
Coupling extraction to transformation. Keep your stages separate. Save raw HTML to disk or object storage before transforming. This lets you re-transform without re-scraping.
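One way to decouple the stages is to persist each raw page keyed by a hash of its URL, so transforms can be re-run offline without touching the scraper; a minimal local-disk sketch (swap the directory for object storage in production):

```python
import hashlib
from pathlib import Path

def save_raw_page(url: str, html: str, raw_dir: str = "raw_pages") -> Path:
    """Persist raw HTML to disk, named by a hash of the URL, and return the path."""
    Path(raw_dir).mkdir(parents=True, exist_ok=True)
    name = hashlib.sha256(url.encode()).hexdigest()[:16] + ".html"
    path = Path(raw_dir) / name
    path.write_text(html, encoding="utf-8")
    return path
```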
Ignoring schema evolution. Websites change their HTML structure. Build your parsers defensively — handle missing fields gracefully and log warnings when expected elements are absent.
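Defensive parsing can be centralized in a small helper that logs a warning and returns a default instead of raising when a selector matches nothing; a sketch (the selector strings in any given parser are site-specific):

```python
import logging
from bs4 import BeautifulSoup

logger = logging.getLogger("transform")

def safe_text(soup: BeautifulSoup, selector: str, default: str = "") -> str:
    """Return the text of the first matching element, or a default with a warning."""
    el = soup.select_one(selector)
    if el is None:
        logger.warning("Expected element not found: %s", selector)
        return default
    return el.get_text(strip=True)
```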
No monitoring. A pipeline that fails silently is worse than no pipeline. Add logging, metrics, and alerts from day one.
Over-engineering scheduling. Start with cron. Move to Airflow or Prefect only when you actually need DAG dependencies, retries, or multi-step orchestration.
Not respecting rate limits. Even with a scraping API handling the hard parts, be thoughtful about request volume. Spread requests over time and use batch endpoints when available.
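Spreading requests over time can be as simple as wrapping your URL iteration in a throttling generator that enforces a minimum interval between items; a minimal sketch:

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def throttled(items: Iterable[T], min_interval: float = 1.0) -> Iterator[T]:
    """Yield items no faster than one per min_interval seconds."""
    last = 0.0
    for item in items:
        wait = min_interval - (time.monotonic() - last)
        if wait > 0:
            time.sleep(wait)
        last = time.monotonic()
        yield item

# Usage: for url in throttled(urls, min_interval=2.0): extract_page(url)
```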
Conclusion
Building ETL pipelines with web scraping is not fundamentally different from any other data pipeline — it just has a more unpredictable source. By using a reliable API like FineData for extraction, structuring your transforms defensively, and loading with idempotent operations, you can build pipelines that run reliably in production.
Start simple: one script, one source, one cron job. Then grow into Airflow DAGs and warehouse loading as your data needs evolve. The architecture patterns here will scale with you.
Ready to build your first pipeline? Sign up for FineData and start extracting data in minutes.