# (file metadata: 280 lines, 11 KiB, Python)
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
import pandas as pd
|
||
from utils.logging import HasLogger
|
||
from datetime import date, timedelta, datetime, timezone
|
||
from pathlib import Path
|
||
import requests
|
||
from urllib.parse import quote
|
||
import time
|
||
|
||
from EnergyPriceScraper import EnergyPriceScraperBase
|
||
|
||
# Local cache directory for downloaded TGE report files.
CACHE_DIR = "/var/cache/energy-price-scrapers"
|
||
|
||
def tge_rdn_report_url(delivery_day: date, folder: str = "A_SDAC 2025") -> str:
    """Return the full URL of the TGE RDN Excel report for a delivery day.

    NOTE: the remote folder name may contain spaces, so the directory
    segment is percent-encoded on its own ("A_SDAC 2025" -> "A_SDAC%202025").

    :param delivery_day: delivery day the report covers
    :param folder: remote directory on tge.pl holding the yearly reports
    :return: absolute URL of the .xlsx report
    """
    y = delivery_day.year
    m = f"{delivery_day.month:02d}"
    d = f"{delivery_day.day:02d}"
    folder_enc = quote(folder, safe="")  # "A_SDAC 2025" -> "A_SDAC%202025"
    filename = f"Raport_RDN_dzie_dostawy_delivery_day_{y}_{m}_{d}.xlsx"
    # BUG FIX: 'filename' was computed but never interpolated — the URL
    # previously ended in a literal "(unknown)" segment, so every request
    # hit a nonexistent path.
    return f"https://tge.pl/pub/TGE/{folder_enc}/RDN/{filename}"
