Add TGE scraper

This commit is contained in:
Bartosz Wieczorek 2025-10-14 13:01:52 +02:00
parent 46fa8d1b44
commit 26d2ddae54
8 changed files with 317 additions and 11 deletions

View File

@ -6,7 +6,5 @@
</content> </content>
<orderEntry type="jdk" jdkName="Python 3.13 (entsoe_scrapper)" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Python 3.13 (entsoe_scrapper)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="ranczo-energy-usage-scraper" />
<orderEntry type="module" module-name="ranczo-energy-costs" />
</component> </component>
</module> </module>

View File

@ -3,4 +3,5 @@
<component name="Black"> <component name="Black">
<option name="sdkName" value="Python 3.13 (entsoe_scrapper)" /> <option name="sdkName" value="Python 3.13 (entsoe_scrapper)" />
</component> </component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 (entsoe_scrapper)" project-jdk-type="Python SDK" />
</project> </project>

View File

@ -3,8 +3,6 @@
<component name="ProjectModuleManager"> <component name="ProjectModuleManager">
<modules> <modules>
<module fileurl="file://$PROJECT_DIR$/.idea/energy_price_scraper.iml" filepath="$PROJECT_DIR$/.idea/energy_price_scraper.iml" /> <module fileurl="file://$PROJECT_DIR$/.idea/energy_price_scraper.iml" filepath="$PROJECT_DIR$/.idea/energy_price_scraper.iml" />
<module fileurl="file://$PROJECT_DIR$/../ranczo-energy-costs/.idea/ranczo-energy-costs.iml" filepath="$PROJECT_DIR$/../ranczo-energy-costs/.idea/ranczo-energy-costs.iml" />
<module fileurl="file://$PROJECT_DIR$/../ranczo-energy-usage-scraper/.idea/ranczo-energy-usage-scraper.iml" filepath="$PROJECT_DIR$/../ranczo-energy-usage-scraper/.idea/ranczo-energy-usage-scraper.iml" />
</modules> </modules>
</component> </component>
</project> </project>

279
Scraper/TGE_RDNScraper.py Normal file
View File

@ -0,0 +1,279 @@
from __future__ import annotations

import re
import time
from dataclasses import dataclass
from datetime import date, datetime, timedelta, tzinfo
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import quote

import pandas as pd
import requests

from EnergyPriceScraper import EnergyPriceScraperBase
from utils.logging import HasLogger
CACHE_DIR='/var/cache/energy-price-scrapers'
def tge_rdn_report_url(delivery_day: date, folder: str = "A_SDAC 2025") -> str:
    """
    Build the full URL of the TGE RDN Excel report for one delivery day.

    :param delivery_day: delivery day whose report is requested.
    :param folder: name of the server-side folder; it may contain spaces, so it
        is percent-encoded as a single path segment.
    :return: absolute ``https://tge.pl/...`` URL of the ``.xlsx`` file.
    """
    # safe="" also encodes "/" so the folder always stays one path segment,
    # e.g. "A_SDAC 2025" -> "A_SDAC%202025".
    encoded_folder = quote(folder, safe="")
    date_part = f"{delivery_day.year}_{delivery_day.month:02d}_{delivery_day.day:02d}"
    report_name = f"Raport_RDN_dzie_dostawy_delivery_day_{date_part}.xlsx"
    return "/".join(("https://tge.pl/pub/TGE", encoded_folder, "RDN", report_name))
