ranczo-energy-price-scrapers/Scraper/TGE_RDNScraper.py
Bartosz Wieczorek db9269f056 small fixes
2025-10-14 17:01:36 +02:00

279 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from utils.logging import HasLogger
from datetime import date, timedelta, datetime, timezone
from pathlib import Path
import requests
from urllib.parse import quote
import time
from EnergyPriceScraper import EnergyPriceScraperBase
CACHE_DIR='/var/cache/energy-price-scrapers'
def tge_rdn_report_url(delivery_day: date, folder: str = "A_SDAC 2025") -> str:
    """Return the full URL of the TGE RDN Excel report for a delivery day.

    NOTE: the remote folder name may contain spaces, so the directory
    segment is percent-encoded on its own (e.g. "A_SDAC 2025" ->
    "A_SDAC%202025").

    :param delivery_day: delivery date the report covers.
    :param folder: remote directory on tge.pl that holds the reports.
    :return: absolute URL of the ``.xlsx`` report.
    """
    y = delivery_day.year
    m = f"{delivery_day.month:02d}"
    d = f"{delivery_day.day:02d}"
    folder_enc = quote(folder, safe="")
    filename = f"Raport_RDN_dzie_dostawy_delivery_day_{y}_{m}_{d}.xlsx"
    # BUG FIX: `filename` was built but never interpolated — the URL
    # previously ended in a literal "(unknown)" and could never resolve.
    return f"https://tge.pl/pub/TGE/{folder_enc}/RDN/{filename}"
