Add logger

This commit is contained in:
Bartosz Wieczorek 2025-09-02 09:20:44 +02:00
parent fcc4d14a9d
commit 879191527c
6 changed files with 97 additions and 50 deletions

View File

@ -2,9 +2,13 @@ from __future__ import annotations
from datetime import datetime
from DistributionCost import DistributionCostBase
from logging_utils import HasLogger
# ---------- TAURON G11 ----------
class TauronG11Provider(DistributionCostBase):
class TauronG11Provider(DistributionCostBase, HasLogger):
def __init__(self):
self._init_logger()
def rate(self, ts: datetime) -> float:
return 0.504

View File

@ -11,7 +11,7 @@ from utils.time_helpers import WARSAW_TZ
IntervalRow = Tuple[datetime, datetime, float, str, str, str, str, str, str] # patrz _rows_to_upsert
UPSERT_SQL = """
INSERT INTO pricing.energy_prices
INSERT INTO pricing.energy_prices ep
(ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (ts_start, ts_end, provider, kind, side)
@ -23,10 +23,24 @@ DO UPDATE SET
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
inserted_at = now()
WHERE
pricing.energy_prices.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
OR pricing.energy_prices.buyer IS DISTINCT FROM EXCLUDED.buyer
OR pricing.energy_prices.seller IS DISTINCT FROM EXCLUDED.seller
OR (COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb) || COALESCE(EXCLUDED.source_meta, '{}'::jsonb)) IS DISTINCT FROM COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb);
ep.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
OR ep.buyer IS DISTINCT FROM EXCLUDED.buyer
OR ep.seller IS DISTINCT FROM EXCLUDED.seller
OR (COALESCE(ep.source_meta, '{}'::jsonb)
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb))
IS DISTINCT FROM COALESCE(ep.source_meta, '{}'::jsonb)
RETURNING 1 AS affected, (xmax <> 0) AS was_update;"""
SELECT_LAST = """
select ts_end from pricing.energy_prices ep
where
ep.provider = %s
and ep.kind = %s
and ep.side = %s::price_side
and ep.buyer = %s
and ep.seller = %s
order by ts_start desc
limit 1
"""
@dataclass
@ -42,6 +56,7 @@ class EnergyPriceScraperBase:
SIDE: str = "" # 'buy'|'sell'
BUYER: str = "" # 'end_user'
SELLER: str = "" # 'market_index'
log = None
# throttling/retry
max_retries: int = 3
@ -76,14 +91,15 @@ class EnergyPriceScraperBase:
# ---------- public API ----------
def ingest_day(self, business_day: date) -> int:
"""Pobiera i zapisuje całą dobę [00:00, 24:00) lokalnie. Zwraca liczbę upsertowanych wierszy."""
print(f"fetch day {date}")
self.log.info("Ingesting day {}".format(business_day))
points = self.fetch_day(business_day) # [(start,end,price,meta_dict), ...]
rows = self._rows_to_upsert(points)
print(f"inserting {len(rows)} rows")
self.log.debug("{} rows inserted".format(len(rows)))
return self._upsert(rows)
def ingest_range(self, start_day: date, end_day: date) -> int:
"""Backfill: [start_day, end_day] po dniach lokalnych."""
self.log.info("Ingesting range [{}, {})".format(start_day, end_day))
total = 0
d = start_day
while d <= end_day:
@ -91,6 +107,19 @@ class EnergyPriceScraperBase:
d = d + timedelta(days=1)
return total
def last_entry(self) -> Optional[datetime]:
self.log.info("Retrieving last entry")
with self._ensure_conn().cursor() as cur:
params = (self.provider(), self.kind(), self.side(), self.buyer(), self.seller())
cur.execute(SELECT_LAST, params)
res = cur.fetchone() # None if no change, tuple if insert/update
if res is not None:
self.log.debug("last entry {}".format(res))
return res[0]
self.log.debug("No last entry")
return None
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
"""Zaimplementuj w podklasie. Zwracaj listę punktów z NETTO PLN/kWh."""
raise NotImplementedError
@ -106,6 +135,7 @@ class EnergyPriceScraperBase:
def _rows_to_upsert(self, points: Iterable[Tuple[datetime, datetime, float, Dict[str, Any]]]) -> List[IntervalRow]:
rows: List[IntervalRow] = []
self.log.debug("Creating upsert rows from {} points".format(len(points)))
for ts_start, ts_end, price_pln_kwh_net, meta in points:
# force timezones
assert(ts_start.tzname() is not None)
@ -118,27 +148,25 @@ class EnergyPriceScraperBase:
))
return rows
def _upsert(self, rows: List[IntervalRow]) -> int:
if not rows: return 0
for attempt in range(1, self.max_retries + 1):
try:
with self._ensure_conn().cursor() as cur:
try:
cur.executemany(UPSERT_SQL, rows)
except Exception as ee:
print("Bad data", ee)
self._ensure_conn().commit()
def _upsert(self, rows: list[IntervalRow]) -> int:
if not rows:
return 0
# cur.executemany(UPSERT_SQL, rows)
self._ensure_conn().commit()
print(f"upserted {len(rows)} rows")
return len(rows)
except Exception as e:
print(e)
if attempt >= self.max_retries:
raise
_time.sleep(self.backoff_sec * attempt)
return 0
affected = 0
updated = 0
self.log.info("Upserting %d rows ", len(rows))
with self._ensure_conn().cursor() as cur:
for r in rows:
cur.execute(UPSERT_SQL, r)
res = cur.fetchone() # None if no change, tuple if insert/update
if res is not None:
affected += res[0] # res[0] == 1
updated += int(bool(res[1])) # was_update -> 1/0
self._ensure_conn().commit()
self.log.info("Affected=%d (updated=%d, inserted=%d)", affected, updated, affected - updated)
return affected
def _day_range(self, d: date) -> Tuple[datetime, datetime]:
start = datetime(d.year, d.month, d.day, 0, 0, tzinfo=self.tz)

