164 lines
6.2 KiB
Python
164 lines
6.2 KiB
Python
from __future__ import annotations
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, date, timedelta, timezone
|
||
from typing import Iterable, List, Tuple, Dict, Any, Optional
|
||
from zoneinfo import ZoneInfo
|
||
import json
|
||
import psycopg
|
||
from utils.time import WARSAW_TZ
|
||
|
||
# One parameter row for UPSERT_SQL, in placeholder order:
# (ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta).
# Rows of this shape are built by EnergyPriceScraperBase._rows_to_upsert.
IntervalRow = Tuple[datetime, datetime, float, str, str, str, str, str, str]

# Insert one price interval, or update the existing row that has the same
# (ts_start, ts_end, provider, kind, side) key.
# - On conflict the incoming source_meta is merged into the stored one with the
#   jsonb `||` operator.
# - The WHERE clause restricts the update to rows where price, buyer, seller or
#   the merged source_meta actually differ, so a no-op upsert returns no row.
# - RETURNING yields `affected` (always 1) and `was_update` (xmax <> 0).
UPSERT_SQL = """
INSERT INTO pricing.energy_prices
  (ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (ts_start, ts_end, provider, kind, side)
DO UPDATE SET
  price_pln_net = EXCLUDED.price_pln_net,
  buyer = EXCLUDED.buyer,
  seller = EXCLUDED.seller,
  source_meta = COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
                || COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
  inserted_at = now()
WHERE
  pricing.energy_prices.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
  OR pricing.energy_prices.buyer IS DISTINCT FROM EXCLUDED.buyer
  OR pricing.energy_prices.seller IS DISTINCT FROM EXCLUDED.seller
  OR (COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
      || COALESCE(EXCLUDED.source_meta, '{}'::jsonb))
     IS DISTINCT FROM COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
RETURNING 1 AS affected, (xmax <> 0) AS was_update;"""