def download_tge_rdn(delivery_day: date, out_dir: Path, folder_candidates=None, timeout=30, retries=1) -> Path:
    """
    Download the TGE RDN Excel report for ``delivery_day`` into ``out_dir``.

    Every folder name in ``folder_candidates`` is tried in order. For each one
    the report URL is built with ``tge_rdn_report_url``; if a file with that
    name already exists in ``out_dir`` it is returned without any network
    access, otherwise up to ``retries`` GET requests are made.

    :param delivery_day: delivery day whose report should be fetched.
    :param out_dir: target directory; created (with parents) when missing.
    :param folder_candidates: server folder names to try; defaults to
        ``["A_SDAC 2025", "A_SDAC 2026"]``.
    :param timeout: timeout passed to ``requests.get``.
    :param retries: number of GET attempts per folder candidate.
    :return: path of the cached or freshly downloaded file inside ``out_dir``.
    :raises RuntimeError: when no candidate yielded a non-empty HTTP 200 response.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    if folder_candidates is None:
        folder_candidates = ["A_SDAC 2025", "A_SDAC 2026"]

    last_err: Optional[Exception] = None
    for folder in folder_candidates:
        url = tge_rdn_report_url(delivery_day, folder=folder)
        dest = out_dir / url.rsplit("/", 1)[-1]
        if dest.exists():
            print(f"got {dest.name} exists, skipping")
            return dest

        for attempt in range(1, retries + 1):
            try:
                r = requests.get(url, timeout=timeout)
                if r.status_code == 200 and r.content:
                    # Write to a sibling temp file and rename it into place, so an
                    # interrupted write can never leave a truncated report that a
                    # later run would accept as "already downloaded".
                    tmp = dest.with_name(dest.name + ".part")
                    tmp.write_bytes(r.content)
                    tmp.replace(dest)
                    print(f"got {dest.name}")
                    return dest
                last_err = RuntimeError(f"HTTP {r.status_code} dla {url}")
            except (requests.RequestException, OSError) as e:
                last_err = e
            # Back off only when another attempt for this URL will follow.
            if attempt < retries:
                time.sleep(min(2 ** (attempt - 1), 8))

    # Every candidate failed: surface the last error as the cause.
    raise RuntimeError(f"Nie udało się pobrać raportu dla {delivery_day}: {last_err}") from last_err
def get_month(today: Optional[date] = None):
    """
    Download the reports from the first day of the current month through tomorrow.

    Files are stored under ``{CACHE_DIR}/A_SDAC_2025``. A day whose report cannot
    be downloaded (``download_tge_rdn`` raises ``RuntimeError``) is reported and
    skipped, so one unavailable report does not stop the remaining days or
    propagate to the caller.

    :param today: reference date; defaults to ``datetime.now().date()``.
        NOTE(review): this is the machine's local date, not an explicit
        time zone — confirm it matches the market's calendar day.
    """
    if today is None:
        today = datetime.now().date()
    out_dir = Path(f"{CACHE_DIR}/A_SDAC_2025")
    # Evaluate the range once so it cannot shift if the loop runs across midnight.
    last_day = today + timedelta(days=1)
    day = today.replace(day=1)
    while day <= last_day:
        try:
            download_tge_rdn(day, out_dir=out_dir)
        except RuntimeError as exc:
            print(f"skipping {day}: {exc}")
        day += timedelta(days=1)
@dataclass
class TGE_RDNScraper(EnergyPriceScraperBase, HasLogger):
    """
    Scraper of TGE RDN hourly prices read from the daily Excel report.

    Implements ``fetch_day`` and returns NET values in PLN/kWh.

    Attributes:
      - root_dir: directory that holds the downloaded reports.
      - filename_template: report file name pattern; ``{yyyy}``, ``{mm}`` and
        ``{dd}`` are replaced with the delivery day.
      - price_is_gross: whether prices in the file include VAT (if so, VAT is removed).
      - vat_rate: VAT rate (e.g. 0.23); used only when ``price_is_gross`` is True.
      - source_unit: unit of the prices in the file (default ``'PLN/MWh'``).

    Identifiers set in ``__init__``:
      - PROVIDER='TGE'
      - KIND='market_price'
      - SIDE='buy'
      - BUYER='end_user'
      - SELLER='market_index'

    NOTE: the class defines its own ``__init__``, so ``@dataclass`` does not
    generate one; the annotated attributes below act as class-level defaults.
    """

    root_dir: Path = Path(f"{CACHE_DIR}/A_SDAC_2025")
    filename_template: str = "Raport_RDN_dzie_dostawy_delivery_day_{yyyy}_{mm}_{dd}.xlsx"
    price_is_gross: bool = False
    vat_rate: float = 0.23
    source_unit: str = "PLN/MWh"

    # Deliberately unannotated so @dataclass does not treat them as fields.
    # Hour label shape: "NN-NN-NN_HNN"; the last group is the hour number.
    _HOUR_LABEL_RE = re.compile(r"^\s*(\d{2})-(\d{2})-(\d{2})_H(\d{2})$")
    # Positions inside the frame AFTER all-empty columns were dropped.
    # NOTE(review): the original notes call these sheet columns "I" and "K" — confirm.
    _HOUR_COL_INDEX = 6
    _PRICE_COL_INDEX = 8
    # Header rows 0..7 are tried when reading the sheet.
    _MAX_HEADER_ROWS = 8

    def __init__(self, **kwargs):
        """Set the identifiers, initialise the logger and download this month's reports."""
        super().__init__(**kwargs)
        self.PROVIDER = "TGE"
        self.KIND = "market_price"
        self.SIDE = "buy"
        self.BUYER = "end_user"
        self.SELLER = "market_index"
        self.init_logger()
        # Populates the local cache from which fetch_day reads (network access).
        get_month()

    # ---------- internal helpers ----------
    @staticmethod
    def _norm(s: Any) -> str:
        """Return ``s`` as lower-case text with NBSPs replaced and whitespace collapsed."""
        s = "" if s is None else str(s)
        s = s.replace("\xa0", " ")
        s = re.sub(r"\s+", " ", s.strip()).lower()
        return s

    def _filename_for_day(self, business_day: date) -> Path:
        """Return the resolved path of the report file for ``business_day``."""
        fn = self.filename_template.format(
            yyyy=f"{business_day.year:04d}",
            mm=f"{business_day.month:02d}",
            dd=f"{business_day.day:02d}",
        )
        return (self.root_dir / fn).resolve()

    def _read_candidate_frames(self, xlsx_path: Path) -> Iterator[Tuple[str, int, pd.DataFrame]]:
        """
        Yield ``(sheet_name, header_row, frame)`` candidates from the 'WYNIKI' sheet.

        The sheet is matched case-insensitively, first by exact name, then by a
        name that contains 'wyniki'. Nothing is yielded when no such sheet
        exists. Each yielded frame has its all-empty columns removed and its
        column names normalised with ``_norm``.
        """
        # The context manager closes the workbook handle; the workbook is opened
        # once and reused for every header candidate.
        with pd.ExcelFile(xlsx_path) as xls:
            sheet_names = list(xls.sheet_names)
            target_sheet = next((s for s in sheet_names if self._norm(s) == "wyniki"), None)
            if target_sheet is None:
                target_sheet = next((s for s in sheet_names if "wyniki" in self._norm(s)), None)
            if target_sheet is None:
                # No 'WYNIKI' sheet: yield nothing.
                return

            for header in range(self._MAX_HEADER_ROWS):
                try:
                    df = xls.parse(sheet_name=target_sheet, header=header)
                except Exception:
                    # Best effort: a header row that cannot be parsed is simply
                    # not a candidate; move on to the next one.
                    continue
                if df is None or df.empty:
                    continue
                df = df.dropna(axis=1, how="all")
                if df.empty:
                    continue
                df.columns = [self._norm(c) for c in df.columns]
                yield target_sheet, header, df

    def _find_hour_col(self, df: pd.DataFrame) -> Optional[str]:
        """Return the name of the hour-label column, or None if the frame is too narrow."""
        if len(df.columns) <= self._HOUR_COL_INDEX:
            return None
        return df.columns[self._HOUR_COL_INDEX]

    def _find_price_col(self, df: pd.DataFrame) -> Optional[str]:
        """Return the name of the price column, or None if the frame is too narrow."""
        if len(df.columns) <= self._PRICE_COL_INDEX:
            return None
        return df.columns[self._PRICE_COL_INDEX]

    @staticmethod
    def _to_float(series: pd.Series) -> pd.Series:
        """Parse textual numbers (spaces/NBSPs removed, comma as decimal mark); bad values become NaN."""
        s = series.astype(str)
        s = s.str.replace("\xa0", " ", regex=False).str.replace(" ", "", regex=False)
        s = s.str.replace(",", ".", regex=False)
        return pd.to_numeric(s, errors="coerce")

    def _hour_start_from_label(self, label: str) -> Optional[int]:
        """Return the hour number NN from a label ending in ``_HNN``, or None if it does not match."""
        m = self._HOUR_LABEL_RE.match(str(label).strip())
        if m is None:
            return None
        return int(m.group(4))

    def _ensure_net_pln_per_kwh(self, value: Optional[float]) -> Optional[float]:
        """Convert ``value`` to NET PLN/kWh (VAT removal, then unit conversion)."""
        if value is None:
            return None
        v = float(value)
        # Remove VAT when the source prices are declared as gross.
        if self.price_is_gross and self.vat_rate and self.vat_rate > 0:
            v = v / (1.0 + self.vat_rate)
        unit = (self.source_unit or "").upper()
        # NOTE(review): "KWH/PLN" is accepted as "already per kWh" exactly as
        # before, although it reads like an inverse unit — confirm intent.
        if unit in ("PLN/KWH", "PLN / KWH", "PLN KWH", "KWH/PLN"):
            return v
        # PLN/MWh and any unrecognised unit are treated as PLN/MWh.
        return v / 1000.0

    # ---------- required by the interface ----------
    def fetch_day(
        self, business_day: date, tz: tzinfo
    ) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
        """
        Return the price points of ``business_day`` in time zone ``tz``.

        Each point is ``(start_dt, end_dt, value_net_pln_kwh, metadata)``; points
        are ordered by start hour, with at most one point per hour (the first
        occurrence in the file wins).

        :raises FileNotFoundError: when the report file for the day is missing.
        :raises RuntimeError: when no candidate frame has the hour and price columns.
        """
        src_path = self._filename_for_day(business_day)
        if not src_path.exists():
            raise FileNotFoundError(f"Nie znaleziono pliku: {src_path}")

        chosen = None  # (sheet, header, df, hour_col, price_col)
        for sheet, header, df in self._read_candidate_frames(src_path):
            hour_col = self._find_hour_col(df)
            price_col = self._find_price_col(df)
            if hour_col is not None and price_col is not None:
                chosen = (sheet, header, df, hour_col, price_col)
                break
        if chosen is None:
            raise RuntimeError(
                "Nie udało się znaleźć kolumn godzin i/lub cen (fixing) w żadnym arkuszu."
            )
        sheet, header, df, hour_col, price_col = chosen

        # Extraction and cleaning.
        hours_raw = df[hour_col].astype(str).str.strip()
        prices_raw = self._to_float(df[price_col])
        hour_number = hours_raw.apply(self._hour_start_from_label).astype(float)

        # Keep rows with a parsed hour label H01..H24 and a numeric price.
        # H00 is rejected: it would otherwise be clamped onto hour 0 and could
        # shadow the real H01 entry.
        mask_ok = hour_number.notna() & prices_raw.notna()
        mask_ok &= hour_number.between(1, 24, inclusive="both")

        by_hour: Dict[int, Tuple[datetime, datetime, float, Dict[str, Any]]] = {}
        for h, p in zip(hour_number[mask_ok].astype(int), prices_raw[mask_ok].astype(float)):
            start_hour = int(h) - 1  # H01 -> 0, H24 -> 23
            if start_hour in by_hour:
                # Duplicate hour: keep the first occurrence.
                continue
            value_net_kwh = self._ensure_net_pln_per_kwh(p)
            if value_net_kwh is None:
                continue
            start_local = datetime(
                business_day.year, business_day.month, business_day.day,
                start_hour, 0, 0, tzinfo=tz,
            )
            end_local = start_local + timedelta(hours=1)
            meta: Dict[str, Any] = {
                "source_file": str(src_path),
                "sheet": sheet,
                "header_row_used": header,
                "hour_column": hour_col,
                "price_column": price_col,
                "original_unit": self.source_unit,
                "price_is_gross": self.price_is_gross,
                "provider": self.PROVIDER,
                "kind": self.KIND,
            }
            by_hour[start_hour] = (start_local, end_local, float(value_net_kwh), meta)

        # Return the points ordered by start hour.
        return [by_hour[k] for k in sorted(by_hour)]