|
||
|
||
def download_tge_rdn(delivery_day: date, out_dir: Path, folder_candidates=None, timeout=30, retries=1) -> Path:
    """Download the TGE RDN report for ``delivery_day`` into ``out_dir``.

    Tries each remote folder in turn:
      a) the default 'A_SDAC 2025'
      b) any variants passed in ``folder_candidates``
      c) (optionally) an 'A_SDAC' variant without the year, if supplied

    Returns the path of the downloaded (or previously cached) file.

    :param delivery_day: delivery day of the report
    :param out_dir: local directory for the file (created if missing)
    :param folder_candidates: remote folder names to try, in order
    :param timeout: per-request timeout in seconds
    :param retries: attempts per folder, with exponential backoff
    :raises RuntimeError: when every folder/attempt combination fails
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    if folder_candidates is None:
        folder_candidates = ["A_SDAC 2025", "A_SDAC 2026"]

    last_err: Optional[Exception] = None
    for folder in folder_candidates:
        url = tge_rdn_report_url(delivery_day, folder=folder)
        filename = url.split("/")[-1]
        dest = out_dir / filename
        if dest.exists():
            # Cache hit: reuse the previously downloaded report.
            print(f"got {dest.name} exists, skipping")
            return dest

        for attempt in range(1, retries + 1):
            try:
                r = requests.get(url, timeout=timeout)
                if r.status_code == 200 and r.content:
                    print(f"got {dest.name}")
                    dest.write_bytes(r.content)
                    return dest
                last_err = RuntimeError(f"HTTP {r.status_code} dla {url}")
            except Exception as e:
                last_err = e
            # Exponential backoff (capped at 8 s); skip the pointless sleep
            # after the last attempt for this folder.
            if attempt < retries:
                time.sleep(min(2 ** (attempt - 1), 8))

    # Every folder and attempt failed:
    raise RuntimeError(f"Nie udało się pobrać raportu dla {delivery_day}: {last_err}")
|
||
|
||
def get_month():
    """Pre-fetch RDN reports from the 1st of the current month through tomorrow.

    Files already present in the cache directory are skipped by
    download_tge_rdn; a failed download raises and aborts the remaining days.
    """
    # Evaluate "today" once so the loop bound cannot drift if the loop
    # happens to run across midnight (the original re-read the clock on
    # every iteration).
    today = datetime.now().date()
    day = today.replace(day=1)
    end = today + timedelta(days=1)
    while day <= end:
        download_tge_rdn(day, out_dir=Path(f"{CACHE_DIR}/A_SDAC_2025"))
        day = day + timedelta(days=1)
|
||
|
||
|
||
@dataclass
class TGE_RDNScraper(EnergyPriceScraperBase, HasLogger):
    """
    TGE RDN scraper for the 'uniform-price / fixing' quotations read from an
    Excel report file. Implements fetch_day and returns NET PLN/kWh values.

    Parameters:
    - root_dir: directory containing the downloaded reports.
    - filename_template: file-name pattern; {yyyy}, {mm}, {dd} are substituted
      with the delivery-day date. Default matches the sample file:
      'Raport_RDN_dzie_dostawy_delivery_day_{yyyy}_{mm}_{dd}.xlsx'
    - price_is_gross: whether prices in the file are GROSS (if so, VAT is removed).
    - vat_rate: VAT rate (e.g. 0.23). Used only when price_is_gross=True.
    - source_unit: unit of the prices in the file (default 'PLN/MWh').

    Identifiers (set in __init__, adjust if needed):
    - PROVIDER='TGE'
    - KIND='market_price'
    - SIDE='buy'
    - BUYER='end_user'
    - SELLER='market_index'
    """
    # NOTE(review): the explicit __init__ below suppresses the
    # dataclass-generated one, so these annotated fields only act as
    # class-level defaults — confirm @dataclass is intentional here.
    root_dir: Path = Path(f"{CACHE_DIR}/A_SDAC_2025")
    filename_template: str = "Raport_RDN_dzie_dostawy_delivery_day_{yyyy}_{mm}_{dd}.xlsx"
    price_is_gross: bool = False
    vat_rate: float = 0.23
    source_unit: str = "PLN/MWh"

    def __init__(self, **kwargs):
        # Forward any scraper-framework kwargs to the base class.
        super().__init__(**kwargs)

        # Identifiers describing what this scraper provides.
        self.PROVIDER = "TGE"
        self.KIND = "market_price"
        self.SIDE = "buy"
        self.BUYER = "end_user"
        self.SELLER = "market_index"

        self.init_logger()
        # NOTE(review): this downloads the whole current month at
        # construction time (network I/O inside __init__) — consider
        # deferring to fetch_day / a separate refresh step.
        get_month()
|
||
|
||
    # ---------- internal utilities ----------
|
||
@staticmethod
|
||
def _norm(s: Any) -> str:
|
||
s = "" if s is None else str(s)
|
||
s = s.replace("\xa0", " ")
|
||
s = re.sub(r"\s+", " ", s.strip()).lower()
|
||
return s
|
||
|
||
def _filename_for_day(self, business_day: date) -> Path:
|
||
fn = self.filename_template.format(
|
||
yyyy=f"{business_day.year:04d}",
|
||
mm=f"{business_day.month:02d}",
|
||
dd=f"{business_day.day:02d}",
|
||
)
|
||
return (self.root_dir / fn).resolve()
|
||
|
||
def _read_candidate_frames(self, xlsx_path: Path):
|
||
"""Generator: zwraca (sheet_name, header_row, df_normalized_columns) tylko dla arkusza 'WYNIKI'."""
|
||
xls = pd.ExcelFile(xlsx_path)
|
||
# znajdź 'WYNIKI' (case-insensitive)
|
||
target_sheet = None
|
||
for s in xls.sheet_names:
|
||
if self._norm(s) == "wyniki":
|
||
target_sheet = s
|
||
break
|
||
# jeśli nie znaleziono dokładnie, spróbuj zawierające 'wyniki'
|
||
if target_sheet is None:
|
||
for s in xls.sheet_names:
|
||
if "wyniki" in self._norm(s):
|
||
target_sheet = s
|
||
break
|
||
if target_sheet is None:
|
||
# brak arkusza WYNIKI – nic nie zwracamy
|
||
return
|
||
|
||
for header in range(0, 8):
|
||
try:
|
||
df = pd.read_excel(xlsx_path, sheet_name=target_sheet, header=header)
|
||
except Exception:
|
||
continue
|
||
if df is None or df.empty:
|
||
continue
|
||
df = df.dropna(axis=1, how="all")
|
||
if df.empty:
|
||
continue
|
||
df.columns = [self._norm(c) for c in df.columns]
|
||
yield target_sheet, header, df
|
||
|
||
def _find_hour_col(self, df: pd.DataFrame) -> Optional[str]:
|
||
return df.columns[6] # kolumna I
|
||
|
||
def _find_price_col(self, df: pd.DataFrame) -> Optional[str]:
|
||
return df.columns[8] # kolumna K
|
||
|
||
@staticmethod
|
||
def _to_float(series: pd.Series) -> pd.Series:
|
||
s = series.astype(str)
|
||
s = s.str.replace("\xa0", " ").str.replace(" ", "", regex=False)
|
||
s = s.str.replace(",", ".", regex=False)
|
||
return pd.to_numeric(s, errors="coerce")
|
||
|
||
def _hour_start_from_label(self, label: str) -> Optional[int]:
|
||
t = str(label).strip()
|
||
matcher = re.compile(r"^\s*(\d{2})-(\d{2})-(\d{2})_H(\d{2})$")
|
||
m = matcher.match(t)
|
||
if not m:
|
||
return None
|
||
|
||
h = m.group(4)
|
||
if not h:
|
||
return None
|
||
|
||
return int(h)
|
||
|
||
def _ensure_net_pln_per_kwh(self, value: float) -> float:
|
||
"""Konwersja jednostek i VAT → NETTO PLN/kWh."""
|
||
if value is None:
|
||
return None # type: ignore
|
||
v = float(value)
|
||
# zdejmij VAT jeśli podano, że ceny są brutto
|
||
if self.price_is_gross and self.vat_rate and self.vat_rate > 0:
|
||
v = v / (1.0 + self.vat_rate)
|
||
# konwersja jednostek
|
||
unit = (self.source_unit or "").upper()
|
||
if unit in ("PLN/MWH", "PLN / MWH", "PLN MWH"):
|
||
v = v / 1000.0
|
||
elif unit in ("PLN/KWH", "PLN / KWH", "PLN KWH", "KWH/PLN"): # już OK
|
||
pass
|
||
else:
|
||
# Jeśli jednostka nieznana — przyjmujemy PLN/MWh i konwertujemy:
|
||
v = v / 1000.0
|
||
return v
|
||
|
||
    # ---------- required by the interface ----------
|
||
    def fetch_day(
        self, business_day: date, tz: timezone
    ) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Return the list of price points for delivery day 'business_day' in
        timezone 'tz': (start_dt, end_dt, value_net_pln_kwh, metadata).

        :param business_day: delivery day whose cached report is parsed
        :param tz: tzinfo attached to the hourly start/end datetimes
        :raises FileNotFoundError: when the report file is missing
        :raises RuntimeError: when no sheet/header combination yields
            usable hour and price columns
        """
        src_path = self._filename_for_day(business_day)
        if not src_path.exists():
            raise FileNotFoundError(f"Nie znaleziono pliku: {src_path}")

        chosen = None  # (sheet, header, df, hour_col, price_col)
        for sheet, header, df in self._read_candidate_frames(src_path):
            hour_col = self._find_hour_col(df)
            price_col = self._find_price_col(df)
            if hour_col and price_col:
                chosen = (sheet, header, df, hour_col, price_col)
                break

        if not chosen:
            raise RuntimeError(
                "Nie udało się znaleźć kolumn godzin i/lub cen (fixing) w żadnym arkuszu."
            )

        sheet, header, df, hour_col, price_col = chosen

        # Extraction and cleaning.
        hours_raw = df[hour_col].astype(str).str.strip()
        prices_raw = self._to_float(df[price_col])

        # Map contract labels to starting hour numbers (e.g. H01..H24 -> 1..24).
        hour_start = hours_raw.apply(self._hour_start_from_label)

        # Keep only rows with a parsable hour in a sane range and a numeric price.
        mask_ok = hour_start.notna() & prices_raw.notna()
        mask_ok &= hour_start.astype(float).between(0, 24, inclusive="both")

        data = []
        for h, p in zip(hour_start[mask_ok].astype(int), prices_raw[mask_ok].astype(float)):
            start_hour = max(0, int(h) - 1)  # H01 -> 0, H24 -> 23
            start_local = datetime(business_day.year, business_day.month, business_day.day, start_hour, 0, 0, tzinfo=tz)
            end_local = start_local + timedelta(hours=1)

            value_net_kwh = self._ensure_net_pln_per_kwh(p)
            if value_net_kwh is None:
                continue

            # Provenance metadata attached to every price point.
            meta: Dict[str, Any] = {
                "source_file": str(src_path),
                "sheet": sheet,
                "header_row_used": header,
                "hour_column": hour_col,
                "price_column": price_col,
                "original_unit": self.source_unit,
                "price_is_gross": self.price_is_gross,
                "provider": self.PROVIDER,
                "kind": self.KIND,
            }
            data.append((start_local, end_local, float(value_net_kwh), meta))

        # Sort by start time, drop duplicate hours, keep the first occurrence.
        data.sort(key=lambda x: x[0])
        uniq: Dict[int, Tuple[datetime, datetime, float, Dict[str, Any]]] = {}
        for start_dt, end_dt, val, meta in data:
            hour_key = start_dt.hour
            if hour_key not in uniq:
                uniq[hour_key] = (start_dt, end_dt, val, meta)

        # Return in hour order 0..23 (24 if present in the source).
        ordered = [uniq[k] for k in sorted(uniq.keys())]
        return ordered
|
||
|
||
|