ranczo-energy-price-scrapers/Scraper/PSE_RCEScraper.py
2025-08-28 14:15:42 +02:00

47 lines
1.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from datetime import datetime, timedelta, date
from typing import List, Tuple, Dict, Any
import requests
from EnergyPriceScraper import EnergyPriceScraperBase, WAW
class PSE_RCEScraper(EnergyPriceScraperBase):
"""
PSE RCE (PLN) godziny dla danej doby.
Zwraca NETTO PLN/kWh (jeżeli RCE jest w PLN/MWh, dzielimy przez 1000).
"""
PROVIDER = "PSE"
KIND = "rce"
SIDE = "sell"
BUYER = "reteiler" # sprzedawca rozliczajacy prosumenta
SELLER = "prosumer"
api_url: str = "https://api.raporty.pse.pl/api/rce-pln"
session: requests.Session
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.session = requests.Session()
self.session.headers.update({"accept": "application/json"})
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
# RCE v2: filter by business_date, select rce_pln,dtime,period
params = {
"$select": "rce_pln,dtime,period",
"$filter": f"business_date eq '{business_day:%Y-%m-%d}'",
}
r = self.session.get(self.api_url, params=params, timeout=30)
r.raise_for_status()
data = r.json().get("value", [])
out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
for item in data:
# dtime to ISO; period (w minutach) bywa 60
ts = datetime.fromisoformat(item["dtime"]).astimezone(WAW)
per_min = int(item.get("period", 60))
ts_end = ts + timedelta(minutes=per_min)
price_pln_mwh = float(item["rce_pln"])
price_pln_kwh = price_pln_mwh / 1000.0 # NETTO PLN/kWh
out.append((ts, ts_end, price_pln_kwh, {"source": "PSE_RCE_v2"}))
return out