from __future__ import annotations

from datetime import datetime, date, timedelta
from zoneinfo import ZoneInfo
from typing import Optional

from utils.logging import function_logger
from utils.time import WARSAW_TZ, end_of_business_day_utc


def is_day_fully_ingested(scraper, business_day: date, tz: ZoneInfo) -> bool:
    """
    Decide if 'business_day' is fully present in the DB based on scraper.last_entry().
    Assumes that a fully ingested day ends at local midnight of D+1 in the DB.
    """
    log = function_logger()
    last: Optional[datetime] = scraper.last_entry()
    if not last:
        log.info("No last entry found for scraper")
        return False
    log.debug(f"Last entry: {last}")
    # Both sides of the comparison must be tz-aware; aware datetimes compare
    # correctly across zones, so no explicit conversion to UTC is needed.
    assert last.tzinfo is not None, "last must be tz-aware"
    needed_end_utc = end_of_business_day_utc(business_day, tz)
    return last >= needed_end_utc


def maybe_ingest_for_next_day(
    scraper, now_local: datetime, tz: ZoneInfo
) -> int:
    """
    If tomorrow's data is not in the DB yet, call scraper.ingest_day(tomorrow).
    Return the number of rows affected by the ingestion (0 if nothing was done).
    """
    log = function_logger()
    log.debug("Start")
    assert now_local.tzinfo is not None, "now_local must be tz-aware (Europe/Warsaw)."
    today = now_local.date()
    # Publication happens today for the next business day
    target_day = today + timedelta(days=1)
    log.info(f"Target day = {target_day}")
    if is_day_fully_ingested(scraper, target_day, tz):
        log.info("Publication already happened and the data is in the DB, skipping")
        return 0
    # Ingest the next business day
    log.info(f"Target day is NOT fully ingested, trying to read {target_day}")
    ingested_rows = scraper.ingest_day(target_day)
    log.debug(f"Ingested rows = {ingested_rows}")
    return ingested_rows


def date_run_tick(
    scraper, now_local: datetime
) -> int:
    """Run a single ingestion pass and return the number of affected rows."""
    log = function_logger()
    # Try to ingest the next business day after each configured window
    affected_next = maybe_ingest_for_next_day(scraper, now_local, WARSAW_TZ)
    log.info(f"Hourly tick ended for {now_local} with {affected_next} affected rows")
    return affected_next


def hourly_tick(
    scraper
) -> int:
    """
    Run this once per hour.
    Attempts ingestion for the next business day once the publication window
    has opened (11:00 Europe/Warsaw time or later).
    Returns the number of rows affected by the ingestion.
    """
    log = function_logger()
    now_local = datetime.now(WARSAW_TZ)
    log.info(f"Hourly tick started for {now_local}")
    if now_local.hour < 11:
        log.info("Hourly tick skipped, too early for update")
        return 0
    return date_run_tick(scraper, now_local)
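
# ---------------------------------------------------------------------------
# Reference sketch (assumption): utils.time.end_of_business_day_utc is not
# shown in this excerpt. The helper below illustrates one plausible
# implementation, inferred from the docstring of is_day_fully_ingested
# ("a fully ingested day ends at local midnight of D+1"). It is named with a
# leading underscore and a _sketch suffix so it does not shadow the real
# import above.
# ---------------------------------------------------------------------------
from datetime import time as _time
from datetime import timezone as _timezone


def _end_of_business_day_utc_sketch(business_day: date, tz: ZoneInfo) -> datetime:
    """Hypothetical equivalent of end_of_business_day_utc: local midnight of
    D+1 in `tz`, converted to UTC for comparison with tz-aware DB timestamps."""
    next_midnight_local = datetime.combine(
        business_day + timedelta(days=1), _time.min, tzinfo=tz
    )
    return next_midnight_local.astimezone(_timezone.utc)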
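
# ---------------------------------------------------------------------------
# Usage sketch (assumption): a minimal stub scraper wired into hourly_tick,
# showing the contract this module relies on — last_entry() returning a
# tz-aware datetime or None, and ingest_day(date) returning an affected-row
# count. The stub is hypothetical; the real scraper lives elsewhere in the
# repo.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)

    class _StubScraper:
        """In-memory stand-in for the real scraper, for demonstration only."""

        def __init__(self) -> None:
            self._last: Optional[datetime] = None

        def last_entry(self) -> Optional[datetime]:
            # The real implementation presumably queries the DB for the newest
            # row timestamp; here we just return whatever was "ingested".
            return self._last

        def ingest_day(self, day: date) -> int:
            # Pretend the day was fully ingested up to local midnight of D+1.
            self._last = end_of_business_day_utc(day, WARSAW_TZ)
            return 24  # e.g. one row per hour

    # Prints 0 before 11:00 Europe/Warsaw time, 24 afterwards.
    print("affected rows:", hourly_tick(_StubScraper()))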