ranczo-energy-price-scrapers/EnergyPriceScraper.py
Bartosz Wieczorek 879191527c Add logger
2025-09-02 09:20:44 +02:00

175 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations

import json
import logging
import time as _time
from dataclasses import dataclass
from datetime import datetime, date, timedelta
from typing import Iterable, List, Tuple, Dict, Any, Optional
from zoneinfo import ZoneInfo

import psycopg

from utils.time_helpers import WARSAW_TZ
# One row bound to the nine placeholders of UPSERT_SQL, in order:
# (ts_start, ts_end, price, provider, kind, side, buyer, seller, source_meta),
# where source_meta is a JSON-encoded string.
IntervalRow = Tuple[datetime, datetime, float, str, str, str, str, str, str] # see _rows_to_upsert
# Insert-or-update of a single price interval (nine positional parameters).
#
# * The target table alias must be introduced with ``AS``; PostgreSQL's INSERT
#   grammar does not accept a bare alias. Once the alias exists it hides the
#   real table name, so every reference to the existing row uses ``ep.``.
# * On conflict the row is rewritten only when price, buyer, seller or the
#   merged source_meta would actually change; otherwise no row is returned.
# * RETURNING yields (1, was_update) for each row that was written, where
#   was_update is computed as ``xmax <> 0``.
UPSERT_SQL = """
INSERT INTO pricing.energy_prices AS ep
    (ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (ts_start, ts_end, provider, kind, side)
DO UPDATE SET
    price_pln_net = EXCLUDED.price_pln_net,
    buyer = EXCLUDED.buyer,
    seller = EXCLUDED.seller,
    source_meta = COALESCE(ep.source_meta, '{}'::jsonb)
                  || COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
    inserted_at = now()
WHERE
    ep.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
    OR ep.buyer IS DISTINCT FROM EXCLUDED.buyer
    OR ep.seller IS DISTINCT FROM EXCLUDED.seller
    OR (COALESCE(ep.source_meta, '{}'::jsonb)
        || COALESCE(EXCLUDED.source_meta, '{}'::jsonb))
       IS DISTINCT FROM COALESCE(ep.source_meta, '{}'::jsonb)
RETURNING 1 AS affected, (xmax <> 0) AS was_update;"""
# Returns ts_end of the row with the greatest ts_start for one
# (provider, kind, side, buyer, seller) combination. Parameters are bound in
# exactly that order; the side parameter is cast to the price_side type.
SELECT_LAST = """
select ts_end from pricing.energy_prices ep
where
ep.provider = %s
and ep.kind = %s
and ep.side = %s::price_side
and ep.buyer = %s
and ep.seller = %s
order by ts_start desc
limit 1
"""
@dataclass
class EnergyPriceScraperBase:
    """Base class for market price scrapers (collect -> normalise -> UPSERT).

    A subclass supplies the identifying constants (``PROVIDER``, ``KIND``,
    ``SIDE``, ``BUYER``, ``SELLER``) and implements :meth:`fetch_day`. The
    base class turns the fetched points into rows and writes them with
    ``UPSERT_SQL``.

    Either ``dsn`` or an already opened ``conn`` must be provided before any
    database access; a connection is opened lazily from ``dsn`` on first use.
    """

    dsn: Optional[str] = None
    conn: Optional[psycopg.Connection] = None
    tz: ZoneInfo = WARSAW_TZ
    # Not referenced by the base class itself.
    period: timedelta = timedelta(hours=1)

    # Identifiers -- OVERRIDE in the subclass.
    # NOTE: these are annotated and therefore dataclass fields: the generated
    # __init__ assigns them on every instance. A subclass has to override them
    # as annotated fields under @dataclass (or pass them to the constructor);
    # a bare, un-annotated class attribute would be shadowed by __init__.
    PROVIDER: str = ""  # e.g. 'PSE' / 'instrat' / 'PSTRYK'
    KIND: str = ""  # e.g. 'rce' / 'fixing_I' / 'market_price'
    SIDE: str = ""  # 'buy'|'sell'
    BUYER: str = ""  # 'end_user'
    SELLER: str = ""  # 'market_index'

    # Plain class attribute (not a dataclass field). Defaults to a module
    # logger so the base class is usable without injecting one; a subclass or
    # an instance may still replace it.
    log = logging.getLogger(__name__)

    # throttling/retry (not referenced by the base class itself)
    max_retries: int = 3
    backoff_sec: float = 1.0

    # ---------- identifiers ----------
    def _identifier(self, name: str) -> str:
        """Return the identifier attribute *name* or raise if it is empty."""
        value = getattr(self, name)
        if not value:
            raise NotImplementedError("Subclass must define {}".format(name))
        return value

    def provider(self) -> str:
        """Return ``PROVIDER``; raise ``NotImplementedError`` if unset."""
        return self._identifier("PROVIDER")

    def kind(self) -> str:
        """Return ``KIND``; raise ``NotImplementedError`` if unset."""
        return self._identifier("KIND")

    def side(self) -> str:
        """Return ``SIDE``; raise ``NotImplementedError`` if unset."""
        return self._identifier("SIDE")

    def buyer(self) -> str:
        """Return ``BUYER``; raise ``NotImplementedError`` if unset."""
        return self._identifier("BUYER")

    def seller(self) -> str:
        """Return ``SELLER``; raise ``NotImplementedError`` if unset."""
        return self._identifier("SELLER")

    # ---------- public API ----------
    def ingest_day(self, business_day: date) -> int:
        """Fetch and store one whole local day [00:00, 24:00).

        Returns the number of rows that were inserted or updated (rows that
        already held identical data are not counted).
        """
        self.log.info("Ingesting day %s", business_day)
        points = self.fetch_day(business_day)  # [(start, end, price, meta_dict), ...]
        rows = self._rows_to_upsert(points)
        # Nothing has been written yet at this point, only prepared.
        self.log.debug("%d rows prepared for upsert", len(rows))
        return self._upsert(rows)

    def ingest_range(self, start_day: date, end_day: date) -> int:
        """Backfill the inclusive range [start_day, end_day], one local day at a time.

        Returns the summed result of :meth:`ingest_day` over the range.
        """
        self.log.info("Ingesting range [%s, %s]", start_day, end_day)
        total = 0
        day = start_day
        while day <= end_day:
            total += self.ingest_day(day)
            day += timedelta(days=1)
        return total

    def last_entry(self) -> Optional[datetime]:
        """Return ``ts_end`` of the stored row with the latest ``ts_start``.

        The lookup is restricted to this scraper's provider/kind/side/buyer/
        seller. Returns ``None`` when no such row exists.
        """
        self.log.info("Retrieving last entry")
        # Resolve identifiers first so a misconfigured subclass fails before
        # a connection is opened.
        params = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())
        with self._ensure_conn().cursor() as cur:
            cur.execute(SELECT_LAST, params)
            row = cur.fetchone()  # None when no matching row exists
        if row is None:
            self.log.debug("No last entry")
            return None
        self.log.debug("last entry %s", row)
        return row[0]

    def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """Implement in the subclass. Return a list of points with NET PLN/kWh prices.

        Each point is ``(ts_start, ts_end, price, meta)``; both timestamps
        must be timezone-aware.
        """
        raise NotImplementedError

    # ---------- helpers ----------
    def _ensure_conn(self) -> psycopg.Connection:
        """Return the existing connection, opening one from ``dsn`` if needed.

        Raises:
            RuntimeError: neither ``conn`` nor ``dsn`` was provided.
        """
        if self.conn is not None:
            return self.conn
        if not self.dsn:
            raise RuntimeError("Podaj dsn= lub conn= dla EnergyPriceScraperBase")
        self.conn = psycopg.connect(self.dsn)
        return self.conn

    def _rows_to_upsert(self, points: Iterable[Tuple[datetime, datetime, float, Dict[str, Any]]]) -> List[IntervalRow]:
        """Convert fetched points into parameter tuples for ``UPSERT_SQL``.

        Raises:
            ValueError: a point carries a timezone-naive timestamp.
        """
        # Materialise first: ``points`` may be a generator, which has no len().
        points = list(points)
        self.log.debug("Creating upsert rows from %d points", len(points))
        if not points:
            return []

        # Loop-invariant identifiers, resolved once.
        identity = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())

        rows: List[IntervalRow] = []
        for ts_start, ts_end, price_pln_kwh_net, meta in points:
            # Enforce timezone-aware timestamps with a real exception; an
            # ``assert`` would be stripped when Python runs with -O.
            for label, ts in (("ts_start", ts_start), ("ts_end", ts_end)):
                if ts.utcoffset() is None:
                    raise ValueError("{} must be timezone-aware, got {!r}".format(label, ts))
            rows.append((
                ts_start, ts_end, float(price_pln_kwh_net),
                *identity,
                json.dumps(meta or {}),
            ))
        return rows

    def _upsert(self, rows: List[IntervalRow]) -> int:
        """Execute ``UPSERT_SQL`` for every row and commit.

        Returns the number of rows actually inserted or updated. On any
        failure the transaction is rolled back and the error is re-raised, so
        a shared connection is not left in an aborted transaction.
        """
        if not rows:
            return 0
        affected = 0
        updated = 0
        self.log.info("Upserting %d rows ", len(rows))
        conn = self._ensure_conn()
        try:
            with conn.cursor() as cur:
                for row in rows:
                    cur.execute(UPSERT_SQL, row)
                    res = cur.fetchone()  # None if no change, tuple if insert/update
                    if res is not None:
                        affected += res[0]  # res[0] == 1
                        updated += int(bool(res[1]))  # was_update -> 1/0
            conn.commit()
        except Exception:
            try:
                conn.rollback()
            except Exception:
                # Keep the original error as the one that propagates.
                self.log.exception("Rollback after failed upsert also failed")
            raise
        self.log.info("Affected=%d (updated=%d, inserted=%d)", affected, updated, affected - updated)
        return affected

    def _day_range(self, d: date) -> Tuple[datetime, datetime]:
        """Return ``(start, end)``: midnight of *d* in ``self.tz`` and that value plus one day."""
        start = datetime(d.year, d.month, d.day, 0, 0, tzinfo=self.tz)
        return start, start + timedelta(days=1)