107 lines
3.9 KiB
Python
107 lines
3.9 KiB
Python
from __future__ import annotations
|
||
from zoneinfo import ZoneInfo
|
||
import pandas as pd
|
||
import matplotlib.pyplot as plt
|
||
from typing import List, Dict, Sequence, Optional
|
||
from datetime import datetime
|
||
import math
|
||
|
||
# Time zone object for the IANA zone "Europe/Warsaw".
WAW = ZoneInfo("Europe/Warsaw")
|
||
|
||
def _resolve_layer_order(rows, x_key, order):
    """Return the unique layer keys of ``rows`` in drawing order.

    Keys are collected in first-seen order, skipping ``x_key``. When ``order``
    is given, its entries that occur in the data come first (duplicates in
    ``order`` are ignored), followed by the remaining keys in first-seen
    order. Otherwise a fixed list of preferred names comes first and all other
    keys follow sorted alphabetically.
    """
    # dict.fromkeys keeps insertion order and gives O(1) membership tests.
    seen = dict.fromkeys(k for row in rows for k in row if k != x_key)

    if order:
        requested = dict.fromkeys(order)
        head = [k for k in requested if k in seen]
        tail = [k for k in seen if k not in requested]
        return head + tail

    prefer = ["Dystrybucja (net)", "Energia (net)", "Podatki"]
    preferred = [k for k in prefer if k in seen]
    rest = sorted(k for k in seen if k not in preferred)
    return preferred + rest


def plot_stacked_with_negatives(
    rows: List[Dict[str, float]],
    *,
    x_key: str = "ts",  # key holding the X-axis value (e.g. datetime or str)
    order: Optional[Sequence[str]] = None,  # layer order; heuristic by default
    title: Optional[str] = None,
    ylabel: str = "PLN/kWh",
    sum_label: str = "Cena brutto",
    save_path: Optional[str] = None,
):
    """Draw a stacked bar chart whose components may be negative.

    Each element of ``rows`` is a mapping of the form
    ``{x_key: <datetime|str>, "component 1": val1, "component 2": val2, ...}``.
    The function performs no domain calculations; it only draws what it is
    given:

    - positive values are stacked upwards and negative values downwards
      (two independent stacks per bar),
    - a line labelled ``sum_label`` shows, for every bar, the sum of all
      components (everything except ``x_key``).

    A component missing from a row is treated as ``0.0``. A component that has
    both positive and negative values is drawn in a single colour and gets a
    single legend entry.

    Args:
        rows: Data rows, one per bar.
        x_key: Key of the X-axis value in every row. ``datetime`` values are
            formatted as ``%Y-%m-%d\\n%H:%M``; anything else goes through
            ``str()``.
        order: Optional explicit layer order. Names not present in the data
            are ignored, repeated names are used once, and keys not listed
            are appended after the listed ones.
        title: Optional axes title.
        ylabel: Y-axis label.
        sum_label: Legend label of the sum line.
        save_path: If given, the figure is also saved there at 150 dpi.

    Returns:
        The ``(fig, ax)`` pair created by ``plt.subplots``.

    Raises:
        ValueError: If ``rows`` is empty.
        KeyError: If any row lacks ``x_key``.
    """
    if not rows:
        raise ValueError("Brak danych: 'rows' jest puste.")

    # 1) X-axis labels (also validates that every row carries x_key).
    labels = []
    for row in rows:
        if x_key not in row:
            raise KeyError(f"Brak klucza '{x_key}' w wierszu: {row}")
        xv = row[x_key]
        labels.append(
            xv.strftime("%Y-%m-%d\n%H:%M") if isinstance(xv, datetime) else str(xv)
        )

    # 2) Layer keys, de-duplicated so no component can be stacked twice.
    ordered = _resolve_layer_order(rows, x_key, order)

    # 3) Value matrix: one list of floats per layer, missing entries -> 0.0.
    values_by_key = {k: [float(row.get(k, 0.0)) for row in rows] for k in ordered}
    n = len(rows)
    x = list(range(n))

    # 4) Draw two stacks: positive (bottom_pos) and negative (bottom_neg).
    fig, ax = plt.subplots(figsize=(max(8, n * 0.35), 5))
    bottom_pos = [0.0] * n
    bottom_neg = [0.0] * n

    for k in ordered:
        vals = values_by_key[k]
        pos_vals = [v if v > 0 else 0.0 for v in vals]
        neg_vals = [v if v < 0 else 0.0 for v in vals]

        # The legend label is attached to the first segment actually drawn.
        label = k
        # None lets matplotlib pick the next cycle colour; the colour of the
        # positive segment is then reused for the negative one so that a
        # component keeps one colour on both sides of zero.
        color = None

        if any(pos_vals):
            bars = ax.bar(x, pos_vals, bottom=bottom_pos, label=label)
            color = bars.patches[0].get_facecolor()
            bottom_pos = [b + v for b, v in zip(bottom_pos, pos_vals)]
            label = None  # label already consumed

        # Negative heights with a negative running bottom stack downwards.
        if any(neg_vals):
            ax.bar(x, neg_vals, bottom=bottom_neg, label=label, color=color)
            bottom_neg = [b + v for b, v in zip(bottom_neg, neg_vals)]

    # 5) Sum line: total of all components for each bar.
    y_sum = [sum(values_by_key[k][i] for k in ordered) for i in range(n)]
    ax.plot(x, y_sum, label=sum_label)

    # 6) X axis: vertical labels, thinned out to roughly 24 ticks at most.
    step = 1 if n <= 24 else math.ceil(n / 24)
    tick_positions = x[::step]
    ax.set_xticks(
        tick_positions,
        [labels[i] for i in tick_positions],
        rotation=90,
        ha="center",
    )

    ax.set_ylabel(ylabel)
    ax.set_xlabel("Okres")
    if title:
        ax.set_title(title)
    ax.legend()
    ax.grid(axis="y", linestyle="--", alpha=0.4)
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path, dpi=150)
    return fig, ax