21
app.py
View File

@ -2,6 +2,7 @@ import logging
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.blocking import BlockingScheduler
import EnergyPriceScraperFactory import EnergyPriceScraperFactory
import app_impl
from utils.logging import setup_logging from utils.logging import setup_logging
from utils.time import WARSAW_TZ from utils.time import WARSAW_TZ
@ -9,6 +10,8 @@ from utils.time import WARSAW_TZ
from app_impl import hourly_tick from app_impl import hourly_tick
from utils.logging import function_logger from utils.logging import function_logger
import datetime
setup_logging(logging.DEBUG) setup_logging(logging.DEBUG)
sched = BlockingScheduler(timezone=WARSAW_TZ) sched = BlockingScheduler(timezone=WARSAW_TZ)
@ -24,16 +27,30 @@ def run(name: str):
log.error(f"{name} throw an exception: {e}") log.error(f"{name} throw an exception: {e}")
# def run_arbitrary(name: str, date):
# log = function_logger()
# log.info(f"Running {name}")
# try:
# conn = EnergyPriceScraperFactory.setup_db()
# pstryk_scraper = EnergyPriceScraperFactory.create(name, conn=conn)
# # 11:30 and hourly safety check
# app_impl.date_run_tick(pstryk_scraper, date)
# except Exception as e:
# log.error(f"{name} throw an exception: {e}")
def run_pstryk(): def run_pstryk():
run("Pstryk") run("Pstryk")
def run_PSE_RCE(): def run_PSE_RCE():
run("PSE_RCE") run("PSE_RCE")
def run_TGE_RDN():
    """Scheduler job: run the scraper registered under the name "TGE_RDN"."""
    scraper_name = "TGE_RDN"
    run(scraper_name)
