72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
import os
|
|
from datetime import timedelta
|
|
import json
|
|
import psycopg
|
|
import pandas as pd
|
|
import zoneinfo
|
|
|
|
TZ = zoneinfo.ZoneInfo("Europe/Warsaw")
|
|
|
|
DB_HOST = os.getenv("PGHOST", "192.168.30.10")
|
|
DB_PORT = int(os.getenv("PGPORT", "5432"))
|
|
DB_NAME = os.getenv("PGDATABASE", "postgres")
|
|
DB_USER = os.getenv("PGUSER", "energy_ingest")
|
|
DB_PASS = os.getenv("PGPASSWORD", "2f1rLCa03mQrbmlCbD6envk")
|
|
|
|
UPSERT_SQL = """
|
|
INSERT INTO pricing.energy_prices
|
|
(ts_start, ts_end, price_pln_net, provider, kind, side, buyer, seller, source_meta)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT (ts_start, ts_end, provider, kind, side)
|
|
DO UPDATE SET
|
|
price_pln_net = EXCLUDED.price_pln_net,
|
|
buyer = EXCLUDED.buyer,
|
|
seller = EXCLUDED.seller,
|
|
source_meta = COALESCE(pricing.energy_prices.source_meta, '{}'::jsonb)
|
|
|| COALESCE(EXCLUDED.source_meta, '{}'::jsonb),
|
|
inserted_at = now();
|
|
"""
|
|
|
|
def setup_db():
|
|
# psycopg 3
|
|
conn = psycopg.connect(
|
|
host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS
|
|
)
|
|
return conn
|
|
|
|
def rows_from_series(series_pln_per_kwh: pd.Series, provider: str, kind: str,
|
|
period: timedelta = timedelta(hours=1), meta: dict | None = None):
|
|
"""
|
|
Zamienia serię godzinową (index = start okresu, tz-aware) na listę wierszy dla upsertu.
|
|
"""
|
|
if series_pln_per_kwh.empty:
|
|
return []
|
|
|
|
s = series_pln_per_kwh.copy()
|
|
idx = s.index
|
|
if getattr(idx, "tz", None) is None:
|
|
idx = idx.tz_localize(TZ, nonexistent="shift_forward", ambiguous="infer")
|
|
s.index = idx
|
|
else:
|
|
s = s.tz_convert(TZ)
|
|
|
|
meta_json = json.dumps(meta or {})
|
|
rows = []
|
|
for ts_start, price in s.dropna().items():
|
|
ts_end = ts_start + period
|
|
rows.append((ts_start, ts_end, float(price), provider, kind, meta_json))
|
|
return rows
|
|
|
|
def upsert_energy_prices(conn, rows):
|
|
"""
|
|
rows: iterable krotek:
|
|
(ts_start, ts_end, price_pln_net, provider, kind, source_meta_json)
|
|
"""
|
|
if not rows:
|
|
return
|
|
with conn.cursor() as cur:
|
|
cur.executemany(UPSERT_SQL, rows)
|
|
conn.commit()
|
|
|
|
if __name__ == "__main__":
|
|
conn = setup_db() |