
Building ETL Pipelines with Web Scraping APIs

Learn how to build production-ready ETL pipelines using web scraping APIs. Covers extraction, transformation, loading, scheduling, and monitoring.

FineData Team

Every data-driven organization eventually faces the same challenge: the data you need lives on the web, scattered across thousands of pages, and you need it structured, clean, and flowing into your systems automatically. That is where ETL pipelines powered by web scraping come in.

ETL — Extract, Transform, Load — is the backbone of modern data engineering. When you combine it with a reliable web scraping API like FineData, you get a pipeline that can pull fresh data from any website, normalize it into a consistent schema, and push it into your database or warehouse on autopilot.

This guide walks through how to architect, build, and operate production ETL pipelines with web scraping at the extraction layer.

The Architecture at a Glance

A well-designed scraping ETL pipeline has three distinct stages:

  1. Extract — Fetch raw HTML or JSON from target websites using a scraping API
  2. Transform — Parse, clean, normalize, and deduplicate the raw data
  3. Load — Write the structured data into your storage layer (databases, warehouses, spreadsheets)

Each stage should be isolated, idempotent, and independently testable. Let’s look at each one in depth.

Stage 1: Extract — Fetching Data with FineData

The extraction layer is responsible for fetching raw content from target URLs. This is where most pipelines break, because websites change, block scrapers, or load content dynamically. A scraping API abstracts away those concerns.

Basic Extraction

Here is a straightforward extraction function that fetches a product listing page:

import requests
from typing import Optional

FINEDATA_API_KEY = "fd_your_api_key"
FINEDATA_URL = "https://api.finedata.ai/api/v1/scrape"

def extract_page(url: str, use_js: bool = False) -> Optional[str]:
    """Fetch a single page and return its HTML content."""
    response = requests.post(
        FINEDATA_URL,
        headers={
            "x-api-key": FINEDATA_API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "url": url,
            "use_js_render": use_js,
            "solve_captcha": False,
            "tls_profile": "chrome124",
            "timeout": 30
        }
    )

    if response.status_code == 200:
        return response.json().get("content")
    else:
        print(f"Extraction failed for {url}: {response.status_code}")
        return None

Batch Extraction

For pipelines that scrape hundreds or thousands of pages, batch extraction is critical. FineData’s batch endpoint lets you submit up to 100 URLs in one request:

def extract_batch(urls: list[str], use_js: bool = False) -> list[dict]:
    """Extract multiple pages in a single batch request."""
    response = requests.post(
        "https://api.finedata.ai/api/v1/batch",
        headers={
            "x-api-key": FINEDATA_API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "urls": urls,
            "use_js_render": use_js,
            "timeout": 60
        }
    )

    if response.status_code == 200:
        return response.json().get("results", [])
    return []

Handling Dynamic Content

Many modern websites render content with JavaScript. E-commerce sites, SPAs, and dashboards often require a full browser render. Enable use_js_render for these targets — it costs more tokens but ensures you get the fully rendered DOM.

For pages behind anti-bot protection, add residential proxies:

payload = {
    "url": "https://protected-store.com/products",
    "use_js_render": True,
    "use_residential": True,
    "tls_profile": "chrome124",
    "timeout": 45
}

Retry Logic and Error Handling

Production pipelines must handle failures gracefully. Websites go down, rate limits kick in, and network errors happen. Always implement exponential backoff:

import time

def extract_with_retry(url: str, max_retries: int = 3) -> Optional[str]:
    """Extract a page with exponential backoff on failure."""
    for attempt in range(max_retries):
        result = extract_page(url, use_js=True)
        if result is not None:
            return result

        if attempt < max_retries - 1:  # skip the sleep after the final attempt
            wait_time = 2 ** attempt  # 1s, 2s, 4s
            print(f"Retry {attempt + 1}/{max_retries} for {url}, waiting {wait_time}s")
            time.sleep(wait_time)

    print(f"All retries exhausted for {url}")
    return None

Stage 2: Transform — Parsing and Cleaning Data

Once you have raw HTML, you need to extract structured fields, normalize formats, and handle edge cases. This stage determines the quality of your final dataset.

Parsing HTML with BeautifulSoup

For most use cases, BeautifulSoup combined with CSS selectors gets the job done:

from bs4 import BeautifulSoup
from dataclasses import dataclass
from decimal import Decimal
import re

@dataclass
class Product:
    name: str
    price: Decimal
    currency: str
    sku: str
    in_stock: bool
    url: str

def transform_product_page(html: str, source_url: str) -> Product:
    """Parse a product page into a structured Product object."""
    soup = BeautifulSoup(html, "html.parser")

    name = soup.select_one("h1.product-title")
    price_el = soup.select_one(".price-current")
    sku_el = soup.select_one("[data-sku]")
    stock_el = soup.select_one(".stock-status")

    # Normalize price: remove currency symbols, commas, whitespace
    raw_price = price_el.get_text(strip=True) if price_el else "0"
    clean_price = re.sub(r"[^\d.]", "", raw_price)

    return Product(
        name=name.get_text(strip=True) if name else "Unknown",
        price=Decimal(clean_price) if clean_price else Decimal("0"),
        currency="USD",
        sku=sku_el["data-sku"] if sku_el else "",
        in_stock="in stock" in (stock_el.get_text().lower() if stock_el else ""),
        url=source_url
    )

Data Normalization

Real-world scraped data is messy. Prices come in different formats ($1,299.99, 1299.99 USD, €1.299,99). Dates vary wildly. Names have inconsistent casing. Your transform layer must normalize everything:

  • Prices: Strip currency symbols, handle locale-specific decimal separators, convert to a standard currency
  • Dates: Parse with dateutil.parser or similar, store in ISO 8601 format (UTC)
  • Text: Strip extra whitespace, normalize Unicode, apply consistent casing
  • URLs: Resolve relative URLs to absolute, remove tracking parameters
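These rules are easiest to apply as small, pure helper functions that slot into the transform stage. The heuristics below are illustrative assumptions, not a complete normalizer: the "last separator wins" rule for decimal points and the hardcoded set of tracking parameters are starting points to extend for your sources.

```python
import re
import unicodedata
from decimal import Decimal
from urllib.parse import urljoin, urlparse, urlunparse, parse_qsl, urlencode

def normalize_price(raw: str) -> Decimal:
    """Strip currency symbols and locale separators, return a Decimal."""
    cleaned = re.sub(r"[^\d.,]", "", raw)
    # Heuristic: whichever separator appears last is the decimal point
    if "," in cleaned and cleaned.rfind(",") > cleaned.rfind("."):
        cleaned = cleaned.replace(".", "").replace(",", ".")  # European style
    else:
        cleaned = cleaned.replace(",", "")  # US style thousands separators
    return Decimal(cleaned or "0")

def normalize_text(raw: str) -> str:
    """Normalize Unicode to NFC and collapse runs of whitespace."""
    return re.sub(r"\s+", " ", unicodedata.normalize("NFC", raw)).strip()

# Illustrative, not exhaustive
TRACKING_PARAMS = {"utm_source", "utm_medium", "utm_campaign", "ref", "fbclid"}

def normalize_url(href: str, base: str) -> str:
    """Resolve a possibly-relative URL and strip common tracking parameters."""
    parts = urlparse(urljoin(base, href))
    query = [(k, v) for k, v in parse_qsl(parts.query) if k not in TRACKING_PARAMS]
    return urlunparse(parts._replace(query=urlencode(query)))
```

Keeping these helpers free of I/O makes them trivial to unit-test against the messiest examples you encounter in production.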

Deduplication

When scraping the same source regularly, you will inevitably encounter duplicate records. Deduplication strategies include:

  • Hash-based: Compute a hash of key fields (name + SKU + price) and skip records that already exist
  • Upsert: Use database upsert operations (INSERT ON CONFLICT UPDATE) to automatically handle duplicates
  • Window-based: Track what was scraped in each run and only process new or changed records
A hash-based check is straightforward to implement:

import hashlib

def compute_record_hash(product: Product) -> str:
    """Generate a deterministic hash for deduplication."""
    key = f"{product.sku}:{product.name}:{product.price}"
    return hashlib.sha256(key.encode()).hexdigest()

Stage 3: Load — Storing Structured Data

The final stage writes clean, structured data into your storage systems. The right destination depends on your use case.

Loading into PostgreSQL

For transactional workloads and moderate data volumes, PostgreSQL is the go-to choice:

import psycopg2
from psycopg2.extras import execute_values

def load_products(products: list[Product], conn_string: str):
    """Load products into PostgreSQL with upsert."""
    values = [
        (p.sku, p.name, float(p.price), p.currency, p.in_stock, p.url)
        for p in products
    ]

    query = """
        INSERT INTO products (sku, name, price, currency, in_stock, url)
        VALUES %s
        ON CONFLICT (sku)
        DO UPDATE SET
            price = EXCLUDED.price,
            in_stock = EXCLUDED.in_stock,
            updated_at = NOW()
    """

    # Context managers commit on success and roll back on error
    with psycopg2.connect(conn_string) as conn:
        with conn.cursor() as cursor:
            execute_values(cursor, query, values)
    conn.close()  # the connection's "with" block commits but does not close

Loading into a Data Warehouse

For analytics workloads with large data volumes, load into BigQuery, Snowflake, or Redshift. Most warehouses support bulk loading from CSV or Parquet files:

import pandas as pd

def load_to_warehouse(products: list[Product], output_path: str):
    """Convert products to Parquet for warehouse loading."""
    df = pd.DataFrame([vars(p) for p in products])
    df["price"] = df["price"].astype(float)
    df["scraped_at"] = pd.Timestamp.now(tz="UTC")  # Timestamp.utcnow() is deprecated in pandas 2.x

    df.to_parquet(output_path, index=False)
    # Upload to S3/GCS and trigger warehouse COPY command

Loading into Google Sheets

For smaller datasets or stakeholder-facing reports, Google Sheets works surprisingly well:

import gspread
from google.oauth2.service_account import Credentials

def load_to_sheets(products: list[Product], spreadsheet_id: str):
    """Append products to a Google Sheet."""
    creds = Credentials.from_service_account_file(
        "service_account.json",
        scopes=["https://www.googleapis.com/auth/spreadsheets"],
    )
    gc = gspread.authorize(creds)
    sheet = gc.open_by_key(spreadsheet_id).sheet1

    rows = [[p.name, str(p.price), p.currency, p.sku, p.in_stock] for p in products]
    sheet.append_rows(rows)

Putting It All Together

Here is a complete pipeline that ties all three stages together:

def run_pipeline(urls: list[str], db_conn: str):
    """Execute the full ETL pipeline."""
    print(f"Starting pipeline for {len(urls)} URLs")

    # Extract
    raw_pages = []
    for url in urls:
        html = extract_with_retry(url)
        if html:
            raw_pages.append((url, html))
    print(f"Extracted {len(raw_pages)}/{len(urls)} pages")

    # Transform
    products = []
    seen_hashes = set()
    for url, html in raw_pages:
        try:
            product = transform_product_page(html, url)
            record_hash = compute_record_hash(product)
            if record_hash not in seen_hashes:
                products.append(product)
                seen_hashes.add(record_hash)
        except Exception as e:
            print(f"Transform error for {url}: {e}")
    print(f"Transformed {len(products)} unique products")

    # Load
    if products:
        load_products(products, db_conn)
        print(f"Loaded {len(products)} products to database")

Scheduling and Orchestration

A pipeline is only useful if it runs automatically.

Cron for Simple Pipelines

For straightforward, single-script pipelines, a cron job is the simplest option:

# Run the product scraper every day at 2 AM UTC
0 2 * * * /usr/bin/python3 /opt/pipelines/product_scraper.py >> /var/log/scraper.log 2>&1

Apache Airflow for Complex Workflows

When your pipeline has dependencies, branching logic, or needs retries with SLA monitoring, Apache Airflow is the standard:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "product_scraper",
    default_args=default_args,
    schedule="0 2 * * *",  # "schedule_interval" on Airflow < 2.4
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract_pages",
        python_callable=extract_all_pages,
    )

    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_all_products,
    )

    load = PythonOperator(
        task_id="load_to_database",
        python_callable=load_products_to_db,
    )

    extract >> transform >> load

Monitoring and Alerting

Production pipelines fail. What matters is knowing when they fail and recovering quickly.

Key Metrics to Track

  • Extraction success rate: What percentage of URLs returned valid content?
  • Records processed: How many records made it through each stage?
  • Pipeline duration: Is the pipeline slowing down over time?
  • Data freshness: When was the last successful run?
  • Token consumption: Are you staying within budget on API calls?
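A minimal way to capture most of these numbers is a metrics object that each stage updates and the pipeline logs at the end of a run. The fields and summary format below are an illustrative sketch, not tied to any particular monitoring stack:

```python
import time
from dataclasses import dataclass, field

@dataclass
class PipelineMetrics:
    """Per-run counters, assuming a single-process pipeline."""
    urls_total: int = 0
    urls_extracted: int = 0
    records_loaded: int = 0
    started_at: float = field(default_factory=time.monotonic)

    @property
    def extraction_success_rate(self) -> float:
        return self.urls_extracted / self.urls_total if self.urls_total else 0.0

    @property
    def duration_seconds(self) -> float:
        return time.monotonic() - self.started_at

    def summary(self) -> str:
        return (
            f"extracted {self.urls_extracted}/{self.urls_total} "
            f"({self.extraction_success_rate:.0%}), "
            f"loaded {self.records_loaded} records "
            f"in {self.duration_seconds:.1f}s"
        )
```

Shipping that summary line to your logging or alerting system on every run is enough to answer the freshness and success-rate questions above.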

Idempotency

An idempotent pipeline can be re-run safely without creating duplicate data or side effects. This is essential for recovery. Techniques include:

  • Use upserts instead of inserts
  • Track pipeline runs with a unique run_id
  • Use hash-based deduplication before loading
  • Design transforms as pure functions with no side effects
  • Store intermediate results so you can resume from any stage
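As a sketch of run tracking, a small state table with a composite primary key makes "mark stage done" itself idempotent, so a recovering pipeline can skip stages it already completed. SQLite stands in here for whatever store your pipeline uses; the table and function names are hypothetical:

```python
import sqlite3
import uuid

SCHEMA = """
CREATE TABLE IF NOT EXISTS pipeline_runs (
    run_id TEXT,
    stage TEXT,
    PRIMARY KEY (run_id, stage)
)
"""

def mark_stage_done(conn: sqlite3.Connection, run_id: str, stage: str) -> None:
    """Record a completed stage; INSERT OR IGNORE makes re-runs harmless."""
    conn.execute("INSERT OR IGNORE INTO pipeline_runs VALUES (?, ?)", (run_id, stage))
    conn.commit()

def stage_done(conn: sqlite3.Connection, run_id: str, stage: str) -> bool:
    """Check whether a stage already completed for this run_id."""
    row = conn.execute(
        "SELECT 1 FROM pipeline_runs WHERE run_id = ? AND stage = ?",
        (run_id, stage),
    ).fetchone()
    return row is not None
```

A recovery loop then becomes: generate (or reuse) a `run_id`, and wrap each stage in `if not stage_done(...)`.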

Common Pitfalls and How to Avoid Them

Coupling extraction to transformation. Keep your stages separate. Save raw HTML to disk or object storage before transforming. This lets you re-transform without re-scraping.
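A minimal version of that raw-HTML checkpoint is one file per URL, named by a hash of the URL so re-scrapes overwrite cleanly. The directory name and hash length here are arbitrary choices:

```python
import hashlib
from pathlib import Path

def save_raw_html(html: str, url: str, raw_dir: str = "raw_pages") -> Path:
    """Persist raw HTML keyed by a hash of the URL, so transforms can be replayed."""
    directory = Path(raw_dir)
    directory.mkdir(parents=True, exist_ok=True)
    name = hashlib.sha256(url.encode()).hexdigest()[:16] + ".html"
    out = directory / name
    out.write_text(html, encoding="utf-8")
    return out
```

Swap the local directory for S3 or GCS in production; the key idea, content keyed by URL, stays the same.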

Ignoring schema evolution. Websites change their HTML structure. Build your parsers defensively — handle missing fields gracefully and log warnings when expected elements are absent.

No monitoring. A pipeline that fails silently is worse than no pipeline. Add logging, metrics, and alerts from day one.

Over-engineering scheduling. Start with cron. Move to Airflow or Prefect only when you actually need DAG dependencies, retries, or multi-step orchestration.

Not respecting rate limits. Even with a scraping API handling the hard parts, be thoughtful about request volume. Spread requests over time and use batch endpoints when available.
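Even a simple sliding-window limiter in the extraction loop goes a long way. This sketch caps calls per time window; the limits are placeholders to tune against your plan:

```python
import time
from collections import deque

class RateLimiter:
    """Allow at most max_calls per rolling window of `period` seconds."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.calls: deque = deque()  # monotonic timestamps of recent calls

    def wait(self) -> None:
        """Block until another call is allowed, then record it."""
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        while self.calls and now - self.calls[0] > self.period:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest call leaves the window
            time.sleep(max(self.period - (now - self.calls[0]), 0.0))
            self.calls.popleft()
        self.calls.append(time.monotonic())
```

Calling `limiter.wait()` before each `extract_page` call spreads requests evenly instead of bursting.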

Conclusion

Building ETL pipelines with web scraping is not fundamentally different from any other data pipeline — it just has a more unpredictable source. By using a reliable API like FineData for extraction, structuring your transforms defensively, and loading with idempotent operations, you can build pipelines that run reliably in production.

Start simple: one script, one source, one cron job. Then grow into Airflow DAGs and warehouse loading as your data needs evolve. The architecture patterns here will scale with you.

Ready to build your first pipeline? Sign up for FineData and start extracting data in minutes.

#etl #data-pipeline #architecture #automation #data-engineering
