91 lines
3.5 KiB
Python
91 lines
3.5 KiB
Python
from __future__ import annotations
|
||
from datetime import datetime, timedelta, date, timezone
|
||
from typing import List, Tuple, Dict, Any
|
||
import requests
|
||
|
||
from EnergyPriceScraper import EnergyPriceScraperBase
|
||
from utils.time import UTC, WARSAW_TZ
|
||
from utils.logging import HasLogger
|
||
|
||
|
||
class PSE_RCEScraper(EnergyPriceScraperBase, HasLogger):
|
||
"""
|
||
PSE RCE (PLN) – godziny dla danej doby.
|
||
Zwraca NETTO PLN/kWh (jeżeli RCE jest w PLN/MWh, dzielimy przez 1000).
|
||
"""
|
||
api_url: str = "https://api.raporty.pse.pl/api/rce-pln"
|
||
session: requests.Session
|
||
|
||
def __init__(self, **kwargs):
|
||
super().__init__(**kwargs)
|
||
|
||
self.PROVIDER = "PSE"
|
||
self.KIND = "rce"
|
||
self.SIDE = "sell"
|
||
self.BUYER = "reteiler"
|
||
self.SELLER = "prosumer"
|
||
|
||
self.init_logger()
|
||
|
||
self.session = requests.Session()
|
||
self.session.headers.update({"accept": "application/json"})
|
||
self.log.info("Initializing PSE RCE Done")
|
||
|
||
def fetch_range(self, start_date: datetime, end_date: datetime) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
|
||
assert start_date < end_date
|
||
assert start_date.tzinfo is not None
|
||
assert end_date.tzinfo is not None
|
||
assert start_date.tzinfo == UTC
|
||
assert end_date.tzinfo == UTC
|
||
assert end_date - start_date == timedelta(days=1) # for now no way to import more than one day
|
||
|
||
self.log.info(f"Fetching range: [{start_date}, {end_date}) UTC / [{start_date.astimezone(WARSAW_TZ)}, {end_date.astimezone(WARSAW_TZ)}) Europe/Warsaw")
|
||
business_day = start_date.astimezone(WARSAW_TZ).date()
|
||
self.log.debug(f"business_day: {business_day}")
|
||
|
||
# RCE v2: filter by business_date, select rce_pln,dtime,period
|
||
params = {
|
||
"$select": "rce_pln,publication_ts_utc,dtime_utc,business_date",
|
||
"$filter": f"business_date eq '{business_day:%Y-%m-%d}'",
|
||
}
|
||
r = self.session.get(self.api_url, params=params, timeout=30)
|
||
r.raise_for_status()
|
||
data = r.json().get("value", [])
|
||
self.log.debug(f"Fetched data len: {len(data)} points")
|
||
|
||
out: List[Tuple[datetime, datetime, float, Dict[str, Any]]] = []
|
||
for item in data:
|
||
out.append(self.parse_pse_rce_record(item))
|
||
|
||
return out
|
||
|
||
def fetch_day(self, business_day: date, tz: timezone) -> List[Tuple[datetime, datetime, float, Dict[str, Any]]]:
|
||
start = datetime.combine(business_day, datetime.min.time(), tzinfo=tz).astimezone(UTC)
|
||
end = start + timedelta(days=1)
|
||
return self.fetch_range(start, end)
|
||
|
||
@staticmethod
|
||
def PSE_date_range(dtime: datetime):
|
||
ts_end = dtime
|
||
ts_start = dtime - timedelta(minutes=15)
|
||
|
||
return ts_start, ts_end
|
||
|
||
@staticmethod
|
||
def parse_pse_rce_record(rec: dict):
|
||
# 'dtime' date is the END of timeslot, so begining is dtime - t_stop
|
||
dtime_utc = datetime.strptime(rec["dtime_utc"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=UTC)
|
||
ts_start, ts_end = PSE_RCEScraper.PSE_date_range(dtime=dtime_utc)
|
||
|
||
price_pln_mwh = float(rec["rce_pln"])
|
||
price_pln_kwh = price_pln_mwh / 1000.0
|
||
|
||
meta = {
|
||
"business_date": rec["business_date"],
|
||
"source": "PSE_RCE",
|
||
"publication_ts_utc": rec["publication_ts_utc"],
|
||
"unit": "PLN/kWh",
|
||
"taxes_included": False
|
||
}
|
||
return ts_start, ts_end, price_pln_kwh, meta
|