ranczo-energy-usage-scrapers/EnergyPriceScraper.py
Bartosz Wieczorek 166d64d51e init
2025-09-02 18:14:05 +02:00

164 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations

import json
import logging
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone, tzinfo
from typing import Iterable, List, Tuple, Dict, Any, Optional
from zoneinfo import ZoneInfo

import psycopg

from utils.time_helpers import WARSAW_TZ
# Parameter tuple for one UPSERT_SQL execution, in the INSERT column order:
# (ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller,
#  source_meta serialized as a JSON string). Built by _rows_to_upsert.
IntervalRow = Tuple[datetime, datetime, float, str, str, str, str, str, str]
# Insert one price interval into pricing.energy_prices, or update the row that
# conflicts on (ts_start, ts_end, provider, kind, side).
# - On conflict: price_pln_net, buyer and seller are overwritten, source_meta is
#   merged with jsonb `||` (keys from the incoming value win over stored ones),
#   and inserted_at is reset to now().
# - The DO UPDATE ... WHERE filter skips the update when none of those values
#   would change; in that case RETURNING produces no row at all.
# - Otherwise exactly one row is returned: affected = 1, and
#   was_update = (xmax <> 0). The xmax check is a widely used insert-vs-update
#   heuristic that relies on a PostgreSQL implementation detail.
# NOTE(review): buyer/seller are not part of the conflict target, so two rows
# differing only in buyer/seller overwrite each other, while SELECT_LAST filters
# on buyer/seller. Confirm this matches the table's unique constraint and intent.
# Parameters: one IntervalRow (9 values, column order as listed in the INSERT).
UPSERT_SQL = """
INSERT INTO pricing.energy_prices
(ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (ts_start, ts_end, provider, kind, side)
DO UPDATE SET
price_pln_net = EXCLUDED.price_pln_net,
buyer = EXCLUDED.buyer,
seller = EXCLUDED.seller,
source_meta = COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
inserted_at = now()
WHERE
pricing.energy_prices.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
OR pricing.energy_prices.buyer IS DISTINCT FROM EXCLUDED.buyer
OR pricing.energy_prices.seller IS DISTINCT FROM EXCLUDED.seller
OR (COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb))
IS DISTINCT FROM COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
RETURNING 1 AS affected, (xmax <> 0) AS was_update;"""
# Fetch ts_end of the interval with the greatest ts_start for one series,
# identified by (provider, kind, side, buyer, seller), in that parameter order.
# The side parameter is cast to price_side (presumably a database enum type,
# confirm against the schema). Returns no row when the series has no data yet.
SELECT_LAST = """
select ts_end from pricing.energy_prices ep
where
ep.provider = %s
and ep.kind = %s
and ep.side = %s::price_side
and ep.buyer = %s
and ep.seller = %s
order by ts_start desc
limit 1
"""
@dataclass
class EnergyPriceScraperBase:
    """Base class for market price scrapers (fetch -> normalize -> UPSERT).

    Subclasses provide the series identifiers (PROVIDER, KIND, SIDE, BUYER,
    SELLER) and implement :meth:`fetch_day`. This class converts the fetched
    points into parameter rows and upserts them into
    ``pricing.energy_prices`` via ``UPSERT_SQL``.

    A database connection is taken from ``conn`` or opened lazily from
    ``dsn`` (see :meth:`_ensure_conn`).
    """

    dsn: Optional[str] = None
    conn: Optional[psycopg.Connection] = None
    tz: ZoneInfo = WARSAW_TZ
    period: timedelta = timedelta(hours=1)

    # Series identifiers -- OVERRIDE in the subclass.
    # Because they are annotated, @dataclass makes them __init__ fields and
    # assigns their default ("") on every instance. That would shadow a plain
    # class-attribute override such as ``PROVIDER = "PSE"``, so the accessors
    # below fall back to the class-level value when the instance value is empty.
    PROVIDER: str = ""  # e.g. 'PSE' / 'instrat' / 'PSTRYK'
    KIND: str = ""  # e.g. 'rce' / 'fixing_I' / 'market_price'
    SIDE: str = ""  # 'buy' | 'sell'
    BUYER: str = ""  # e.g. 'end_user'
    SELLER: str = ""  # e.g. 'market_index'

    # Logger (not a dataclass field: unannotated). When still None after
    # construction, __post_init__ installs a default stdlib logger.
    log = None

    # Throttling/retry settings (not used by the methods of this base class).
    max_retries: int = 3
    backoff_sec: float = 1.0

    def __post_init__(self) -> None:
        # Make sure every method can log even if the subclass set no logger.
        # A subclass defining its own __post_init__ must call
        # super().__post_init__() to keep this fallback.
        if self.log is None:
            cls = type(self)
            self.log = logging.getLogger(f"{cls.__module__}.{cls.__qualname__}")

    # ---------- series identifiers ----------
    def _identifier(self, attr: str) -> str:
        """Return identifier *attr*, preferring a non-empty instance value.

        Falls back to the class attribute so a subclass may override the
        identifier either as a dataclass field or as a plain class attribute.

        :raises NotImplementedError: if neither provides a non-empty value.
        """
        value = getattr(self, attr) or getattr(type(self), attr, "")
        if not value:
            raise NotImplementedError(f"Subclass must define {attr}")
        return value

    def provider(self) -> str:
        """Return PROVIDER; raise NotImplementedError when it is not set."""
        return self._identifier("PROVIDER")

    def kind(self) -> str:
        """Return KIND; raise NotImplementedError when it is not set."""
        return self._identifier("KIND")

    def side(self) -> str:
        """Return SIDE; raise NotImplementedError when it is not set."""
        return self._identifier("SIDE")

    def buyer(self) -> str:
        """Return BUYER; raise NotImplementedError when it is not set."""
        return self._identifier("BUYER")

    def seller(self) -> str:
        """Return SELLER; raise NotImplementedError when it is not set."""
        return self._identifier("SELLER")

    # ---------- public API ----------
    def ingest_day(self, business_day: date) -> int:
        """Fetch and store one whole local day [00:00, 24:00).

        :param business_day: the calendar day to ingest; ``self.tz`` is passed
            to :meth:`fetch_day` as the zone of that day.
        :returns: number of rows inserted or actually changed by the upsert.
        """
        self.log.info("Ingesting day %s", business_day)
        # Honour the configured zone (defaults to WARSAW_TZ) instead of a
        # hard-coded one.
        points = self.fetch_day(business_day, self.tz)
        rows = self._rows_to_upsert(points)
        self.log.debug("%d rows prepared for upsert", len(rows))
        return self._upsert(rows)

    def last_entry(self) -> Optional[datetime]:
        """Return ``ts_end`` of the latest stored interval of this series.

        The series is identified by provider/kind/side/buyer/seller, and
        "latest" means the greatest ``ts_start`` (see ``SELECT_LAST``).

        :returns: the ``ts_end`` value, or None when nothing is stored yet.
        """
        self.log.info("Retrieving last entry")
        with self._ensure_conn().cursor() as cur:
            params = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())
            cur.execute(SELECT_LAST, params)
            row = cur.fetchone()  # None when the series has no rows yet
        if row is None:
            self.log.debug("No last entry")
            return None
        self.log.debug("last entry %s", row[0])
        return row[0]

    def fetch_day(
        self, business_day: date, tz: tzinfo
    ) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """Fetch the price points of *business_day*. Implement in a subclass.

        :param business_day: the calendar day to fetch.
        :param tz: time zone of that day (``ingest_day`` passes ``self.tz``).
        :returns: list of ``(ts_start, ts_end, price, meta)`` tuples, where
            price is NET PLN/kWh and both timestamps are timezone-aware
            (``_rows_to_upsert`` rejects naive ones).
        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    # ---------- helpers ----------
    def _ensure_conn(self) -> psycopg.Connection:
        """Return an open connection, connecting lazily from ``dsn`` if needed.

        :raises RuntimeError: when there is no open ``conn`` and no ``dsn``.
        """
        # A closed connection cannot be reused; reconnect from dsn instead.
        if self.conn is not None and not self.conn.closed:
            return self.conn
        if not self.dsn:
            raise RuntimeError(f"Podaj dsn= lub conn= dla {type(self).__name__}")
        self.conn = psycopg.connect(self.dsn)
        return self.conn

    def _rows_to_upsert(
        self, points: Iterable[Tuple[datetime, datetime, float, Dict[str, Any]]]
    ) -> List[IntervalRow]:
        """Convert fetched points into parameter tuples for ``UPSERT_SQL``.

        Each row is ``(ts_start, ts_end, price, provider, kind, side, buyer,
        seller, source_meta_json)``; a falsy ``meta`` is stored as ``{}``.

        :raises ValueError: if a timestamp is naive (has no UTC offset).
        """
        # Materialise first: *points* may be any iterable (e.g. a generator),
        # which has no len() and can only be consumed once.
        points = list(points)
        self.log.debug("Creating upsert rows from %d points", len(points))
        if not points:
            return []
        # The identifiers are constant for the whole batch; look them up once.
        ident = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())
        rows: List[IntervalRow] = []
        for ts_start, ts_end, price_pln_kwh_net, meta in points:
            # Require timezone-aware timestamps so each one denotes an
            # unambiguous instant. Raise (not assert) so the check survives -O.
            for label, ts in (("ts_start", ts_start), ("ts_end", ts_end)):
                if ts.utcoffset() is None:
                    raise ValueError(f"{label} must be timezone-aware, got {ts!r}")
            rows.append((
                ts_start, ts_end, float(price_pln_kwh_net),
                *ident,
                json.dumps(meta or {}),
            ))
        return rows

    def _upsert(self, rows: List[IntervalRow]) -> int:
        """Execute ``UPSERT_SQL`` for every row and commit once at the end.

        :returns: number of rows inserted or actually changed. Rows whose
            stored counterpart was already identical are not counted, since
            the statement returns nothing for them.

        On any error the transaction is rolled back and the error re-raised.
        """
        if not rows:
            return 0
        affected = 0
        updated = 0
        self.log.info("Upserting %d rows", len(rows))
        conn = self._ensure_conn()
        try:
            with conn.cursor() as cur:
                for row in rows:
                    cur.execute(UPSERT_SQL, row)
                    # One RETURNING row on insert/effective update, none when
                    # the ON CONFLICT ... WHERE filter skipped the update.
                    res = cur.fetchone()
                    if res is not None:
                        affected += res[0]  # always 1
                        updated += int(bool(res[1]))  # was_update -> 1/0
            conn.commit()
        except Exception:
            # After a failed statement PostgreSQL keeps the transaction aborted;
            # roll back so the connection stays usable, then propagate.
            conn.rollback()
            raise
        self.log.info(
            "Affected=%d (updated=%d, inserted=%d)", affected, updated, affected - updated
        )
        return affected

    def _day_range(self, d: date) -> Tuple[datetime, datetime]:
        """Return the local-day bounds ``[start, end)`` of *d* in ``self.tz``.

        Both bounds are local midnights. The end is computed with wall-clock
        arithmetic on an aware datetime, so on DST-transition days the real
        elapsed span may differ from 24 hours.
        """
        start = datetime(d.year, d.month, d.day, 0, 0, tzinfo=self.tz)
        return start, start + timedelta(days=1)