ranczo-energy-price-scrapers/Scraper/InstratRDN_CSVScraper.py
2025-09-02 07:28:08 +02:00

81 lines
3.5 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, date
from typing import List, Tuple, Dict, Any
import pandas as pd
from EnergyPriceScraper import EnergyPriceScraperBase
from utils.time_helpers import WARSAW_TZ, UTC
# CSV downloaded from https://energy.instrat.pl/ceny/energia-rdn-godzinowe/
class InstratRDN_CSVScraper(EnergyPriceScraperBase):
    """
    Example day-ahead market (RDN) price scraper backed by an Instrat CSV file.

    Expects a CSV with the columns 'date', 'fixing_i_price' and
    'fixing_ii_price'. Prices are always divided by 1000 on load (PLN/MWh ->
    PLN/kWh). The 'fixing_i_volume' / 'fixing_ii_volume' columns are optional.
    """
    url: str

    def __init__(self, path: str, **kwargs):
        """
        Set the provider metadata and load the CSV found at ``path``.

        Any extra keyword arguments are forwarded to the base class.
        """
        super().__init__(**kwargs)
        self.PROVIDER = "instrat"
        self.KIND = "fixing_I"
        self.SIDE = "buy"
        self.BUYER = "end_user"  # the seller that settles with the prosumer
        self.SELLER = "market_index"
        self.data = self.load_instrat_csv(path)

    @staticmethod
    def _optional_numeric(df: pd.DataFrame, name: str) -> pd.Series:
        """Return column ``name`` coerced to numbers, or all-NaN if it is absent."""
        if name in df.columns:
            return pd.to_numeric(df[name], errors="coerce")
        return pd.Series(float("nan"), index=df.index)

    def load_instrat_csv(self, path: str) -> pd.DataFrame:
        """
        Load an Instrat CSV in the format

            date,fixing_i_price,fixing_i_volume,fixing_ii_price,fixing_ii_volume
            01.01.2016 00:00,108.27,2565.10,108.55,89.10

        The result is stored in ``self.out`` and also returned: a frame indexed
        by timestamps localized to WARSAW_TZ, sorted ascending, with the columns
        'fixing_i_pln_kwh', 'fixing_ii_pln_kwh', 'fixing_i_volume' and
        'fixing_ii_volume'.

        Raises:
            KeyError: if 'date' or one of the price columns is missing.
            RuntimeError: if no price survives the numeric conversion.
        """
        df = pd.read_csv(path)

        # 1) Parse the Polish day-first timestamps in one vectorised call
        #    (replaces the deprecated per-row `date_parser` callback).
        naive = pd.to_datetime(df["date"], format="%d.%m.%Y %H:%M")

        # 2) Attach the Warsaw zone with explicit DST rules. They mirror what
        #    `datetime.replace(tzinfo=...)` with fold=0 does: an ambiguous
        #    wall time is taken as its first (DST) occurrence, and a
        #    nonexistent wall time is pushed one hour forward.
        prefer_dst = pd.Series(True, index=df.index, dtype=bool).to_numpy()
        stamps = pd.DatetimeIndex(naive).tz_localize(
            WARSAW_TZ,
            ambiguous=prefer_dst,
            nonexistent=timedelta(hours=1),
        ).rename("date")

        # 3) PLN/MWh -> PLN/kWh. Unparsable cells become NaN so that a CSV with
        #    a wrong number format reaches the sanity check below instead of
        #    failing with a TypeError on the division.
        fi_pln_kwh = (pd.to_numeric(df["fixing_i_price"], errors="coerce") / 1000.0).round(4)
        fii_pln_kwh = (pd.to_numeric(df["fixing_ii_price"], errors="coerce") / 1000.0).round(4)

        self.out = pd.DataFrame(
            {
                "fixing_i_pln_kwh": fi_pln_kwh.to_numpy(),
                "fixing_ii_pln_kwh": fii_pln_kwh.to_numpy(),
                "fixing_i_volume": self._optional_numeric(df, "fixing_i_volume").to_numpy(),
                "fixing_ii_volume": self._optional_numeric(df, "fixing_ii_volume").to_numpy(),
            },
            index=stamps,
        ).sort_index()

        # Sanity check: the conversion must leave at least one price.
        if self.out[["fixing_i_pln_kwh", "fixing_ii_pln_kwh"]].notna().sum().sum() == 0:
            raise RuntimeError("Brak cen po przeliczeniu — sprawdź separator/format liczb w CSV.")
        return self.out

    def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Return the price points that start within ``business_day``.

        The day runs from midnight to the next midnight in ``self.tz``. Each
        point is ``(start_utc, end_utc, price, meta)``, where the end is the
        start plus ``self.period`` (one hour when the attribute is not set).
        Rows whose price is NaN are skipped.

        Raises:
            RuntimeError: if no data has been loaded into ``self.out``.
            KeyError: if the column selected by ``self.KIND`` is missing.
        """
        if not hasattr(self, "out"):
            raise RuntimeError("Brak danych: najpierw wczytaj CSV i zbuduj self.out")

        # Pick the price column from KIND (defaults to Fixing I).
        kind_norm = str(getattr(self, "KIND", "fixing_I")).replace(" ", "_").lower()
        if "fixing_ii" in kind_norm:
            col, fixing_tag = "fixing_ii_pln_kwh", "II"
        else:
            col, fixing_tag = "fixing_i_pln_kwh", "I"
        if col not in self.out.columns:
            raise KeyError(f"Kolumna '{col}' nie istnieje w self.out")

        # Local-day window. Plain datetime arithmetic keeps day_end on the next
        # wall-clock midnight.
        day_start = datetime(business_day.year, business_day.month, business_day.day, 0, 0, tzinfo=self.tz)
        day_end = day_start + timedelta(days=1)
        in_day = (self.out.index >= day_start) & (self.out.index < day_end)
        prices = self.out.loc[in_day, col].dropna()

        period = getattr(self, "period", timedelta(hours=1))
        points: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
        for ts, price in prices.items():
            ts_end = ts + period
            points.append((
                ts.to_pydatetime().astimezone(UTC),
                ts_end.to_pydatetime().astimezone(UTC),
                float(price),
                {"source": "instrat_csv", "unit": "PLN/kWh", "fixing": fixing_tag, "taxes_included": False},
            ))
        return points