victron-shedule-generator/src/victron_energy_price_calculator/VictronPriceWriter.py
Bartosz Wieczorek 87c6938dd6 fix package
2025-09-09 15:39:24 +02:00

145 lines
5.2 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone, date
from typing import List, Dict, Any, Optional
from zoneinfo import ZoneInfo
import json
import psycopg
from .PriceCalculator import PriceCalculator
from .utils.time import WARSAW_TZ
# --- Dataclass for inserts into victron.vrm_price_daily ----------------------
@dataclass
class VRMPriceRow:
    """Single row to be written to the daily VRM price table.

    The three structured fields (``schedule``, ``price_components`` and
    ``source_meta``) are kept as plain Python containers and are serialized
    to JSON text only when SQL parameters are produced.
    """

    for_date: date
    provider: str
    side: str  # 'buy' | 'sell'
    schedule: List[Dict[str, Any]]  # [{"from":"HH:MM","to":"HH:MM","price":0.12}, ...]
    price_components: Dict[str, Any] = field(default_factory=dict)
    source_meta: Dict[str, Any] = field(default_factory=dict)

    def to_sql_params(self) -> tuple:
        """Return the row as a 6-tuple of SQL parameters.

        Order: ``for_date``, ``provider``, ``side``, then ``schedule``,
        ``price_components`` and ``source_meta`` each encoded as a JSON string
        (non-ASCII characters are emitted verbatim, not escaped).
        """
        json_columns = (self.schedule, self.price_components, self.source_meta)
        encoded = tuple(
            json.dumps(column, ensure_ascii=False) for column in json_columns
        )
        return (self.for_date, self.provider, self.side) + encoded
