ranczo-energy-price-scrapers/app_impl.py
Bartosz Wieczorek 26d2ddae54 Add TGE scraper
2025-10-14 13:01:52 +02:00

86 lines
2.9 KiB
Python

from __future__ import annotations
from datetime import datetime, date, timedelta
from zoneinfo import ZoneInfo
from typing import Optional
from utils.logging import function_logger
from utils.time import WARSAW_TZ, end_of_business_day_utc
def is_day_fully_ingested(scraper, business_day: date, tz: ZoneInfo) -> bool:
    """
    Decide whether *business_day* is fully present in the DB based on
    ``scraper.last_entry()``.

    Assumes that a fully ingested day ends at local midnight of D+1 in the DB.

    Args:
        scraper: object exposing ``last_entry() -> Optional[datetime]``;
            the returned timestamp must be tz-aware.
        business_day: the business day to check for completeness.
        tz: local timezone used to derive the end-of-day boundary.

    Returns:
        True when the last DB entry reaches (or passes) the UTC instant that
        ends *business_day*; False when there is no entry or it falls short.

    Raises:
        ValueError: if the scraper's last entry is naive (no tzinfo) —
            a naive/aware comparison would raise TypeError anyway, so we
            fail early with a clearer message instead of relying on
            ``assert``, which is stripped under ``python -O``.
    """
    log = function_logger()
    last: Optional[datetime] = scraper.last_entry()
    if not last:
        log.info("No last entry found for scrapper")
        return False
    log.debug("Last entry: {}".format(last))
    # Comparing two tz-aware datetimes is well-defined regardless of their
    # individual zones, so no explicit UTC normalization is needed here —
    # we only have to guarantee both sides are aware.
    if last.tzinfo is None:
        raise ValueError("last must be tz-aware")
    needed_end_utc = end_of_business_day_utc(business_day, tz)
    return last >= needed_end_utc
def maybe_ingest_for_next_day(
    scraper,
    now_local: datetime,
    tz: ZoneInfo
) -> int:
    """
    If the publication time for tomorrow has passed (with grace), and tomorrow
    is not in the DB yet, call ``scraper.ingest_day(tomorrow)``.

    Args:
        scraper: object exposing ``last_entry()`` and ``ingest_day(date) -> int``.
        now_local: current tz-aware local time (Europe/Warsaw expected).
        tz: local timezone forwarded to the ingestion-completeness check.

    Returns:
        Number of rows affected by the ingestion, or 0 when tomorrow's data
        is already fully present.

    Raises:
        ValueError: if *now_local* is naive — validated explicitly instead of
            with ``assert``, which is stripped under ``python -O``.
    """
    log = function_logger()
    log.debug("Start")
    if now_local.tzinfo is None:
        raise ValueError("now_local must be tz-aware (Europe/Warsaw).")
    today = now_local.date()
    # Publication happens today for the next business day.
    target_day = today + timedelta(days=1)
    log.info("Target day = {}".format(target_day))
    if is_day_fully_ingested(scraper, target_day, tz):
        log.info("Publication already happend, we got the data already, skipping")
        return 0
    # Ingest the next business day.
    log.info("Target day is NOT fully ingested, trying to read {}".format(target_day))
    ingested_rows = scraper.ingest_day(target_day)
    log.debug("Ingested rows = {}".format(ingested_rows))
    return ingested_rows
def date_run_tick(
    scraper,
    now_local: datetime
) -> int:
    """
    Run a single ingestion tick for *now_local*: attempt to ingest the next
    business day after the configured publication window, and report the
    number of affected rows.
    """
    log = function_logger()
    # Delegate the publication-window / already-ingested checks to the helper.
    affected_next = maybe_ingest_for_next_day(scraper, now_local, WARSAW_TZ)
    log.info(f"Hourly tick ended for {now_local} with {affected_next} affected rows")
    return affected_next
def hourly_tick(
    scraper
) -> int:
    """
    Run this once per hour.

    Skips work before 11:00 Warsaw local time (the publication window for the
    next business day has not opened yet); otherwise attempts ingestion for
    the next business day via ``date_run_tick``.

    Args:
        scraper: object exposing ``last_entry()`` and ``ingest_day(date) -> int``.

    Returns:
        Number of rows affected by the next-day ingestion (0 when skipped or
        when the data is already present).
    """
    log = function_logger()
    now_local = datetime.now(WARSAW_TZ)
    log.info(f"Hourly tick started for {now_local}")
    # `now_local.hour` reads the hour directly — no need to materialize a
    # `time` object first.
    if now_local.hour < 11:
        log.info("Hourly tick skipped, too early for update")
        return 0
    return date_run_tick(scraper, now_local)