# Daily triggers at exact local times (DST-safe) # Daily triggers at exact local times (DST-safe)
sched.add_job(run_pstryk, 'cron', hour='*', minute='35') sched.add_job(run_pstryk, 'cron', hour='*', minute='35')
sched.add_job(run_PSE_RCE, 'cron', hour='*', minute='35') sched.add_job(run_PSE_RCE, 'cron', hour='*', minute='35')
sched.add_job(run_TGE_RDN, 'cron', hour='*', minute='25')
sched.start() sched.start()

View File

@ -53,6 +53,18 @@ def maybe_ingest_for_next_day(
log.debug("Ingested rows = {}".format(ingested_rows)) log.debug("Ingested rows = {}".format(ingested_rows))
return ingested_rows return ingested_rows
def date_run_tick(
        scraper,
        now_local: datetime
) -> int:
    """
    Run one ingestion attempt for the given local time.

    :param scraper: scraper passed through to ``maybe_ingest_for_next_day``.
    :param now_local: moment for which the ingestion attempt is made.
    :return: number of affected rows reported by ``maybe_ingest_for_next_day``.
    """
    log = function_logger()
    # Try to ingest the next business day after each configured window.
    affected_rows = maybe_ingest_for_next_day(scraper, now_local, WARSAW_TZ)
    log.info(f"Hourly tick ended for {now_local} with {affected_rows} affected rows")
    return affected_rows
def hourly_tick( def hourly_tick(
scraper scraper
) -> int: ) -> int:
@ -69,8 +81,5 @@ def hourly_tick(
if now_local.time().hour < 11: if now_local.time().hour < 11:
log.info("Hourly tick skipped, too early for update") log.info("Hourly tick skipped, too early for update")
return 0 return 0
# 1) Try to ingest the next business day after each con figured window
affected_next = 0 return date_run_tick(scraper, now_local)
affected_next += maybe_ingest_for_next_day(scraper, now_local, WARSAW_TZ)
log.info(f"Hourly tick ended for {now_local} with {affected_next} affected rows")
return affected_next

View File

@ -9,6 +9,8 @@ User=energy
Group=energy Group=energy
WorkingDirectory=/opt/energy-price-scrapers WorkingDirectory=/opt/energy-price-scrapers
EnvironmentFile=/etc/energy-price-scrapers.env EnvironmentFile=/etc/energy-price-scrapers.env
CacheDirectory=energy-price-scrapers
CacheDirectoryMode=0750
# Use the venv python to run your scheduler app # Use the venv python to run your scheduler app
ExecStart=/opt/energy-price-scrapers/.venv/bin/python /opt/energy-price-scrapers/app.py ExecStart=/opt/energy-price-scrapers/.venv/bin/python /opt/energy-price-scrapers/app.py
Restart=always Restart=always
@ -18,6 +20,7 @@ NoNewPrivileges=true
PrivateTmp=true PrivateTmp=true
ProtectHome=true ProtectHome=true
ProtectSystem=full ProtectSystem=full
ReadWritePaths=/var/cache/energy-price-scrapers
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

View File

@ -1,5 +1,6 @@
psycopg[binary]>=3.1,<3.3 psycopg[binary]>=3.1,<3.3
requests>=2.32,<3.0 requests>=2.32,<3.0
apscheduler>=3.10,<4.0 apscheduler>=3.10,<4.0
openpyxl>=3.1.5
systemd-python>=235; sys_platform != "win32" systemd-python>=235; sys_platform != "win32"