from __future__ import annotations from zoneinfo import ZoneInfo import pandas as pd import matplotlib.pyplot as plt from typing import List, Dict, Sequence, Optional from datetime import datetime import math WAW = ZoneInfo("Europe/Warsaw") def plot_stacked_with_negatives( rows: List[Dict[str, float]], *, x_key: str = "ts", # klucz opisujący oś X (np. datetime lub str) order: Optional[Sequence[str]] = None, # kolejność warstw; domyślnie heurystyka title: Optional[str] = None, ylabel: str = "PLN/kWh", sum_label: str = "Cena brutto", save_path: Optional[str] = None, ): """ Każdy element `rows` to słownik: { x_key: , "Nazwa składnika 1": val1, "Nazwa składnika 2": val2, ... } Funkcja NIC nie liczy „merytorycznie” – tylko rysuje to, co dostanie. - Wart. dodatnie układa w górę, ujemne w dół (oddzielne stacki). - Linia `sum_label` to suma wszystkich składników (bez x_key) dla każdego słupka. """ if not rows: raise ValueError("Brak danych: 'rows' jest puste.") # 1) etykiety osi X labels = [] for r in rows: if x_key not in r: raise KeyError(f"Brak klucza '{x_key}' w wierszu: {r}") xv = r[x_key] labels.append(xv.strftime("%Y-%m-%d\n%H:%M") if isinstance(xv, datetime) else str(xv)) # 2) lista kluczy (warstw) all_keys = [] for r in rows: for k in r.keys(): if k == x_key: continue if k not in all_keys: all_keys.append(k) if order: ordered = [k for k in order if k in all_keys] + [k for k in all_keys if k not in order] else: prefer = ["Dystrybucja (net)", "Energia (net)", "Podatki"] preferred = [k for k in prefer if k in all_keys] rest = sorted([k for k in all_keys if k not in preferred]) ordered = preferred + rest # 3) macierze wartości values_by_key = {k: [float(r.get(k, 0.0)) for r in rows] for k in ordered} n = len(rows) x = list(range(n)) # 4) rysuj – dwa stacki: dodatni (bottom_pos) i ujemny (bottom_neg) fig, ax = plt.subplots(figsize=(max(8, n * 0.35), 5)) bottom_pos = [0.0] * n bottom_neg = [0.0] * n legend_done = set() for k in ordered: vals = values_by_key[k] pos_vals = [v if v > 0 else 0.0 for v in vals] neg_vals = [v if v < 0 else 0.0 for v in vals] # zdecyduj, gdzie nadać etykietę do legendy (tylko raz) label_to_use = None if k not in legend_done and (any(v != 0 for v in vals)): label_to_use = k legend_done.add(k) # dodatnie if any(pos_vals): ax.bar(x, pos_vals, bottom=bottom_pos, label=label_to_use) bottom_pos = [b + v for b, v in zip(bottom_pos, pos_vals)] label_to_use = None # etykieta już wykorzystana # ujemne (wysokość ujemna + dolna krawędź = kumulacja ujemnych) if any(neg_vals): ax.bar(x, neg_vals, bottom=bottom_neg, label=label_to_use) bottom_neg = [b + v for b, v in zip(bottom_neg, neg_vals)] label_to_use = None # 5) linia sumy (brutto): suma wszystkich składników w słupku y_sum = [sum(values_by_key[k][i] for k in ordered) for i in range(n)] ax.plot(x, y_sum, label=sum_label) # 6) oś X – pionowe etykiety i przerzedzenie step = 1 if n <= 24 else math.ceil(n / 24) ax.set_xticks(x[::step], [labels[i] for i in range(0, n, step)], rotation=90, ha="center") ax.set_ylabel(ylabel) ax.set_xlabel("Okres") if title: ax.set_title(title) ax.legend() ax.grid(axis="y", linestyle="--", alpha=0.4) fig.tight_layout() if save_path: fig.savefig(save_path, dpi=150) return fig, ax