Source code for nrel.hive.state.simulation_state.update.charging_price_update

from __future__ import annotations

import functools as ft
import logging
from csv import DictReader
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Tuple, Optional, Dict

import h3
import immutables

from nrel.hive.model.energy.charger import build_chargers_table
from nrel.hive.model.sim_time import SimTime
from nrel.hive.runner.environment import Environment
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.state.simulation_state.update.simulation_update import SimulationUpdateFunction
from nrel.hive.util import DictOps
from nrel.hive.util.iterators import DictReaderStepper
from nrel.hive.util.typealiases import StationId, ChargerId
from nrel.hive.util.units import Currency

log = logging.getLogger(__name__)


[docs]@dataclass(frozen=True) class ChargingPriceUpdate(SimulationUpdateFunction): """ loads charging prices from a file or sets all prices to zero if none provided """ reader: DictReaderStepper use_defaults: bool
[docs] @classmethod def build( cls, charging_price_file: Optional[str], chargers_file: str, lazy_file_reading: bool = False, ) -> ChargingPriceUpdate: """ reads a requests file and builds a ChargingPriceUpdate SimulationUpdateFunction if no charging_file is specified, builds a price update which sets all charger_id types to a kw price of zero Currency units per kw :param charging_price_file: the optional charging price file provided by the user :param chargers_file: the file listing all chargers for this scenario :param lazy_file_reading: if true, we load all file data into memory :return: a SimulationUpdate function pointing at the first line of a request file :raises: an exception if there were file reading issues """ if not charging_price_file: # assign default price of $0.00 for any charger types table = build_chargers_table(chargers_file) def create_default_price( acc: Tuple[Dict, ...], charger_id: ChargerId ) -> Tuple[Dict, ...]: default_price = { "time": "0", "station_id": "default", "charger_id": charger_id, "price_kwh": "0.0", } return acc + (default_price,) initial: Tuple[Dict, ...] = () fallback_values = ft.reduce(create_default_price, table.keys(), initial) stepper = DictReaderStepper.from_iterator( iter(fallback_values), "time", parser=SimTime.build ) return ChargingPriceUpdate(reader=stepper, use_defaults=True) else: charging_path = Path(charging_price_file) if not charging_path.is_file(): raise IOError(f"{charging_price_file} is not a valid path to a request file") else: if lazy_file_reading: error, maybe_stepper = DictReaderStepper.build( charging_path, "time", parser=SimTime.build ) if error: raise error elif maybe_stepper is None: raise Exception("Was not able to build DictReaderStepper") else: stepper = maybe_stepper else: with charging_path.open() as f: reader = iter(tuple(DictReader(f))) stepper = DictReaderStepper.from_iterator(reader, "time", parser=SimTime.build) return ChargingPriceUpdate(stepper, False)
[docs] def update( self, sim_state: SimulationState, env: Environment ) -> Tuple[SimulationState, Optional[ChargingPriceUpdate]]: """ update charging price when the simulation reaches the update's time :param sim_state: the current sim state :param env: static environment variables :return: sim state plus new requests """ current_sim_time = sim_state.sim_time def stop_condition(value: int) -> bool: return value < current_sim_time # parse the most recently available charger_id price data up to the current sim time initial: immutables.Map[str, immutables.Map[ChargerId, Currency]] = immutables.Map() charger_update = ft.reduce( _add_row_to_this_update, self.reader.read_until_stop_condition(stop_condition), initial, ) if len(charger_update) == 0: # no update return sim_state, self elif self.use_defaults: # we are applying the same values across all Stations # the default constructor creates one station_id called "default" and we # apply it to every station here. result = ft.reduce( lambda sim, s_id: _update_station_prices(sim, s_id, charger_update["default"]), sim_state.get_station_ids(), sim_state, ) return result, self else: # apply update to all stations # if these updates are in the form of GeoIds, map them to StationIds as_station_updates = _map_to_station_ids(charger_update, sim_state) station_ids_to_update = set(sim_state.get_station_ids()).union( as_station_updates.keys() ) # we are applying only the updates related to valid StationIds with updates result = ft.reduce( lambda sim, s_id: _update_station_prices(sim, s_id, as_station_updates[s_id]), station_ids_to_update, sim_state, ) return result, self
[docs]def _add_row_to_this_update( acc: immutables.Map[str, immutables.Map[ChargerId, Currency]], row: Dict[str, str], ) -> immutables.Map[str, immutables.Map[ChargerId, Currency]]: """ adds a single row to an accumulator that is storing only the most recently observed {StationId|GeoId}/charger_id/currency combinations :param acc: the accumulator :param row: the row to add :return: the updated accumulator """ rows = acc try: price = float(row["price_kwh"]) charger_id = row["charger_id"] if "station_id" in row: station_id = row["station_id"] this_entry = rows[station_id] if rows.get(station_id) else immutables.Map() updated = DictOps.add_to_dict(rows, station_id, this_entry.set(charger_id, price)) return updated elif "geoid" in row: geoid = row["geoid"] this_entry = rows[geoid] if rows.get(geoid) else immutables.Map() updated = DictOps.add_to_dict(rows, geoid, this_entry.set(charger_id, price)) return updated else: log.error(f"missing geoid|station_id for row: {row}") return rows except Exception as e: log.error(f"error: {e.args} for row {row}") return rows
[docs]def _update_station_prices( simulation_state: SimulationState, station_id: StationId, prices_update: immutables.Map[ChargerId, Currency], ) -> SimulationState: """ updates a simulation state with prices for a station by station id :param result: the simulation state in a partial update state :param station_id: the station id of the station to update (assumed to be valid) :param prices_update: the prices in Currency that we are updating for each Charger :return: the updated SimulationState with station prices modified """ station = simulation_state.stations.get(station_id) if not station: return simulation_state else: error, updated_station = station.update_prices(prices_update) if error is not None: log.error(error) return simulation_state elif updated_station is None: return simulation_state else: error, updated_sim = simulation_state_ops.modify_station( simulation_state, updated_station ) if error: log.error(error) return simulation_state elif updated_sim is None: return simulation_state else: return updated_sim
[docs]def _map_to_station_ids( this_update: immutables.Map[str, immutables.Map[ChargerId, Currency]], sim: SimulationState, ) -> immutables.Map[StationId, immutables.Map[ChargerId, Currency]]: """ in the case that updates are written by GeoId, map those to StationIds :param this_update: the update, which may be by StationId or GeoId :param sim: the SimulationState provides h3 resolution and lookup tables :return: the price data organized by StationId """ updated = {} # refactor using immutables.Map()? for k in this_update.keys(): if k in sim.stations: # k is a StationId; leave as is updated.update({k: this_update[k]}) else: # k may be a geoid try: res = h3.h3_get_resolution(k) # find the set of all station search geoids corresponding with the # provided station charge price geoid search_geoids: Tuple[Any, ...] = (k,) if res > sim.sim_h3_search_resolution: search_geoids = (h3.h3_to_parent(k, sim.sim_h3_search_resolution),) elif res < sim.sim_h3_search_resolution: search_geoids = tuple(h3.h3_to_children(k, sim.sim_h3_search_resolution)) station_ids = ( station_id for search_geoid in search_geoids if sim.s_search.get(search_geoid) for station_id in sim.s_search[search_geoid] ) # all of these station ids should get entries managers the provided geoid for station_id in station_ids: updated.update({station_id: this_update[k]}) except ValueError as e: # todo: handle failure here log.debug(f"tried to update charging price for key {k} but failed.") return immutables.Map(updated)