Source code for nrel.hive.initialization.initialize_simulation_with_sampling

from __future__ import annotations

import csv
import functools as ft
import logging
from pathlib import Path
from typing import Tuple, Dict, Optional, Callable

import immutables

from nrel.hive.config import HiveConfig
from nrel.hive.initialization.sample_vehicles import (
    sample_vehicles,
    build_default_location_sampling_fn,
    build_default_soc_sampling_fn,
)
from nrel.hive.model.base import Base
from nrel.hive.model.energy.charger import build_chargers_table
from nrel.hive.model.roadnetwork.haversine_roadnetwork import HaversineRoadNetwork
from nrel.hive.model.roadnetwork.link import Link
from nrel.hive.model.roadnetwork.osm.osm_roadnetwork import OSMRoadNetwork
from nrel.hive.model.station.station import Station
from nrel.hive.model.vehicle.mechatronics import build_mechatronics_table
from nrel.hive.model.vehicle.schedules import build_schedules_table, ScheduleId, ScheduleFunction
from nrel.hive.runner.environment import Environment
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.util import DictOps, Ratio
from nrel.hive.util.fp import throw_or_return

log = logging.getLogger(__name__)


[docs]def initialize_simulation_with_sampling( config: HiveConfig, vehicle_count: int, vehicle_location_sampling_function: Optional[Callable[..., Link]] = None, vehicle_soc_sampling_function: Optional[Callable[..., Ratio]] = None, random_seed: int = 0, ) -> Tuple[SimulationState, Environment]: """ constructs a SimulationState and Environment with sampled vehicles. uses sampling functions to build vehicles :param config: the configuration of this run :param vehicle_count: how many vehicles to initialize :param vehicle_location_sampling_function: an optional location sampling function; uses default if none :param vehicle_soc_sampling_function: an optional vehicle soc sampling function; uses default if none :param random_seed: the random seed used for all sampling functions :return: a Simulation State and an Environment :raises Exception due to IOErrors, missing keys in DictReader rows, or parsing errors """ # set up road network based on user-configured road network type if config.network.network_type == "euclidean": haversine_road_network = HaversineRoadNetwork( sim_h3_resolution=config.sim.sim_h3_resolution ) sim_initial = SimulationState( road_network=haversine_road_network, sim_time=config.sim.start_time, sim_timestep_duration_seconds=config.sim.timestep_duration_seconds, sim_h3_location_resolution=config.sim.sim_h3_resolution, sim_h3_search_resolution=config.sim.sim_h3_search_resolution, ) elif config.network.network_type == "osm_network": if config.input_config.road_network_file is None: raise ValueError("road_network_file required for osm_network") osm_road_network = OSMRoadNetwork.from_file( sim_h3_resolution=config.sim.sim_h3_resolution, road_network_file=Path(config.input_config.road_network_file), default_speed_kmph=config.network.default_speed_kmph, ) sim_initial = SimulationState( road_network=osm_road_network, sim_time=config.sim.start_time, sim_timestep_duration_seconds=config.sim.timestep_duration_seconds, sim_h3_location_resolution=config.sim.sim_h3_resolution, sim_h3_search_resolution=config.sim.sim_h3_search_resolution, ) else: raise IOError( f"road network type {config.network.network_type} not valid, must be one of {{euclidean|osm_network}}" ) # create simulation environment if config.input_config.fleets_file: log.warning( "the simulation is using sampling which doesn't support a fleet file input;\n" "this input will be ignored and entities will not have any fleet information." ) if config.input_config.schedules_file is None: schedules: immutables.Map[ScheduleId, ScheduleFunction] = immutables.Map() else: schedules = build_schedules_table( config.sim.schedule_type, config.input_config.schedules_file ) env = Environment( config=config, mechatronics=build_mechatronics_table( config.input_config.mechatronics_file, config.input_config.scenario_directory, ), chargers=build_chargers_table(config.input_config.chargers_file), schedules=schedules, ) # populate simulation with static entities sim_with_bases = _build_bases(config.input_config.bases_file, sim_initial) sim_with_stations = _build_stations(config.input_config.stations_file, sim_with_bases, env) # sample vehicles if not vehicle_location_sampling_function: vehicle_location_sampling_function = build_default_location_sampling_fn(seed=random_seed) if not vehicle_soc_sampling_function: vehicle_soc_sampling_function = build_default_soc_sampling_fn(seed=random_seed) sample_result = sample_vehicles( count=vehicle_count, sim=sim_with_stations, env=env, location_sampling_function=vehicle_location_sampling_function, soc_sampling_function=vehicle_soc_sampling_function, ) sim_w_vehicles = throw_or_return(sample_result) return sim_w_vehicles, env
[docs]def _build_bases( bases_file: str, simulation_state: SimulationState, ) -> SimulationState: """ all your base are belong to us :param bases_file: path to file with bases :param simulation_state: the partial simulation state :return: the simulation state with all bases in it :raises Exception if a parse error in Base.from_row or any error adding the Base to the Sim """ def _collect_base(row: Dict[str, str]) -> Base: base = Base.from_row(row, simulation_state.road_network) return base # add all bases from the base file with open(bases_file, "r", encoding="utf-8-sig") as bf: reader = csv.DictReader(bf) bases = [_collect_base(row) for row in reader] sim_with_bases = simulation_state_ops.add_entities(simulation_state, bases) return sim_with_bases
[docs]def _build_stations( stations_file: str, simulation_state: SimulationState, env: Environment ) -> SimulationState: """ all your station are belong to us :param stations_file: the file with stations in it :param simulation_state: the partial simulation state :return: the resulting simulation state with all stations in it :raises Exception if parsing a Station row failed or adding a Station to the Simulation failed """ def _add_row_unsafe( builder: immutables.Map[str, Station], row: Dict[str, str] ) -> immutables.Map[str, Station]: station = Station.from_row(row, builder, simulation_state.road_network, env) updated_builder = DictOps.add_to_dict(builder, station.id, station) return updated_builder # grab all stations (some may exist on multiple rows) with open(stations_file, "r", encoding="utf-8-sig") as bf: reader = csv.DictReader(bf) stations_builder: immutables.Map[str, Station] = ft.reduce( _add_row_unsafe, reader, immutables.Map() ) # add all stations to the simulation once we know they are complete stations = DictOps.iterate_vals(stations_builder) sim_with_stations = simulation_state_ops.add_entities(simulation_state, stations) return sim_with_stations