# --- Writer: builds daily schedules (no merging) and ALWAYS upserts ----------
@dataclass
class VictronPriceDailyWriter:
    """Build per-day price schedules and upsert them into the database.

    For every calendar day the writer asks ``calculator`` for the gross price
    at the start of each fixed-length segment (``period``), never merges
    neighbouring segments, and stores the result as one row in ``table``.
    """

    conn: psycopg.Connection  # caller-owned connection; the writer never closes it
    provider: str  # written to the row and echoed in source_meta
    side: str  # 'buy' | 'sell'
    calculator: PriceCalculator  # must provide gross_price(dt, side=...)
    period: timedelta = timedelta(hours=1)  # no merging: fixed step, 1 h by default
    table: str = "victron.vrm_price_daily"  # interpolated into SQL text: trusted config only
    tz: ZoneInfo = WARSAW_TZ  # zone in which day boundaries are computed

    def _bounds_local(self, d: date) -> tuple[datetime, datetime]:
        """Return ``(midnight of d, midnight of d + 1 day)`` as aware datetimes in ``tz``."""
        start = datetime(d.year, d.month, d.day, 0, 0, tzinfo=self.tz)
        return start, start + timedelta(days=1)

    def _fmt_hhmm(self, dt: datetime) -> str:
        """Format ``dt`` as a zero-padded ``HH:MM`` wall-clock string."""
        return dt.strftime("%H:%M")

    def build_schedule_for_day(self, d: date) -> List[Dict[str, Any]]:
        """Return the fixed-step, unmerged price segments covering day ``d``.

        Each segment is ``{"from": "HH:MM", "to": "HH:MM", "price": float}``;
        the price is the calculator's gross price at the segment start. The
        last segment always ends at ``"00:00"``.

        Stepping happens on the local wall clock: both bounds share the same
        ``tzinfo``, and Python applies no UTC-offset adjustment when adding a
        ``timedelta`` to (or comparing) such datetimes. A day therefore always
        spans 24 wall-clock hours here, including DST-change days.

        Raises:
            ValueError: if ``period`` is zero or negative (the loop could
                never advance).
            KeyError: if the produced schedule does not span the full day.
                NOTE(review): a missing rate for a segment is presumably
                reported by ``calculator.gross_price`` itself - confirm.
        """
        if self.period <= timedelta(0):
            raise ValueError(f"period must be positive, got {self.period!r}")

        start_local, end_local = self._bounds_local(d)
        segments: List[Dict[str, Any]] = []
        t = start_local
        while t < end_local:
            # Clamp the final step so a period that does not divide the day
            # evenly still ends exactly at midnight.
            t_next = min(t + self.period, end_local)
            # Gross price is sampled at the start of the segment.
            price_gross = self.calculator.gross_price(t, side=self.side)
            segments.append(
                {
                    "from": self._fmt_hhmm(t),
                    "to": "00:00" if t_next == end_local else self._fmt_hhmm(t_next),
                    "price": float(price_gross),
                }
            )
            t = t_next

        # Minimal validation: the schedule must cover the whole day.
        if not segments or segments[0]["from"] != "00:00" or segments[-1]["to"] != "00:00":
            # The message previously interpolated a non-existent ``self.kind``,
            # which turned this KeyError into an AttributeError.
            raise KeyError(f"Niekompletna doba dla {d} (provider={self.provider}/{self.side})")
        return segments

    def _ensure_conn(self) -> psycopg.Connection:
        """Return the configured connection or raise ``RuntimeError`` if it is ``None``."""
        if self.conn is not None:
            return self.conn
        raise RuntimeError("Provide conn= to VictronPriceDailyWriter")

    def upsert(self, row: VRMPriceRow) -> None:
        """Insert ``row``, or update the existing row with the same key, and commit.

        On a ``(for_date, provider, side)`` conflict the statement replaces
        ``schedule`` and ``price_components``, merges the new ``source_meta``
        into the stored one with the jsonb ``||`` operator, and refreshes
        ``inserted_at``.

        The connection is left open: a psycopg 3 connection used as a context
        manager is closed on exit, which would break every later upsert issued
        through the same writer. The transaction is committed on success and
        rolled back (then re-raised) on failure.

        NOTE(review): earlier documentation stated that a conflict also resets
        ``sent_to_vrm`` to false, but this statement never touches that column.
        Confirm whether a trigger handles it or the SET list is incomplete.
        """
        sql = f"""
            INSERT INTO {self.table}
            (for_date, provider, side, schedule, price_components, source_meta)
            VALUES (%s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb)
            ON CONFLICT (for_date, provider, side)
            DO UPDATE SET
            schedule = EXCLUDED.schedule,
            price_components = EXCLUDED.price_components,
            source_meta = {self.table}.source_meta || EXCLUDED.source_meta,
            inserted_at = now();
        """
        con = self._ensure_conn()
        try:
            with con.cursor() as cur:
                cur.execute(sql, row.to_sql_params())
        except Exception:
            con.rollback()
            raise
        con.commit()

    def process_day(self, d: date, extra_meta: Optional[Dict[str, Any]] = None) -> VRMPriceRow:
        """Build the schedule for ``d``, upsert it and return the written row.

        ``source_meta`` records the build time (UTC, ISO 8601), the period in
        whole minutes, the provider and the side; entries from ``extra_meta``
        are applied last and override those defaults on key collisions.
        """
        schedule = self.build_schedule_for_day(d)
        source_meta = {
            "built_at": datetime.now(timezone.utc).isoformat(),
            "period_minutes": int(self.period.total_seconds() // 60),
            "provider": self.provider,
            "side": self.side,
        }
        if extra_meta:
            source_meta.update(extra_meta)
        row = VRMPriceRow(
            for_date=d,
            provider=self.provider,
            side=self.side,
            schedule=schedule,
            source_meta=source_meta,
        )
        self.upsert(row)
        return row

    def process_range(self, start: date, end_inclusive: date, extra_meta: Optional[Dict[str, Any]] = None) -> int:
        """Process every day from ``start`` through ``end_inclusive``.

        Returns the number of successful upserts (0 when ``start`` is after
        ``end_inclusive``). Each day is committed separately, so an exception
        raised part-way through leaves the earlier days stored.
        """
        count = 0
        cur = start
        while cur <= end_inclusive:
            self.process_day(cur, extra_meta=extra_meta)
            count += 1
            cur += timedelta(days=1)
        return count