remove unused csv files with data, refactor scrapers to be easy to use for fetching tomorrow's data

This commit is contained in:
Bartosz Wieczorek 2025-09-02 14:18:34 +02:00
parent 90aaffaf90
commit e9c31a5ce5
9 changed files with 222 additions and 84709 deletions

View File

@ -1,17 +1,16 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, date, timedelta
from datetime import datetime, date, timedelta, tzinfo
from typing import Iterable, List, Tuple, Dict, Any, Optional
from zoneinfo import ZoneInfo
import json
import psycopg
import time as _time
from utils.time_helpers import WARSAW_TZ
IntervalRow = Tuple[datetime, datetime, float, str, str, str, str, str, str] # see _rows_to_upsert
UPSERT_SQL = """
INSERT INTO pricing.energy_prices ep
INSERT INTO pricing.energy_prices
(ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT (ts_start, ts_end, provider, kind, side)
@ -23,12 +22,12 @@ DO UPDATE SET
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
inserted_at = now()
WHERE
ep.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
OR ep.buyer IS DISTINCT FROM EXCLUDED.buyer
OR ep.seller IS DISTINCT FROM EXCLUDED.seller
OR (COALESCE(ep.source_meta, '{}'::jsonb)
pricing.energy_prices.price_pln_net IS DISTINCT FROM EXCLUDED.price_pln_net
OR pricing.energy_prices.buyer IS DISTINCT FROM EXCLUDED.buyer
OR pricing.energy_prices.seller IS DISTINCT FROM EXCLUDED.seller
OR (COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb))
IS DISTINCT FROM COALESCE(ep.source_meta, '{}'::jsonb)
IS DISTINCT FROM COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
RETURNING 1 AS affected, (xmax <> 0) AS was_update;"""
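The ON CONFLICT (ts_start, ts_end, provider, kind, side) target requires a matching unique constraint on pricing.energy_prices. The schema itself is not part of this commit; a minimal sketch of a table that would satisfy the upsert, with assumed column types:
import psycopg

# Hypothetical DDL inferred from the columns and conflict target in UPSERT_SQL;
# column types and the conninfo string are assumptions, not the committed schema.
ASSUMED_DDL = """
CREATE TABLE IF NOT EXISTS pricing.energy_prices (
    ts_start      timestamptz NOT NULL,
    ts_end        timestamptz NOT NULL,
    price_pln_net numeric     NOT NULL,
    provider      text        NOT NULL,
    kind          text        NOT NULL,
    side          text        NOT NULL,
    buyer         text,
    seller        text,
    source_meta   jsonb,
    inserted_at   timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (ts_start, ts_end, provider, kind, side)
);
"""

with psycopg.connect("dbname=energy") as conn:  # conninfo is a placeholder
    conn.execute("CREATE SCHEMA IF NOT EXISTS pricing")
    conn.execute(ASSUMED_DDL)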
SELECT_LAST = """
@ -92,21 +91,11 @@ class EnergyPriceScraperBase:
def ingest_day(self, business_day: date) -> int:
"""Pobiera i zapisuje całą dobę [00:00, 24:00) lokalnie. Zwraca liczbę upsertowanych wierszy."""
self.log.info("Ingesting day {}".format(business_day))
points = self.fetch_day(business_day) # [(start,end,price,meta_dict), ...]
points = self.fetch_day(business_day, WARSAW_TZ)
rows = self._rows_to_upsert(points)
self.log.debug("{} rows inserted".format(len(rows)))
return self._upsert(rows)
def ingest_range(self, start_day: date, end_day: date) -> int:
"""Backfill: [start_day, end_day] po dniach lokalnych."""
self.log.info("Ingesting range [{}, {})".format(start_day, end_day))
total = 0
d = start_day
while d <= end_day:
total += self.ingest_day(d)
d = d + timedelta(days=1)
return total
def last_entry(self) -> Optional[datetime]:
self.log.info("Retrieving last entry")
with self._ensure_conn().cursor() as cur:
@ -120,7 +109,7 @@ class EnergyPriceScraperBase:
self.log.debug("No last entry")
return None
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
def fetch_day(self, business_day: date, tz: tzinfo) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
"""Zaimplementuj w podklasie. Zwracaj listę punktów z NETTO PLN/kWh."""
raise NotImplementedError
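_rows_to_upsert and _upsert are outside this hunk. A plausible sketch of the row-building step, assuming PROVIDER/KIND/SIDE/BUYER/SELLER are class attributes set by each scraper (names inferred from the UPSERT column order, hence hypothetical; json is already imported above):
def _rows_to_upsert(self, points):
    # Hypothetical reconstruction: stamp the scraper's identity onto each
    # fetched point and serialize meta to JSON, in IntervalRow column order.
    return [
        (ts_start, ts_end, price, self.PROVIDER, self.KIND, self.SIDE,
         self.BUYER, self.SELLER, json.dumps(meta))
        for (ts_start, ts_end, price, meta) in points
    ]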

View File

@ -1,10 +1,10 @@
from __future__ import annotations
from datetime import datetime, timedelta, date, time
from datetime import datetime, timedelta, date, tzinfo
from typing import List, Tuple, Dict, Any
import requests
from EnergyPriceScraper import EnergyPriceScraperBase
from utils.time_helpers import UTC
from utils.time_helpers import UTC, WARSAW_TZ
from logging_utils import HasLogger
@ -32,7 +32,18 @@ class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
self.session.headers.update({"accept": "application/json"})
self.log.info("Initializing PSE RCE Done")
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
def fetch_range(self, start_date: datetime, end_date: datetime) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
assert start_date < end_date
assert start_date.tzinfo is not None
assert end_date.tzinfo is not None
assert start_date.tzinfo == UTC
assert end_date.tzinfo == UTC
assert timedelta(hours=23) <= end_date - start_date <= timedelta(hours=25)  # exactly one local day (23-25h around DST); multi-day import not supported yet
self.log.info(f"Fetching range: [{start_date}, {end_date}) UTC / [{start_date.astimezone(WARSAW_TZ)}, {end_date.astimezone(WARSAW_TZ)}) Europe/Warsaw")
business_day = start_date.astimezone(WARSAW_TZ).date()
self.log.debug(f"business_day: {business_day}")
# RCE v2: filter by business_date, select rce_pln,dtime,period
params = {
"$select": "rce_pln,publication_ts_utc,dtime_utc,business_date",
@ -41,6 +52,7 @@ class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
r = self.session.get(self.api_url, params=params, timeout=30)
r.raise_for_status()
data = r.json().get("value", [])
self.log.debug(f"Fetched data len: {len(data)} points")
out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
for item in data:
@ -48,8 +60,21 @@ class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
return out
def fetch_day(self, business_day: date, tz: tzinfo) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
start = datetime(
year=business_day.year,
month=business_day.month,
day=business_day.day,
hour=0,
minute=0,
second=0,
tzinfo=tz
).astimezone(UTC)
end = start + timedelta(days=1)
return self.fetch_range(start, end)
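fetch_day builds local midnight first and only then converts to UTC because a Warsaw calendar day is 23 or 25 hours around DST changes; that spread is also why the duration assertion in fetch_range tolerates 23-25h. A standalone stdlib check (WARSAW_TZ redefined here for self-containment):
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

WARSAW_TZ = ZoneInfo("Europe/Warsaw")

# 2025-03-30 is the spring-forward day in Poland: the local day has 23 hours.
start = datetime(2025, 3, 30, tzinfo=WARSAW_TZ).astimezone(timezone.utc)
end = datetime(2025, 3, 31, tzinfo=WARSAW_TZ).astimezone(timezone.utc)
print(start)        # 2025-03-29 23:00:00+00:00
print(end)          # 2025-03-30 22:00:00+00:00
print(end - start)  # 23:00:00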
@staticmethod
def fetch_date_range(dtime: datetime):
def PSE_date_range(dtime: datetime):
ts_end = dtime
ts_start = dtime - timedelta(minutes=15)
@ -59,7 +84,7 @@ class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
def parse_pse_rce_record(rec: dict):
# 'dtime_utc' marks the END of the time slot, so the beginning is dtime minus the 15-minute slot length
dtime_utc = datetime.strptime(rec["dtime_utc"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
ts_start, ts_end = PSE_RCEScraper.fetch_date_range(dtime=dtime_utc)
ts_start, ts_end = PSE_RCEScraper.PSE_date_range(dtime=dtime_utc)
price_pln_mwh = float(rec["rce_pln"])
price_pln_kwh = price_pln_mwh / 1000.0
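For orientation, a record shaped like the $select list above parses as follows (field values are illustrative, not real API output):
rec = {
    "dtime_utc": "2025-09-02 10:15:00",           # end of the 15-minute slot
    "rce_pln": "412.50",                          # market price in PLN/MWh
    "business_date": "2025-09-02",
    "publication_ts_utc": "2025-09-01 12:00:00",
}
# parse_pse_rce_record(rec) yields:
#   ts_start = 2025-09-02 10:00:00+00:00  (dtime_utc minus 15 minutes)
#   ts_end   = 2025-09-02 10:15:00+00:00
#   price    = 0.4125 PLN/kWh  (rce_pln / 1000)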

View File

@ -1,12 +1,12 @@
from __future__ import annotations
from datetime import datetime, timedelta, date
from datetime import datetime, timedelta, date, tzinfo
from typing import List, Tuple, Dict, Any, Optional
import os
import requests
from EnergyPriceScraper import EnergyPriceScraperBase
from logging_utils import HasLogger
from utils.time_helpers import UTC
from utils.time_helpers import UTC, WARSAW_TZ
class PstrykScraper(EnergyPriceScraperBase, HasLogger):
@ -42,31 +42,49 @@ class PstrykScraper(EnergyPriceScraperBase, HasLogger):
"user-agent": "energy-scraper/1.0",
})
self.log.info("Initializing PSTRYK Done")
self.log.debug("Initializing PSTRYK Done")
def fetch_range(self, start_date: datetime, end_date: datetime) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
assert start_date < end_date
assert start_date.tzinfo is not None
assert end_date.tzinfo is not None
assert start_date.tzinfo == UTC
assert end_date.tzinfo == UTC
def fetch_day(self, business_day: date) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
url = f"{self.api_base}/integrations/pricing"
self.log.debug(f"Fetching {url}")
start = datetime(year=business_day.year, month=business_day.month, day=business_day.day, hour=0, minute=0, second=0).astimezone(UTC)
end = start + timedelta(days=1)
self.log.info(f"Fetching {business_day} from [{start}, {end})")
self.log.debug(f"Fetching url: {url}")
self.log.info(f"Fetching range: [{start_date}, {end_date}) UTC / [{start_date.astimezone(WARSAW_TZ)}, {end_date.astimezone(WARSAW_TZ)}) Europe/Warsaw")
r = self.session.get(url, params=
{
"resolution": "hour",
"window_start":start.strftime("%Y-%m-%dT%H:%M:%SZ"),
"window_end": end.strftime("%Y-%m-%dT%H:%M:%SZ"),
"window_start":start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
"window_end": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
}, timeout=30)
r.raise_for_status()
data = r.json()
out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
self.log.debug(f"Fetched {len(data['frames'])} data frames for [{start}, {end})")
self.log.debug(f"Fetched {len(data['frames'])} data frames for [{start_date}, {end_date}) UTC")
for frame in data['frames']:
row = self.parse_generic_price_frame(frame)
if row is not None:
out.append(row)
return out
def fetch_day(self, business_day: date, tz: tzinfo) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
start = datetime(
year=business_day.year,
month=business_day.month,
day=business_day.day,
hour=0,
minute=0,
second=0,
tzinfo=tz
).astimezone(UTC)
end = start + timedelta(days=1)
return self.fetch_range(start, end)
def parse_generic_price_frame(self, rec: dict):
"""
Input (example):
@ -83,15 +101,15 @@ class PstrykScraper(EnergyPriceScraperBase, HasLogger):
ts_start = datetime.fromisoformat(rec["start"]).astimezone(UTC)
ts_end = datetime.fromisoformat(rec["end"]).astimezone(UTC)
except Exception as e:
raise ValueError(f"Nieprawidłowe ISO w 'start'/'end': {e}") from e
raise ValueError(f"Bad iso timeformat in 'start'/'end': {e}") from e
if ts_end <= ts_start:
raise ValueError(f"Niepoprawny zakres: start={ts_start.isoformat()} end={ts_end.isoformat()}")
raise ValueError(f"Bad range: start={ts_start.isoformat()} end={ts_end.isoformat()}")
try:
price_pln_kwh_net = float(rec["price_net"])
except Exception as e:
raise ValueError(f"Brak lub nieprawidłowe 'price_net': {e}") from e
raise ValueError(f"Price net not available 'price_net': {e}") from e
meta = {
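The docstring's example input and the meta construction are cut off by the hunk boundary. For orientation, a frame of the assumed shape, limited to the fields the parser reads above (values illustrative):
frame = {
    "start": "2025-09-02T10:00:00+00:00",
    "end": "2025-09-02T11:00:00+00:00",
    "price_net": "0.4123",   # net price, PLN/kWh
}
# parse_generic_price_frame(frame) returns
# (2025-09-02 10:00 UTC, 2025-09-02 11:00 UTC, 0.4123, meta)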

app.py (60 changed lines)
View File

@ -1,34 +1,63 @@
import os
import logging
from datetime import datetime, timedelta
# from pandas import timedelta_range
#
# import DistributionCostFactory
# from EnergyPriceProvider import DynamicPricesProvider
# from plot_cost_breakdown import plot_stacked_with_negatives
# from matplotlib import pyplot as plt
from zoneinfo import ZoneInfo
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import EnergyPriceScraperFactory
from utils.time_helpers import WARSAW_TZ
from logging_setup import setup_logging
from utils.time_helpers import WARSAW_TZ
from app_impl import hourly_tick
from logging_utils import function_logger
setup_logging(logging.DEBUG)
conn = EnergyPriceScraperFactory.setup_db()
# sched = BlockingScheduler(timezone=WARSAW_TZ)
# def run_pstryk():
# log = function_logger()
# log.info("Running PSE-RCE")
# pstryk_scraper = EnergyPriceScraperFactory.create("Pstryk", conn=conn)
# # 11:30 and hourly safety check
# hourly_tick(pstryk_scraper)
#
# def run_PSE_RCE():
# log = function_logger()
# log.info("Running PSE-RCE")
# scraper = EnergyPriceScraperFactory.create("PSE_RCE", conn=conn)
# hourly_tick(scraper)
#
#
# # Daily triggers at exact local times (DST-safe)
# sched.add_job(run_pstryk, 'interval', hours=1)
# sched.add_job(run_PSE_RCE, 'interval', hours=1)
#
# sched.start()
# conn.close()
if __name__ == "__main__":
setup_logging(logging.DEBUG)
# path = "electricity_prices_day_ahead_hourly_all.csv"
conn = EnergyPriceScraperFactory.setup_db()
# scraper = EnergyPriceScraperFactory.create("InstratRDN_CSV", conn=conn, path=path)
scraper = EnergyPriceScraperFactory.create("PSE_RCE", conn=conn)
# scraper = EnergyPriceScraperFactory.create("Pstryk", conn=conn)
# scraper = EnergyPriceScraperFactory.create("PSE_RCE", conn=conn)
scraper = EnergyPriceScraperFactory.create("Pstryk", conn=conn)
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ))
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ).date())
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ).date()+timedelta(days=1))
last = scraper.last_entry()
if last is not None:
    print(f"last day {last.astimezone(WARSAW_TZ)}")
hourly_tick(scraper)
conn.close()
# last = scraper.last_entry()
# scraper.ingest_day(datetime.now(tz=WARSAW_TZ) )
#
# distribution_cost = DistributionCostFactory.create("TauronG13")
# energy_price = DynamicPricesProvider.DynamicPricesProvider(PROVIDER="instrat", conn=conn, KIND="fixing I", SIDE="buy")
#
@ -57,4 +86,3 @@ if __name__ == "__main__":
#
# plt.show()
conn.close()

app_impl.py (new file, 88 changed lines)
View File

@ -0,0 +1,88 @@
from __future__ import annotations
from datetime import datetime, date, time, timedelta, timezone
from zoneinfo import ZoneInfo
from typing import Sequence, Tuple, Optional
from logging_utils import function_logger
from utils.time_helpers import WARSAW_TZ, UTC
def local_midnight(d: date, tz: ZoneInfo) -> datetime:
"""Return local midnight (tz-aware) for a given calendar date."""
return datetime(d.year, d.month, d.day, 0, 0, tzinfo=tz)
def end_of_business_day_utc(business_day: date, tz: ZoneInfo) -> datetime:
"""
End of the business day [start, end) expressed in UTC.
For day D this is local midnight of D+1 converted to UTC.
Correct across 23h/25h DST days.
"""
end_local = local_midnight(business_day + timedelta(days=1), tz=tz)
return end_local.astimezone(UTC)
def is_day_fully_ingested(scraper, business_day: date, tz: ZoneInfo) -> bool:
"""
Decide if 'business_day' is fully present in DB based on scraper.last_entry().
Assumes that a fully ingested day ends at local midnight of D+1 in the DB.
"""
log = function_logger()
last: Optional[datetime] = scraper.last_entry()
if not last:
log.info("No last entry found for scrapper")
return False
log.debug("Last entry: {}".format(last))
# Normalize to UTC in case session TimeZone differs
assert last.tzinfo is not None, "last must be tz-aware"
needed_end_utc = end_of_business_day_utc(business_day, tz)
return last >= needed_end_utc
def maybe_ingest_for_next_day(
scraper,
now_local: datetime,
tz: ZoneInfo
) -> int:
"""
If the publication time for tomorrow has passed (with grace), and tomorrow is not in DB yet,
call ingest_day(tomorrow). Return the number of rows affected by the ingestion.
"""
log = function_logger()
log.debug("Start")
assert now_local.tzinfo is not None, "now_local must be tz-aware (Europe/Warsaw)."
today = now_local.date()
# Publication happens today for the next business day
target_day = today + timedelta(days=1)
log.info("Target day = {}".format(target_day))
if is_day_fully_ingested(scraper, target_day, tz):
log.info("Publication already happend, we got the data already, skipping")
return 0
# Ingest the next business day
log.info("Target day is NOT fully ingested, trying to read {}".format(target_day))
ingested_rows = scraper.ingest_day(target_day)
log.debug("Ingested rows = {}".format(ingested_rows))
return ingested_rows
def hourly_tick(
scraper
) -> int:
"""
Run this once per hour.
- Skips runs before the local publication window (11:00).
- Attempts ingestion for the next business day after each publication window.
Returns the number of rows affected by next-day ingestion.
"""
log = function_logger()
now_local = datetime.now(WARSAW_TZ)
log.info(f"Hourly tick started for {now_local}")
if now_local.hour < 11:
log.info("Hourly tick skipped, too early for update")
return 0
# 1) Try to ingest the next business day once past the configured publication window
affected_next = 0
affected_next += maybe_ingest_for_next_day(scraper, now_local, WARSAW_TZ)
log.info(f"Hourly tick ended for {now_local} with {affected_next} affected rows")
return affected_next
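app.py currently calls hourly_tick once and exits, with the scheduler block commented out. A sketch of the recurring wiring using the APScheduler imports already present in app.py (the cron offset is an assumption, not committed behavior):
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from app_impl import hourly_tick
from utils.time_helpers import WARSAW_TZ

def run_hourly(scraper):
    sched = BlockingScheduler(timezone=WARSAW_TZ)
    # minute=5 is an arbitrary offset past the top of the hour; hourly_tick
    # itself skips any run before 11:00 local time.
    sched.add_job(lambda: hourly_tick(scraper), CronTrigger(minute=5))
    sched.start()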

View File

@ -26,3 +26,10 @@ fi
chown -R energy:energy "$APP_DIR"
echo "Install complete."
sudo install -m 0755 "$APP_DIR/os/energy-price-scrapers-update.sh" /usr/local/bin/energy-price-scrapers-update
sudo install -m 0644 "$APP_DIR/os/energy-price-scrapers.service" /etc/systemd/system/energy-price-scrapers.service
sudo systemctl daemon-reload
sudo systemctl enable --now energy-price-scrapers.service
sudo systemctl status energy-price-scrapers.service

View File

@ -1,5 +1,6 @@
import logging
from contextlib import contextmanager
import inspect
def _fmt_ctx(ctx: dict) -> str:
# Build a single bracketed context block for the formatter: "[k=v a=b] "
@ -45,21 +46,14 @@ class HasLogger:
yield self.child_logger(**extra)
#class PSE_RCEScraper(HasLogger):
# SIDE = "sell"
#
# def __init__(self, **kwargs):
# super().__init__(logger_context={"side": self.SIDE})
# self.log.info("Initialized")
#
# def fetch_day(self, day):
# # Add per-call context without changing the base context
# with self.scoped_log(day=day) as log:
# log.info("Fetching")
# # ...
# log.info("Fetched %d rows", 96)
#
# def switch_side(self, side: str):
# # Persistently extend instance context
# self.set_logger_context(side=side)
# self.log.info("Side updated")
def function_logger(name: str | None = None, **context):
"""Return a LoggerAdapter that follows the HasLogger format for free functions."""
# Derive a readable default name: "<module>.<function>"
if name is None:
frame = inspect.currentframe().f_back # caller
func = frame.f_code.co_name
mod = frame.f_globals.get("__name__", "app")
name = f"{mod}.{func}"
h = HasLogger()
h.init_logger(name=name, context=context)
return h.log
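Usage mirrors the call sites in app_impl.py; a minimal sketch (the context kwarg is illustrative):
def refresh_prices():
    log = function_logger(provider="PSE_RCE")  # name defaults to "<module>.refresh_prices"
    log.info("starting refresh")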

File diff suppressed because it is too large.