def download_tge_rdn(delivery_day: date, out_dir: Path, folder_candidates=None, timeout=30, retries=1) -> Path:
    """Download the TGE RDN report for *delivery_day* into *out_dir*.

    Tries each folder in *folder_candidates* in order (default:
    ``["A_SDAC 2025", "A_SDAC 2026"]``), retrying each URL up to *retries*
    times with exponential backoff capped at 8 s. A file already present in
    *out_dir* is reused without touching the network.

    :param delivery_day: delivery date of the report to fetch.
    :param out_dir: local directory the file is written to (created if missing).
    :param folder_candidates: remote folder names to probe, in order.
    :param timeout: per-request timeout in seconds.
    :param retries: number of attempts per candidate URL.
    :return: path to the downloaded (or previously cached) file.
    :raises RuntimeError: when every candidate URL fails; the message carries
        the last underlying error.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    if folder_candidates is None:
        folder_candidates = ["A_SDAC 2025", "A_SDAC 2026"]
    last_err: Optional[Exception] = None
    for folder in folder_candidates:
        url = tge_rdn_report_url(delivery_day, folder=folder)
        filename = url.split("/")[-1]
        dest = out_dir / filename
        if dest.exists():
            # Cache hit: the report is immutable once published.
            print(f"got {dest.name} exists, skipping")
            return dest
        for attempt in range(1, retries + 1):
            try:
                r = requests.get(url, timeout=timeout)
                if r.status_code == 200 and r.content:
                    print(f"got {dest.name}")
                    dest.write_bytes(r.content)
                    return dest
                last_err = RuntimeError(f"HTTP {r.status_code} dla {url}")
            except Exception as e:
                # Network-level failure: remember it and (maybe) retry.
                last_err = e
            # FIX: only back off when another attempt will follow — the
            # original slept even after the final attempt, delaying the
            # fallback to the next folder / the final error for no benefit.
            if attempt < retries:
                time.sleep(min(2 ** (attempt - 1), 8))
    # Every candidate failed:
    raise RuntimeError(f"Nie udało się pobrać raportu dla {delivery_day}: {last_err}")
def get_month():
    """Best-effort backfill of the local report cache.

    Downloads the RDN report for every day from the 1st of the current
    month through tomorrow into ``{CACHE_DIR}/A_SDAC_2025``.

    A day whose report cannot be fetched (e.g. tomorrow's report, which is
    only published in the afternoon) is skipped with a warning instead of
    aborting the whole backfill — previously a single failed day raised out
    of the scraper's constructor.
    """
    # FIX: capture "today" once so the loop bound cannot shift if the
    # process crosses midnight mid-backfill.
    today = datetime.now().date()
    day = today.replace(day=1)
    out_dir = Path(f"{CACHE_DIR}/A_SDAC_2025")
    while day <= today + timedelta(days=1):
        try:
            download_tge_rdn(day, out_dir=out_dir)
        except RuntimeError as e:
            # Best effort: report and continue with the remaining days.
            print(f"warning: {e}")
        day = day + timedelta(days=1)
class TGE_RDNScraper(EnergyPriceScraperBase, HasLogger):
    """
    TGE RDN (day-ahead market) price scraper reading the uniform-price
    ('fixing') quotations from the daily Excel report cached on disk.

    ``fetch_day`` returns NET prices in PLN/kWh.

    Configuration attributes (adjust as needed):
    - root_dir: directory containing the downloaded reports.
    - filename_template: file-name pattern; {yyyy}, {mm}, {dd} are replaced
      with the delivery-day date. Default matches the sample file:
      'Raport_RDN_dzie_dostawy_delivery_day_{yyyy}_{mm}_{dd}.xlsx'
    - price_is_gross: whether file prices are GROSS (if so, VAT is removed).
    - vat_rate: VAT rate (e.g. 0.23); only used when price_is_gross=True.
    - source_unit: unit of the prices in the file (default 'PLN/MWh').

    NOTE(review): the identifiers set in __init__ (KIND='market_price',
    SIDE='buy') differ from the values the original docstring claimed
    (KIND='fixing', SIDE='sell'); the __init__ assignments are what callers
    actually see.
    """
    # Local cache directory populated by get_month() at construction time.
    root_dir: Path = Path(f"{CACHE_DIR}/A_SDAC_2025")
    filename_template: str = "Raport_RDN_dzie_dostawy_delivery_day_{yyyy}_{mm}_{dd}.xlsx"
    # If True, VAT (vat_rate) is stripped from the file's prices.
    price_is_gross: bool = False
    vat_rate: float = 0.23
    # Unit of the raw prices; anything other than PLN/kWh is divided by 1000.
    source_unit: str = "PLN/MWh"

    def __init__(self, **kwargs):
        """Set provider identifiers, initialise logging, and pre-download the
        current month's reports (network side effect at construction!)."""
        super().__init__(**kwargs)
        self.PROVIDER = "TGE"
        self.KIND = "market_price"
        self.SIDE = "buy"
        self.BUYER = "end_user"
        self.SELLER = "market_index"
        self.init_logger()
        # Side effect: backfills the on-disk report cache for the month.
        get_month()

    # ---------- internal helpers ----------
    @staticmethod
    def _norm(s: Any) -> str:
        """Normalise a label: NBSP -> space, collapse whitespace, lower-case.
        None becomes ''."""
        s = "" if s is None else str(s)
        s = s.replace("\xa0", " ")
        s = re.sub(r"\s+", " ", s.strip()).lower()
        return s

    def _filename_for_day(self, business_day: date) -> Path:
        """Resolve the absolute path of the cached report for *business_day*."""
        fn = self.filename_template.format(
            yyyy=f"{business_day.year:04d}",
            mm=f"{business_day.month:02d}",
            dd=f"{business_day.day:02d}",
        )
        return (self.root_dir / fn).resolve()

    def _read_candidate_frames(self, xlsx_path: Path):
        """Generator yielding (sheet_name, header_row, df) candidates — only
        for the 'WYNIKI' (results) sheet — probing header rows 0..7 and
        normalising column labels via _norm."""
        xls = pd.ExcelFile(xlsx_path)
        # Find the 'WYNIKI' sheet (case-insensitive exact match first) ...
        target_sheet = None
        for s in xls.sheet_names:
            if self._norm(s) == "wyniki":
                target_sheet = s
                break
        # ... otherwise accept any sheet whose name contains 'wyniki'.
        if target_sheet is None:
            for s in xls.sheet_names:
                if "wyniki" in self._norm(s):
                    target_sheet = s
                    break
        if target_sheet is None:
            # No WYNIKI sheet — yield nothing.
            return
        for header in range(0, 8):
            try:
                df = pd.read_excel(xlsx_path, sheet_name=target_sheet, header=header)
            except Exception:
                continue
            if df is None or df.empty:
                continue
            df = df.dropna(axis=1, how="all")
            if df.empty:
                continue
            df.columns = [self._norm(c) for c in df.columns]
            yield target_sheet, header, df

    def _find_hour_col(self, df: pd.DataFrame) -> Optional[str]:
        # Positional pick (original comment: spreadsheet column I).
        # NOTE(review): _read_candidate_frames drops all-NaN columns first,
        # which shifts positions — index 6 is only correct for the expected
        # report layout; confirm against a real file.
        return df.columns[6]

    def _find_price_col(self, df: pd.DataFrame) -> Optional[str]:
        # Positional pick (original comment: spreadsheet column K) — same
        # column-shift caveat as _find_hour_col.
        return df.columns[8]

    @staticmethod
    def _to_float(series: pd.Series) -> pd.Series:
        """Parse Polish-formatted numbers: strip NBSP/space thousands
        separators, comma -> dot decimal point; unparseable cells -> NaN."""
        s = series.astype(str)
        s = s.str.replace("\xa0", " ").str.replace(" ", "", regex=False)
        s = s.str.replace(",", ".", regex=False)
        return pd.to_numeric(s, errors="coerce")

    def _hour_start_from_label(self, label: str) -> Optional[int]:
        """Extract the 1-based hour number from a label shaped like
        'YY-MM-DD_HNN' (e.g. '25-03-07_H01' -> 1); None when no match."""
        t = str(label).strip()
        matcher = re.compile(r"^\s*(\d{2})-(\d{2})-(\d{2})_H(\d{2})$")
        m = matcher.match(t)
        if not m:
            return None
        h = m.group(4)
        if not h:
            return None
        return int(h)

    def _ensure_net_pln_per_kwh(self, value: float) -> Optional[float]:
        """Convert a raw price to NET PLN/kWh (VAT removal + unit scaling);
        None in, None out."""
        if value is None:
            return None
        v = float(value)
        # Strip VAT when the file's prices are declared gross.
        if self.price_is_gross and self.vat_rate and self.vat_rate > 0:
            v = v / (1.0 + self.vat_rate)
        # Unit conversion.
        unit = (self.source_unit or "").upper()
        if unit in ("PLN/MWH", "PLN / MWH", "PLN MWH"):
            v = v / 1000.0
        elif unit in ("PLN/KWH", "PLN / KWH", "PLN KWH", "KWH/PLN"):  # already OK
            pass
        else:
            # Unknown unit — assume PLN/MWh and convert:
            v = v / 1000.0
        return v

    # ---------- required by the interface ----------
    def fetch_day(
        self, business_day: date, tz: timezone
    ) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Return the list of points for delivery day 'business_day' in zone 'tz':
        (start_dt, end_dt, value_net_pln_kwh, metadata).

        :raises FileNotFoundError: when the cached report file is missing.
        :raises RuntimeError: when no candidate frame yields hour/price columns.
        """
        src_path = self._filename_for_day(business_day)
        if not src_path.exists():
            raise FileNotFoundError(f"Nie znaleziono pliku: {src_path}")
        chosen = None  # (sheet, header, df, hour_col, price_col)
        for sheet, header, df in self._read_candidate_frames(src_path):
            hour_col = self._find_hour_col(df)
            price_col = self._find_price_col(df)
            if hour_col and price_col:
                chosen = (sheet, header, df, hour_col, price_col)
                break
        if not chosen:
            raise RuntimeError(
                "Nie udało się znaleźć kolumn godzin i/lub cen (fixing) w żadnym arkuszu."
            )
        sheet, header, df, hour_col, price_col = chosen
        # Extraction and cleaning
        hours_raw = df[hour_col].astype(str).str.strip()
        prices_raw = self._to_float(df[price_col])
        # Map labels to 1-based start hours (H01..H24).
        hour_start = hours_raw.apply(self._hour_start_from_label)
        # Keep only rows with a parseable hour and price.
        mask_ok = hour_start.notna() & prices_raw.notna()
        mask_ok &= hour_start.astype(float).between(0, 24, inclusive="both")
        data = []
        for h, p in zip(hour_start[mask_ok].astype(int), prices_raw[mask_ok].astype(float)):
            start_hour = max(0, int(h) - 1)  # H01 -> 0, H24 -> 23
            start_local = datetime(business_day.year, business_day.month, business_day.day, start_hour, 0, 0, tzinfo=tz)
            end_local = start_local + timedelta(hours=1)
            value_net_kwh = self._ensure_net_pln_per_kwh(p)
            if value_net_kwh is None:
                continue
            meta: Dict[str, Any] = {
                "source_file": str(src_path),
                "sheet": sheet,
                "header_row_used": header,
                "hour_column": hour_col,
                "price_column": price_col,
                "original_unit": self.source_unit,
                "price_is_gross": self.price_is_gross,
                "provider": self.PROVIDER,
                "kind": self.KIND,
            }
            data.append((start_local, end_local, float(value_net_kwh), meta))
        # Sort by start time; drop duplicate hours, keeping first occurrence.
        data.sort(key=lambda x: x[0])
        uniq: Dict[int, Tuple[datetime, datetime, float, Dict[str, Any]]] = {}
        for start_dt, end_dt, val, meta in data:
            hour_key = start_dt.hour
            if hour_key not in uniq:
                uniq[hour_key] = (start_dt, end_dt, val, meta)
        # Return ordered by hour 0..23 (24 if present in the source).
        ordered = [uniq[k] for k in sorted(uniq.keys())]
        return ordered