Building a Price Monitoring Tool: Step-by-Step Guide
Build a complete price monitoring tool with Python. Track prices, detect changes, and get email alerts. Full code with scheduler and database.
Price monitoring is one of the most practical applications of web scraping. Whether you’re tracking competitor pricing, waiting for a deal on a product you want, or monitoring your own prices across retail partners, a price tracker pays for itself quickly.
In this guide, we’ll build a complete price monitoring tool from scratch: a scraper that extracts prices, a database to store history, a scheduler to run checks automatically, and email alerts for price changes.
Architecture Overview
Our price monitoring tool has four components:
- Scraper — Fetches product pages and extracts the current price
- Database — Stores price history over time (SQLite for simplicity)
- Scheduler — Runs price checks on a defined interval
- Alerter — Sends notifications when prices change significantly
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Scheduler│────▶│ Scraper │────▶│ Database │────▶│ Alerter │
│ (cron) │ │(FineData)│ │ (SQLite) │ │ (email) │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Let’s build each one.
Step 1: Set Up the Project
Create a project directory and install dependencies:
mkdir price-monitor && cd price-monitor
pip install requests beautifulsoup4 schedule
Create a configuration file that defines what products to track:
# config.py
FINEDATA_API_KEY = "fd_your_api_key"
FINEDATA_URL = "https://api.finedata.ai/api/v1/scrape"
DATABASE_PATH = "prices.db"
# Alert settings
ALERT_EMAIL = "you@example.com"
SMTP_HOST = "smtp.gmail.com"
SMTP_PORT = 587
SMTP_USER = "your-email@gmail.com"
SMTP_PASSWORD = "your-app-password"
# Price change threshold for alerts (percentage)
PRICE_CHANGE_THRESHOLD = 5.0
# Products to monitor
PRODUCTS = [
{
"name": "Sony WH-1000XM5",
"url": "https://www.amazon.com/dp/B0BSHF7WHW",
"selector": ".a-price .a-offscreen",
"site": "amazon"
},
{
"name": "MacBook Air M3",
"url": "https://www.bestbuy.com/site/macbook-air-13-inch/6565837.p",
"selector": ".priceView-customer-price span",
"site": "bestbuy"
},
{
"name": "Nike Air Max 90",
"url": "https://www.nike.com/t/air-max-90-mens-shoes-abc123",
"selector": '[data-test="product-price"]',
"site": "nike"
}
]
Step 2: Build the Database Layer
We’ll use SQLite for simplicity — it needs zero configuration and works great for monitoring a few hundred products.
# database.py
import sqlite3
from datetime import datetime
from config import DATABASE_PATH
def init_db():
"""Initialize the database with required tables."""
conn = sqlite3.connect(DATABASE_PATH)
conn.execute("""
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
url TEXT NOT NULL UNIQUE,
site TEXT,
selector TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS prices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER NOT NULL,
price REAL,
currency TEXT DEFAULT 'USD',
available BOOLEAN DEFAULT 1,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products(id)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_prices_product_date
ON prices(product_id, checked_at)
""")
conn.commit()
return conn
def add_product(conn, name, url, site, selector):
"""Add a product to track."""
conn.execute(
"INSERT OR IGNORE INTO products (name, url, site, selector) "
"VALUES (?, ?, ?, ?)",
(name, url, site, selector)
)
conn.commit()
def get_products(conn):
"""Get all tracked products."""
cursor = conn.execute(
"SELECT id, name, url, site, selector FROM products"
)
columns = ["id", "name", "url", "site", "selector"]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def record_price(conn, product_id, price, available=True):
"""Record a price check result."""
conn.execute(
"INSERT INTO prices (product_id, price, available) VALUES (?, ?, ?)",
(product_id, price, available)
)
conn.commit()
def get_latest_price(conn, product_id):
"""Get the most recent price for a product."""
cursor = conn.execute(
"SELECT price, checked_at FROM prices "
"WHERE product_id = ? ORDER BY checked_at DESC LIMIT 1",
(product_id,)
)
row = cursor.fetchone()
return {"price": row[0], "checked_at": row[1]} if row else None
def get_price_history(conn, product_id, days=30):
"""Get price history for a product over the last N days."""
cursor = conn.execute(
"SELECT price, checked_at FROM prices "
"WHERE product_id = ? "
"AND checked_at >= datetime('now', ?) "
"ORDER BY checked_at",
(product_id, f"-{days} days")
)
return [{"price": row[0], "checked_at": row[1]} for row in cursor]
Step 3: Build the Scraper
The scraper uses FineData to fetch product pages and extract prices. Different sites need different parsing logic:
# scraper.py
import re
import requests
from bs4 import BeautifulSoup
from config import FINEDATA_API_KEY, FINEDATA_URL
def fetch_page(url, site):
"""Fetch a product page through FineData."""
# Different sites need different settings
settings = {
"amazon": {
"use_js_render": True,
"use_residential": True,
"tls_profile": "chrome124"
},
"bestbuy": {
"use_js_render": True,
"tls_profile": "chrome124"
},
"nike": {
"use_js_render": True,
"tls_profile": "chrome124"
},
"default": {
"use_js_render": False,
"tls_profile": "chrome124"
}
}
site_settings = settings.get(site, settings["default"])
response = requests.post(
FINEDATA_URL,
headers={
"x-api-key": FINEDATA_API_KEY,
"Content-Type": "application/json"
},
json={
"url": url,
"timeout": 30,
**site_settings
}
)
response.raise_for_status()
return response.json()
def extract_price(html, selector):
"""Extract price from HTML using a CSS selector."""
soup = BeautifulSoup(html, "html.parser")
price_el = soup.select_one(selector)
if not price_el:
return None
price_text = price_el.get_text(strip=True)
# Clean the price string: remove currency symbols, commas, etc.
# Handle formats like "$1,299.99", "1.299,99 €", "£49.99"
price_text = re.sub(r"[^\d.,]", "", price_text)
# Handle European format (1.299,99) vs US format (1,299.99)
if "," in price_text and "." in price_text:
if price_text.index(",") > price_text.index("."):
# European: 1.299,99
price_text = price_text.replace(".", "").replace(",", ".")
else:
# US: 1,299.99
price_text = price_text.replace(",", "")
elif "," in price_text:
# Could be "1,299" or "29,99"
parts = price_text.split(",")
if len(parts[-1]) == 2:
price_text = price_text.replace(",", ".")
else:
price_text = price_text.replace(",", "")
try:
return float(price_text)
except ValueError:
return None
def check_price(product):
"""Check the current price of a product."""
try:
data = fetch_page(product["url"], product["site"])
html = data["body"]
price = extract_price(html, product["selector"])
return {
"price": price,
"available": price is not None,
"error": None
}
except Exception as e:
return {
"price": None,
"available": False,
"error": str(e)
}
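The trickiest part of the scraper is price normalization. Here's a standalone copy of that logic (duplicated from `extract_price` so it runs without an API key or network access) with a few spot checks:

```python
import re

def parse_price(price_text):
    """Normalize a raw price string to a float (same logic as extract_price)."""
    # Strip everything except digits, dots, and commas
    price_text = re.sub(r"[^\d.,]", "", price_text)
    if "," in price_text and "." in price_text:
        if price_text.index(",") > price_text.index("."):
            # European: 1.299,99
            price_text = price_text.replace(".", "").replace(",", ".")
        else:
            # US: 1,299.99
            price_text = price_text.replace(",", "")
    elif "," in price_text:
        # "29,99" is a decimal comma; "1,299" is a thousands separator
        parts = price_text.split(",")
        if len(parts[-1]) == 2:
            price_text = price_text.replace(",", ".")
        else:
            price_text = price_text.replace(",", "")
    try:
        return float(price_text)
    except ValueError:
        return None

print(parse_price("$1,299.99"))   # 1299.99
print(parse_price("1.299,99 €"))  # 1299.99
print(parse_price("£49.99"))      # 49.99
print(parse_price("29,99"))       # 29.99
print(parse_price("Sold out"))    # None
```

Retailers change their price markup constantly, so keeping a handful of real examples from each site in a small test file pays off.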
Step 4: Build the Alert System
When prices change significantly, we want to know about it. Here’s an email alerter:
# alerter.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from config import (
ALERT_EMAIL, SMTP_HOST, SMTP_PORT,
SMTP_USER, SMTP_PASSWORD, PRICE_CHANGE_THRESHOLD
)
def should_alert(old_price, new_price):
"""Determine if a price change warrants an alert."""
if old_price is None or new_price is None:
return False
if old_price == 0:
return True
pct_change = ((new_price - old_price) / old_price) * 100
return abs(pct_change) >= PRICE_CHANGE_THRESHOLD
def send_price_alert(product_name, old_price, new_price, url):
"""Send an email alert about a price change."""
    # Guard against a zero baseline (should_alert lets old_price == 0 through)
    pct_change = (((new_price - old_price) / old_price) * 100) if old_price else 0.0
    direction = "dropped" if pct_change < 0 else "increased"
subject = (
f"Price Alert: {product_name} {direction} "
f"{abs(pct_change):.1f}%"
)
body = f"""
<h2>Price Change Detected</h2>
<p><strong>{product_name}</strong></p>
<table>
<tr><td>Previous Price:</td><td>${old_price:.2f}</td></tr>
<tr><td>Current Price:</td><td>${new_price:.2f}</td></tr>
<tr><td>Change:</td><td>{pct_change:+.1f}%</td></tr>
</table>
<p><a href="{url}">View Product</a></p>
"""
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = SMTP_USER
msg["To"] = ALERT_EMAIL
msg.attach(MIMEText(body, "html"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(SMTP_USER, ALERT_EMAIL, msg.as_string())
print(f" Alert sent for {product_name}")
except Exception as e:
print(f" Failed to send alert: {e}")
Step 5: Tie It All Together
Now let’s build the main monitoring loop:
# monitor.py
import time
from datetime import datetime
from config import PRODUCTS
from database import (
init_db, add_product, get_products,
record_price, get_latest_price
)
from scraper import check_price
from alerter import should_alert, send_price_alert
def setup():
"""Initialize database and seed products."""
conn = init_db()
for product in PRODUCTS:
add_product(
conn,
product["name"],
product["url"],
product["site"],
product["selector"]
)
return conn
def run_price_check(conn):
"""Check prices for all tracked products."""
products = get_products(conn)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n--- Price Check: {timestamp} ---")
for product in products:
result = check_price(product)
if result["error"]:
print(f" ERROR: {product['name']} — {result['error']}")
continue
new_price = result["price"]
previous = get_latest_price(conn, product["id"])
# Record the new price
record_price(
conn,
product["id"],
new_price,
result["available"]
)
if new_price is None:
print(f" {product['name']} — price not found (may be OOS)")
continue
if previous and previous["price"]:
old_price = previous["price"]
change = new_price - old_price
pct = (change / old_price) * 100 if old_price else 0
print(
f" {product['name']}: ${new_price:.2f} "
f"(was ${old_price:.2f}, {pct:+.1f}%)"
)
if should_alert(old_price, new_price):
send_price_alert(
product["name"],
old_price,
new_price,
product["url"]
)
else:
print(f" {product['name']}: ${new_price:.2f} (first check)")
# Delay between products to be polite
time.sleep(2)
def main():
"""Run the price monitor on a schedule."""
conn = setup()
# Run immediately on startup
run_price_check(conn)
# Then run every 6 hours
import schedule
schedule.every(6).hours.do(run_price_check, conn)
print("\nPrice monitor running. Checking every 6 hours...")
while True:
schedule.run_pending()
time.sleep(60)
if __name__ == "__main__":
main()
Step 6: Adding Price History Analysis
Once you’ve collected price data, you can analyze trends:
# analytics.py
from database import init_db, get_products, get_price_history
def analyze_prices(days=30):
"""Generate a price analysis report."""
conn = init_db()
products = get_products(conn)
print(f"\n=== Price Analysis (Last {days} Days) ===\n")
for product in products:
history = get_price_history(conn, product["id"], days)
if not history:
print(f"{product['name']}: No data")
continue
prices = [h["price"] for h in history if h["price"] is not None]
if not prices:
continue
current = prices[-1]
lowest = min(prices)
highest = max(prices)
average = sum(prices) / len(prices)
print(f"{product['name']}:")
print(f" Current: ${current:.2f}")
print(f" Lowest: ${lowest:.2f}")
print(f" Highest: ${highest:.2f}")
print(f" Average: ${average:.2f}")
print(f" Checks: {len(prices)}")
if current == lowest:
print(f" >>> AT LOWEST PRICE")
elif current <= average * 0.95:
print(f" >>> BELOW AVERAGE (good time to buy)")
print()
if __name__ == "__main__":
analyze_prices()
Running as a Background Service
For production use, run the monitor as a systemd service or use cron. Here’s a cron approach:
# Run price checks every 6 hours
# Edit crontab: crontab -e
0 */6 * * * cd /path/to/price-monitor && python -c "from monitor import setup, run_price_check; conn = setup(); run_price_check(conn)"
Or create a simple systemd service:
# /etc/systemd/system/price-monitor.service
[Unit]
Description=Price Monitor
After=network.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/path/to/price-monitor
ExecStart=/usr/bin/python3 monitor.py
Restart=on-failure
RestartSec=60
[Install]
WantedBy=multi-user.target
Scaling Considerations
This tutorial uses SQLite and runs on a single machine. For monitoring hundreds or thousands of products:
- Database: Switch to PostgreSQL for concurrent writes and better query performance
- Queue: Use Redis + Celery to distribute scraping work across multiple workers
- Batch scraping: Use FineData’s batch endpoint to scrape up to 100 URLs in parallel
- Storage: Archive old price data to keep the active database fast
- Dashboards: Build a simple web interface with Flask or Streamlit to visualize price trends
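The batch approach can be sketched roughly as below. The `/scrape/batch` endpoint path and payload shape are assumptions for illustration, not FineData's documented contract (check their API reference); the chunking helper itself is plain Python:

```python
def chunk(urls, size=100):
    """Split a URL list into batches of at most `size` URLs."""
    return [urls[i:i + size] for i in range(0, len(urls), size)]

def scrape_batch(urls, api_key):
    """Hypothetical batch call -- endpoint path and payload shape are
    assumptions for illustration; consult FineData's docs for the real API."""
    import requests  # lazy import so chunk() works without requests installed
    response = requests.post(
        "https://api.finedata.ai/api/v1/scrape/batch",  # assumed endpoint
        headers={"x-api-key": api_key, "Content-Type": "application/json"},
        json={"urls": urls, "use_js_render": True},
        timeout=120,
    )
    response.raise_for_status()
    return response.json()

urls = [f"https://example.com/product/{i}" for i in range(250)]
batches = chunk(urls)
print(len(batches))      # 3
print(len(batches[-1]))  # 50
```

Even if the batch endpoint differs, the pattern holds: chunk your product list, submit each chunk as one request, and fan the results back out to `record_price`.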
Token Cost Estimation
For a typical price monitoring setup:
| Scenario | Products | Checks/Day | Tokens/Check | Tokens/Day |
|---|---|---|---|---|
| Personal | 20 | 4 | 6 | 480 tokens |
| Small biz | 200 | 4 | 6 | 4,800 tokens |
| Enterprise | 2,000 | 6 | 6 | 72,000 tokens |
The 6 tokens/check assumes JS rendering (5) + base (1). Add 3 more if the site requires residential proxies.
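The table's arithmetic is just products × checks × tokens per check; a small helper makes it easy to budget your own setup:

```python
def daily_tokens(products, checks_per_day, tokens_per_check=6):
    """Estimate daily token usage for a monitoring setup."""
    return products * checks_per_day * tokens_per_check

print(daily_tokens(20, 4))      # 480    (personal)
print(daily_tokens(200, 4))     # 4800   (small biz)
print(daily_tokens(2000, 6))    # 72000  (enterprise)
print(daily_tokens(20, 4, 9))   # 720    (personal, with residential: 6 + 3)
```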
Key Takeaways
- A price monitoring tool has four components: scraper, database, scheduler, and alerter.
- Use FineData’s API for reliable scraping across different retailer sites, each with its own anti-bot protections.
- SQLite is sufficient for monitoring up to a few hundred products; switch to PostgreSQL for larger scale.
- Set meaningful alert thresholds — a 5% change catches significant price drops without drowning you in noise.
- Cache and delay between requests to minimize API token usage and avoid triggering anti-bot systems.
- Run the monitor as a background service using cron or systemd for hands-off operation.
Want to scrape individual retailers more effectively? Check out our Amazon scraping guide for site-specific parsing tips, or explore our use cases for more real-world applications.