from __future__ import annotations

import os
from datetime import datetime, timedelta, date, timezone
from typing import List, Tuple, Dict, Any, Optional

import requests

from EnergyPriceScraper import EnergyPriceScraperBase
from utils.logging import HasLogger
from utils.time import UTC, WARSAW_TZ

# One parsed price frame: (start_utc, end_utc, net_price, metadata).
_PriceRow = Tuple[datetime, datetime, float, Dict[str, Any]]


class PstrykScraper(EnergyPriceScraperBase, HasLogger):
    """Template scraper for prices published by the seller (Pstryk).

    Configuration is read from the environment:

    * ``PSTRYK_TOKEN`` -- API token, sent verbatim in the ``Authorization``
      header. Required; there is deliberately no built-in default.
    * ``PSTRYK_API_BASE`` -- base URL of the API; defaults to
      ``https://api.pstryk.pl`` when unset.

    Endpoint used by this class::

        GET {api_base}/integrations/pricing
            ?resolution=hour&window_start=<UTC ISO>&window_end=<UTC ISO>

    The JSON response is expected to contain a ``"frames"`` list whose items
    look like the example in :meth:`parse_generic_price_frame`.
    """

    api_base: str
    token: str

    def __init__(self, **kwargs):
        """Configure provider descriptors, credentials and the HTTP session.

        Args:
            **kwargs: forwarded unchanged to the base-class constructor.

        Raises:
            RuntimeError: if the API base URL or the token is empty.
        """
        super().__init__(**kwargs)

        # Descriptor attributes; presumably consumed by the base class or by
        # downstream storage code -- verify against EnergyPriceScraperBase.
        self.PROVIDER = "PSTRYK"
        self.KIND = "market_price"
        self.SIDE = "buy"
        self.BUYER = "end_user"
        self.SELLER = "PSTRYK"

        self.init_logger()

        self.api_base = os.getenv("PSTRYK_API_BASE", "https://api.pstryk.pl/").rstrip("/")
        # SECURITY: the token must come from the environment only. An earlier
        # revision shipped a literal token as the fallback value; that
        # credential has to be treated as leaked and revoked. Without a
        # default the guard below is reachable again.
        self.token = os.getenv("PSTRYK_TOKEN", "")
        if not self.api_base or not self.token:
            raise RuntimeError("Ustaw PSTRYK_API_BASE i PSTRYK_TOKEN w środowisku.")

        self.session = requests.Session()
        self.session.headers.update({
            "accept": "application/json",
            "Authorization": f"{self.token}",
            "user-agent": "energy-scraper/1.0",
        })
        self.log.debug("Initializing PSTRYK Done")

    def fetch_range(self, start_date: datetime, end_date: datetime) -> List[_PriceRow]:
        """Fetch hourly price frames for the half-open window [start, end).

        Args:
            start_date: timezone-aware window start whose ``tzinfo`` is ``UTC``.
            end_date: timezone-aware window end whose ``tzinfo`` is ``UTC``;
                must be later than ``start_date``.

        Returns:
            The rows produced by :meth:`parse_generic_price_frame`, in
            response order; frames that method rejects (returns ``None`` for)
            are left out.

        Raises:
            ValueError: if an argument is naive, not in UTC, or the window is
                empty or reversed.
            requests.HTTPError: if the API answers with an error status.
        """
        # Explicit raises instead of ``assert``: asserts vanish under
        # ``python -O`` and these checks guard the request parameters.
        if start_date.tzinfo is None or end_date.tzinfo is None:
            raise ValueError("start_date and end_date must be timezone-aware")
        if start_date.tzinfo != UTC or end_date.tzinfo != UTC:
            raise ValueError("start_date and end_date must be expressed in UTC")
        if not start_date < end_date:
            raise ValueError(
                f"Bad range: start={start_date.isoformat()} end={end_date.isoformat()}"
            )

        url = f"{self.api_base}/integrations/pricing"
        self.log.debug(f"Fetching url: {url}")
        self.log.info(f"Fetching range: [{start_date}, {end_date}) UTC / [{start_date.astimezone(WARSAW_TZ)}, {end_date.astimezone(WARSAW_TZ)}) Europe/Warsaw")

        # The literal "Z" suffix is correct because both bounds were verified
        # to be UTC above.
        response = self.session.get(
            url,
            params={
                "resolution": "hour",
                "window_start": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "window_end": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            },
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        frames = data['frames']
        self.log.debug(f"Fetched {len(frames)} data frames for [{start_date}, {end_date}) UTC")

        out: List[_PriceRow] = []
        for frame in frames:
            row = self.parse_generic_price_frame(frame)
            if row is not None:
                out.append(row)
        return out

    def fetch_day(self, business_day: date, tz: timezone) -> List[_PriceRow]:
        """Fetch all price frames of one calendar day as seen in ``tz``.

        Args:
            business_day: the local calendar day to fetch.
            tz: time zone in which ``business_day`` is interpreted.

        Returns:
            Same as :meth:`fetch_range` for the UTC window covering the day.
        """
        midnight = datetime.min.time()
        start = datetime.combine(business_day, midnight, tzinfo=tz).astimezone(UTC)
        # The end is the *next local midnight*, not start + 24 h: on
        # DST-transition days a local day lasts 23 or 25 hours, so a fixed
        # 24-hour offset would drop the last hour or spill into the next day.
        end = datetime.combine(
            business_day + timedelta(days=1), midnight, tzinfo=tz
        ).astimezone(UTC)
        return self.fetch_range(start, end)

    def parse_generic_price_frame(self, rec: dict) -> Optional[_PriceRow]:
        """Convert one API price frame into a normalized row.

        Input (example)::

            {'start': '2025-09-01T00:00:00+00:00',
             'end': '2025-09-01T01:00:00+00:00',
             'price_net': 0.37, 'price_gross': 0.65,
             'is_cheap': True, 'is_expensive': False}

        Returns:
            ``(ts_start_utc, ts_end_utc, price_pln_kwh_net, meta)``, or
            ``None`` when ``is_cheap`` or ``is_expensive`` is missing/``None``
            (the frame is logged and ignored).

        Raises:
            ValueError: if ``start``/``end`` cannot be parsed, if the end is
                not after the start, or if ``price_net`` is missing or not
                convertible to ``float``.
        """
        if rec.get("is_cheap") is None or rec.get("is_expensive") is None:
            self.log.info(f"Ignoring non-valid price frame {rec}")
            return None

        # Any failure while reading/parsing the timestamps is normalized to
        # ValueError (with the original exception chained) for the caller.
        # NOTE(review): a timestamp without a UTC offset would be interpreted
        # in the machine's local zone by astimezone() -- confirm the API
        # always sends offsets.
        try:
            ts_start = datetime.fromisoformat(rec["start"]).astimezone(UTC)
            ts_end = datetime.fromisoformat(rec["end"]).astimezone(UTC)
        except Exception as e:
            raise ValueError(f"Bad iso timeformat in 'start'/'end': {e}") from e

        if ts_end <= ts_start:
            raise ValueError(f"Bad range: start={ts_start.isoformat()} end={ts_end.isoformat()}")

        try:
            price_pln_kwh_net = float(rec["price_net"])
        except Exception as e:
            raise ValueError(f"Price net not available 'price_net': {e}") from e

        meta = {
            "unit": "PLN/kWh",
            "taxes_included": False,
            "is_cheap": bool(rec.get("is_cheap")),
            "is_expensive": bool(rec.get("is_expensive")),
        }
        return ts_start, ts_end, price_pln_kwh_net, meta