# ranczo-energy-price-scrapers/Scraper/PstrykScraper.py
# Bartosz Wieczorek afbe6b564a refactor
# 2025-09-03 10:58:40 +02:00
#
# 115 lines
# 4.5 KiB
# Python

from __future__ import annotations
from datetime import datetime, timedelta, date, timezone
from typing import List, Tuple, Dict, Any
import os
import requests
from EnergyPriceScraper import EnergyPriceScraperBase
from utils.logging import HasLogger
from utils.time import UTC, WARSAW_TZ
class PstrykScraper(EnergyPriceScraperBase, HasLogger):
    """
    Scraper for energy prices published by the retailer Pstryk.

    Configuration (environment variables):
        PSTRYK_API_BASE  Base URL of the API. Defaults to
                         ``https://api.pstryk.pl/``; a trailing slash is
                         stripped.
        PSTRYK_TOKEN     API token, sent verbatim as the ``Authorization``
                         header value. Required; there is no built-in default.

    Endpoint used: ``GET {PSTRYK_API_BASE}/integrations/pricing`` with the
    query parameters ``resolution=hour``, ``window_start`` and ``window_end``
    (formatted as ``YYYY-MM-DDTHH:MM:SSZ``). The JSON response is read as an
    object with a ``frames`` list, each frame looking like:

        {"start": "2025-09-01T00:00:00+00:00",
         "end": "2025-09-01T01:00:00+00:00",
         "price_net": 0.37, "price_gross": 0.65,
         "is_cheap": true, "is_expensive": false}
    """

    api_base: str
    token: str

    def __init__(self, **kwargs):
        """
        Set the provider tags, read the configuration and open an HTTP session.

        All keyword arguments are forwarded unchanged to the base class.

        Raises:
            RuntimeError: if PSTRYK_API_BASE resolves to an empty string or
                PSTRYK_TOKEN is not set.
        """
        super().__init__(**kwargs)

        self.PROVIDER = "PSTRYK"
        self.KIND = "market_price"
        self.SIDE = "buy"
        self.BUYER = "end_user"
        self.SELLER = "PSTRYK"

        self.init_logger()

        self.api_base = os.getenv("PSTRYK_API_BASE", "https://api.pstryk.pl/").rstrip("/")
        # SECURITY: the token must come from the environment only. A literal
        # token used to be embedded here as the fallback value, which both
        # leaked the secret and made the guard below unreachable.
        # NOTE(review): that token should be considered compromised and rotated.
        self.token = os.getenv("PSTRYK_TOKEN", "")
        if not self.api_base or not self.token:
            raise RuntimeError("Ustaw PSTRYK_API_BASE i PSTRYK_TOKEN w środowisku.")

        self.session = requests.Session()
        self.session.headers.update({
            "accept": "application/json",
            # The token is sent as-is (no "Bearer " prefix).
            "Authorization": self.token,
            "user-agent": "energy-scraper/1.0",
        })
        self.log.debug("Initializing PSTRYK Done")

    @staticmethod
    def _require_utc(name: str, value: datetime) -> None:
        """Raise ValueError unless *value* is timezone-aware with a zero UTC offset."""
        # utcoffset() is None for naive datetimes, so this single comparison
        # rejects both naive values and aware values in a non-UTC offset.
        if value.utcoffset() != timedelta(0):
            raise ValueError(f"{name} must be a timezone-aware UTC datetime, got {value!r}")

    def fetch_range(self, start_date: datetime, end_date: datetime) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Fetch hourly price frames for the half-open window [start_date, end_date).

        Args:
            start_date: timezone-aware window start, in UTC.
            end_date: timezone-aware window end, in UTC; must be after start_date.

        Returns:
            A list of ``(ts_start_utc, ts_end_utc, price_pln_kwh_net, meta)``
            tuples, one per frame accepted by ``parse_generic_price_frame``.

        Raises:
            ValueError: if the arguments are not UTC-aware or not ordered, or
                if a frame is malformed.
            requests.HTTPError: if the API answers with an error status.
            KeyError: if the response has no ``frames`` entry.
        """
        # Explicit checks instead of `assert`, which is stripped under `python -O`.
        self._require_utc("start_date", start_date)
        self._require_utc("end_date", end_date)
        if start_date >= end_date:
            raise ValueError(f"start_date must be before end_date: {start_date!r} >= {end_date!r}")

        url = f"{self.api_base}/integrations/pricing"
        self.log.debug(f"Fetching url: {url}")
        self.log.info(f"Fetching range: [{start_date}, {end_date}) UTC / [{start_date.astimezone(WARSAW_TZ)}, {end_date.astimezone(WARSAW_TZ)}) Europe/Warsaw")

        response = self.session.get(
            url,
            params={
                "resolution": "hour",
                # The literal "Z" suffix is valid because both values were
                # verified above to have a zero UTC offset.
                "window_start": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "window_end": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            },
            timeout=30,
        )
        response.raise_for_status()
        frames = response.json()["frames"]
        self.log.debug(f"Fetched {len(frames)} data frames for [{start_date}, {end_date}) UTC")

        out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
        for frame in frames:
            row = self.parse_generic_price_frame(frame)
            if row is not None:
                out.append(row)
        return out

    def fetch_day(self, business_day: date, tz: timezone) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Fetch the price frames of one local calendar day.

        Args:
            business_day: the calendar day to fetch.
            tz: time zone in which the day's midnights are taken.

        Returns:
            Same as ``fetch_range`` for the window from local midnight of
            business_day to local midnight of the following day.
        """
        midnight = datetime.min.time()
        start = datetime.combine(business_day, midnight, tzinfo=tz).astimezone(UTC)
        # Take the NEXT local midnight rather than start + 24h: on DST
        # transition days the local day is 23 or 25 hours long.
        end = datetime.combine(business_day + timedelta(days=1), midnight, tzinfo=tz).astimezone(UTC)
        return self.fetch_range(start, end)

    def parse_generic_price_frame(self, rec: dict) -> Tuple[datetime, datetime, float, Dict[str, Any]] | None:
        """
        Convert one API price frame into a normalized row.

        Input (example):
            {'start':'2025-09-01T00:00:00+00:00','end':'2025-09-01T01:00:00+00:00',
             'price_net':0.37,'price_gross':0.65,'is_cheap':True,'is_expensive':False}

        Returns:
            ``(ts_start_utc, ts_end_utc, price_pln_kwh_net, meta)``, or ``None``
            when ``is_cheap`` or ``is_expensive`` is missing/None (the frame is
            logged and skipped).

        Raises:
            ValueError: if 'start'/'end' cannot be parsed, the range is empty
                or reversed, or 'price_net' is missing or not a number.
        """
        if rec.get("is_cheap") is None or rec.get("is_expensive") is None:
            self.log.info(f"Ignoring non-valid price frame {rec}")
            return None

        # The broad catches below only translate the failure into a ValueError
        # with the cause chained; nothing is swallowed.
        try:
            # NOTE(review): a timestamp without an offset would be interpreted
            # by astimezone() in the host's local time zone; confirm the API
            # always sends offsets.
            ts_start = datetime.fromisoformat(rec["start"]).astimezone(UTC)
            ts_end = datetime.fromisoformat(rec["end"]).astimezone(UTC)
        except Exception as e:
            raise ValueError(f"Bad iso timeformat in 'start'/'end': {e}") from e

        if ts_end <= ts_start:
            raise ValueError(f"Bad range: start={ts_start.isoformat()} end={ts_end.isoformat()}")

        try:
            price_pln_kwh_net = float(rec["price_net"])
        except Exception as e:
            raise ValueError(f"Price net not available 'price_net': {e}") from e

        meta = {
            "unit": "PLN/kWh",
            "taxes_included": False,
            "is_cheap": bool(rec.get("is_cheap")),
            "is_expensive": bool(rec.get("is_expensive")),
        }
        return ts_start, ts_end, price_pln_kwh_net, meta