# Fetch ts_end of the row with the greatest ts_start for one
# (provider, kind, side, buyer, seller) combination; parameters in that order.
SELECT_LAST = """
select ts_end from pricing.energy_prices ep
where
  ep.provider = %s
  and ep.kind = %s
  and ep.side = %s::price_side
  and ep.buyer = %s
  and ep.seller = %s
order by ts_start desc
limit 1
"""
|
||
|
||
@dataclass
class EnergyPriceScraperBase:
    """Base class for market price scrapers (fetch -> normalize -> UPSERT).

    A subclass supplies the identifiers (PROVIDER, KIND, SIDE, BUYER, SELLER)
    and implements :meth:`fetch_day`. This class turns the fetched points into
    parameter rows and writes them with ``UPSERT_SQL``.

    Either ``conn`` (an open psycopg connection) or ``dsn`` (used to open one
    lazily) must be provided before any database access.
    """

    dsn: Optional[str] = None
    conn: Optional[psycopg.Connection] = None
    tz: ZoneInfo = WARSAW_TZ
    period: timedelta = timedelta(hours=1)
    # Identifiers - OVERRIDE in the subclass:
    PROVIDER: str = ""  # e.g. 'PSE' / 'instrat' / 'PSTRYK'
    KIND: str = ""  # e.g. 'rce' / 'fixing_I' / 'market_price'
    SIDE: str = ""  # 'buy'|'sell'
    BUYER: str = ""  # 'end_user'
    SELLER: str = ""  # 'market_index'
    # Deliberately unannotated: a plain class attribute, not a dataclass field.
    # When left as None, a module-level logger is used instead (see _get_log).
    log = None

    # throttling/retry (not read by this base class itself)
    max_retries: int = 3
    backoff_sec: float = 1.0

    # ---------- identifiers ----------
    def provider(self) -> str:
        """Return PROVIDER; raise NotImplementedError when it is not defined."""
        return self._identifier("PROVIDER")

    def kind(self) -> str:
        """Return KIND; raise NotImplementedError when it is not defined."""
        return self._identifier("KIND")

    def side(self) -> str:
        """Return SIDE; raise NotImplementedError when it is not defined."""
        return self._identifier("SIDE")

    def buyer(self) -> str:
        """Return BUYER; raise NotImplementedError when it is not defined."""
        return self._identifier("BUYER")

    def seller(self) -> str:
        """Return SELLER; raise NotImplementedError when it is not defined."""
        return self._identifier("SELLER")

    # ---------- public API ----------
    def ingest_day(self, business_day: date) -> int:
        """Fetch one day via fetch_day() and upsert the resulting rows.

        Args:
            business_day: The day to ingest.

        Returns:
            The number of rows actually inserted or updated.
        """
        log = self._get_log()
        log.info("Ingesting day %s", business_day)
        # Use the instance's configured zone (same as _day_range) rather than
        # the module-level default, so a non-default ``tz`` is honoured.
        points = self.fetch_day(business_day, self.tz)
        rows = self._rows_to_upsert(points)
        # Logged before the write: this is the number of prepared rows.
        log.debug("%d rows prepared for upsert", len(rows))
        return self._upsert(rows)

    def last_entry(self) -> Optional[datetime]:
        """Return ts_end of the latest stored interval for this scraper.

        "Latest" is the row with the greatest ts_start matching this scraper's
        provider, kind, side, buyer and seller (see ``SELECT_LAST``).

        Returns:
            The ts_end value, or None when no matching row exists.
        """
        log = self._get_log()
        log.info("Retrieving last entry")
        params = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())
        with self._ensure_conn().cursor() as cur:
            cur.execute(SELECT_LAST, params)
            res = cur.fetchone()  # None when no matching row exists
        if res is not None:
            log.debug("last entry %s", res)
            return res[0]

        log.debug("No last entry")
        return None

    def fetch_day(self, business_day: date, tz: timezone) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """Implement in the subclass. Return a list of points with NET PLN/kWh prices.

        Each point is ``(ts_start, ts_end, price, meta)``. Both timestamps must
        be timezone-aware (enforced by _rows_to_upsert).
        NOTE(review): ingest_day passes a ZoneInfo here although the parameter
        is annotated as ``timezone`` - confirm the intended annotation.
        """
        raise NotImplementedError

    # ---------- helpers ----------
    def _get_log(self):
        """Return ``self.log``, or a module-level logger when it is None."""
        if self.log is not None:
            return self.log
        import logging  # local import: keeps the module's import block untouched

        return logging.getLogger(__name__)

    def _identifier(self, name: str) -> str:
        """Return the identifier stored under ``name`` or raise NotImplementedError.

        These identifiers are dataclass fields, so the generated ``__init__``
        always writes the field default ("") onto the instance. That shadows a
        subclass that overrides the value with a plain class attribute, hence
        the fallback to the class-level value when the instance one is empty.
        """
        value = getattr(self, name, "") or getattr(type(self), name, "")
        if not value:
            raise NotImplementedError(f"Subclass must define {name}")
        return value

    def _ensure_conn(self) -> psycopg.Connection:
        """Return the existing connection, opening one from ``dsn`` if needed.

        Raises:
            RuntimeError: When neither ``conn`` nor ``dsn`` is set.
        """
        if self.conn is not None:
            return self.conn
        if not self.dsn:
            raise RuntimeError("Podaj dsn= lub conn= dla PriceScraperBase")
        self.conn = psycopg.connect(self.dsn)
        return self.conn

    def _rows_to_upsert(self, points: Iterable[Tuple[datetime, datetime, float, Dict[str, Any]]]) -> List[IntervalRow]:
        """Convert fetched points into parameter rows for ``UPSERT_SQL``.

        Args:
            points: Iterable of ``(ts_start, ts_end, price, meta)``; any
                iterable is accepted, including generators.

        Returns:
            One row per point, with the price as float and ``meta`` as JSON text.

        Raises:
            ValueError: When a timestamp is naive (has no UTC offset).
            NotImplementedError: When an identifier is undefined and there is
                at least one point to convert.
        """
        # Materialise first: the input may be a generator, which has no len().
        points = list(points)
        self._get_log().debug("Creating upsert rows from {} points".format(len(points)))
        if not points:
            return []

        # Identifiers are the same for every row; resolve them once.
        identifiers = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())

        rows: List[IntervalRow] = []
        for ts_start, ts_end, price_pln_kwh_net, meta in points:
            # Explicit check instead of `assert`, which is stripped under -O.
            if ts_start.utcoffset() is None or ts_end.utcoffset() is None:
                raise ValueError(
                    "Interval timestamps must be timezone-aware: "
                    f"ts_start={ts_start!r}, ts_end={ts_end!r}"
                )
            rows.append((
                ts_start, ts_end, float(price_pln_kwh_net),
                *identifiers,
                json.dumps(meta or {}),
            ))
        return rows

    def _upsert(self, rows: list[IntervalRow]) -> int:
        """Execute ``UPSERT_SQL`` for every row in one transaction.

        Args:
            rows: Parameter rows as produced by _rows_to_upsert.

        Returns:
            The number of rows inserted or updated; unchanged rows do not count.

        Raises:
            Exception: Any error raised while executing or committing is
                re-raised after the transaction has been rolled back.
        """
        if not rows:
            return 0

        log = self._get_log()
        conn = self._ensure_conn()
        affected = 0
        updated = 0

        log.info("Upserting %d rows ", len(rows))
        try:
            with conn.cursor() as cur:
                for row in rows:
                    cur.execute(UPSERT_SQL, row)
                    res = cur.fetchone()  # None if no change, tuple if insert/update
                    if res is not None:
                        affected += res[0]  # res[0] == 1
                        updated += int(bool(res[1]))  # was_update -> 1/0
            conn.commit()
        except Exception:
            # Do not leave the connection in an aborted or half-applied
            # transaction that a later commit could persist.
            conn.rollback()
            raise

        log.info("Affected=%d (updated=%d, inserted=%d)", affected, updated, affected - updated)
        return affected

    def _day_range(self, d: date) -> Tuple[datetime, datetime]:
        """Return ``(start, end)`` for day ``d`` in ``self.tz``.

        ``start`` is 00:00 of ``d`` with ``tzinfo=self.tz``; ``end`` is
        ``start + timedelta(days=1)``.
        """
        start = datetime(d.year, d.month, d.day, 0, 0, tzinfo=self.tz)
        return start, start + timedelta(days=1)
|
||
|