View File

@ -62,14 +62,12 @@ class InstratRDN_CSVScraper(EnergyPriceScraperBase):
col = "fixing_i_pln_kwh"
fixing_tag = "I"
# zakres doby lokalnej
day_start = datetime(business_day.year, business_day.month, business_day.day, 0, 0, tzinfo=self.tz)
day_end = day_start + timedelta(days=1)
# filtr i emisja punktów
df_day = self.out.loc[(self.out.index >= day_start) & (self.out.index < day_end)]
if col not in df_day.columns:
raise KeyError(f"Kolumna '{col}' nie istnieje w self.out")
raise KeyError(f"Column '{col}' does not exists")
points: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
for ts, price in df_day[col].dropna().items():

View File

@ -1,16 +1,15 @@
from __future__ import annotations
from datetime import datetime, timedelta, date, time
from traceback import format_list
from typing import List, Tuple, Dict, Any
import requests
from numpy import rec
from psycopg.generators import fetch
from EnergyPriceScraper import EnergyPriceScraperBase
from utils.time_helpers import WARSAW_TZ, UTC
from utils.time_helpers import UTC
from logging_utils import HasLogger
class PSE_RCEScraper(EnergyPriceScraperBase):
class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
"""
PSE RCE (PLN) godziny dla danej doby.
Zwraca NETTO PLN/kWh (jeżeli RCE jest w PLN/MWh, dzielimy przez 1000).
@ -24,12 +23,14 @@ class PSE_RCEScraper(EnergyPriceScraperBase):
self.PROVIDER = "PSE"
self.KIND = "rce"
self.SIDE = "sell"
self.BUYER = "reteiler" # sprzedawca rozliczajacy prosumenta
self.BUYER = "reteiler"
self.SELLER = "prosumer"
self.init_logger()
self.session = requests.Session()
self.session.headers.update({"accept": "application/json"})
self.log.info("Initializing PSE RCE Done")
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
# RCE v2: filter by business_date, select rce_pln,dtime,period

View File

@ -4,10 +4,12 @@ from typing import List, Tuple, Dict, Any, Optional
import os
import requests
from EnergyPriceScraper import EnergyPriceScraperBase
from utils.time_helpers import UTC, WARSAW_TZ
from logging_utils import HasLogger
from utils.time_helpers import UTC
class PstrykScraper(EnergyPriceScraperBase):
class PstrykScraper(EnergyPriceScraperBase, HasLogger):
"""
Szablon: ceny publikowane przez sprzedawcę (Pstryk).
Załóż: Bearer token w ENV PSTRYK_TOKEN, endpoint w ENV PSTRYK_API_BASE, np.:
@ -27,6 +29,8 @@ class PstrykScraper(EnergyPriceScraperBase):
self.BUYER = "end_user"
self.SELLER = "PSTRYK"
self.init_logger()
self.api_base = os.getenv("PSTRYK_API_BASE", "https://api.pstryk.pl/").rstrip("/")
self.token = os.getenv("PSTRYK_TOKEN", "sk-QLX1AHLF83X15VWPYRUD5G87BK8DBF0SS9XLWQ8R")
if not self.api_base or not self.token:
@ -38,10 +42,14 @@ class PstrykScraper(EnergyPriceScraperBase):
"user-agent": "energy-scraper/1.0",
})
self.log.info("Initializing PSTRYK Done")
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
url = f"{self.api_base}/integrations/pricing"
self.log.debug(f"Fetching {url}")
start = datetime(year=business_day.year, month=business_day.month, day=business_day.day, hour=0, minute=0, second=0).astimezone(UTC)
end = start + timedelta(days=1)
self.log.info(f"Fetching {business_day} from [{start}, {end})")
r = self.session.get(url, params=
{
"resolution": "hour",
@ -52,14 +60,14 @@ class PstrykScraper(EnergyPriceScraperBase):
data = r.json()
out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
self.log.debug(f"Fetched {len(data['frames'])} data frames for [{start}, {end})")
for frame in data['frames']:
row = self.parse_generic_price_frame(frame)
if row is not None:
out.append(row)
return out
@staticmethod
def parse_generic_price_frame(rec: dict):
def parse_generic_price_frame(self, rec: dict):
"""
Wejście (przykład):
{'start':'2025-09-01T00:00:00+00:00','end':'2025-09-01T01:00:00+00:00',
@ -68,6 +76,7 @@ class PstrykScraper(EnergyPriceScraperBase):
(ts_start_utc, ts_end_utc, price_pln_kwh_net, meta)
"""
if rec.get("is_cheap") is None or rec.get("is_expensive") is None:
self.log.info(f"Ignoring non-valid price frame {rec}")
return None
try:

23
main.py
View File

@ -1,17 +1,21 @@
import logging
from datetime import datetime, timedelta
from pandas import timedelta_range
import DistributionCostFactory
from EnergyPriceProvider import DynamicPricesProvider
from plot_cost_breakdown import plot_stacked_with_negatives
from matplotlib import pyplot as plt
# from pandas import timedelta_range
#
# import DistributionCostFactory
# from EnergyPriceProvider import DynamicPricesProvider
# from plot_cost_breakdown import plot_stacked_with_negatives
# from matplotlib import pyplot as plt
import EnergyPriceScraperFactory
from utils.time_helpers import WARSAW_TZ
from logging_setup import setup_logging
if __name__ == "__main__":
path = "electricity_prices_day_ahead_hourly_all.csv"
setup_logging(logging.DEBUG)
# path = "electricity_prices_day_ahead_hourly_all.csv"
conn = EnergyPriceScraperFactory.setup_db()
# scraper = EnergyPriceScraperFactory.create("InstratRDN_CSV", conn=conn, path=path)
@ -19,7 +23,10 @@ if __name__ == "__main__":
# scraper = EnergyPriceScraperFactory.create("Pstryk", conn=conn)
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ))
scraper.ingest_day(datetime.now(tz=WARSAW_TZ) + timedelta(days=1))
last = scraper.last_entry()
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ) )
# distribution_cost = DistributionCostFactory.create("TauronG13")
# energy_price = DynamicPricesProvider.DynamicPricesProvider(PROVIDER="instrat", conn=conn, KIND="fixing I", SIDE="buy")