Source code for nrel.hive.initialization.sample_vehicles

import functools as ft
import logging
import random
from typing import Callable

from returns.result import Result, Failure, Success

from nrel.hive.model.entity_position import EntityPosition
from nrel.hive.model.membership import Membership
from nrel.hive.model.roadnetwork.link import Link
from nrel.hive.model.roadnetwork.osm.osm_roadnetwork import OSMRoadNetwork
from nrel.hive.model.vehicle.vehicle import Vehicle
from nrel.hive.runner import Environment
from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_available import (
    AutonomousAvailable,
)
from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_driver_attributes import (
    AutonomousDriverAttributes,
)
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.state.simulation_state.simulation_state_ops import add_vehicle_safe
from nrel.hive.state.vehicle_state.idle import Idle
from nrel.hive.util import Ratio

log = logging.getLogger(__name__)


[docs]def sample_vehicles( count: int, sim: SimulationState, env: Environment, location_sampling_function: Callable[[SimulationState], Link], soc_sampling_function: Callable[[], Ratio], total_seats: int = 4, offset: int = 0, ) -> Result[SimulationState, Exception]: """ creates {count} vehicles using the provided sampling functions :param count: :param sim: :param env: :param location_sampling_function: samples the initial location for a vehicle :param soc_sampling_function: samples the initial state of charge for a vehicle :param offset: number to begin counting vehicle ids from (by default, begin counting from zero) :return: the updated setup, or, a failure """ mechatronics_id = random.choice(list(env.mechatronics.keys())) mechatronics = env.mechatronics.get(mechatronics_id) if not mechatronics: return Failure(KeyError(f"mechatronics with id {mechatronics_id} not found")) else: def _add_sample(i: int): """ returns a function which creates the i'th vehicle :param i: the number to associate with this sampled vehicle :return: a function that adds vehicle i to the SimulationState """ def _inner( s: SimulationState, ) -> Result[SimulationState, Exception]: """ attempts to add the i'th vehicle to this simulation state :param s: the SimulationState to update :return: the updated simulation state, or, an exception """ if not mechatronics: return Failure(KeyError(f"mechatronics with id {mechatronics_id} not found")) try: vehicle_id = f"v{i}" initial_soc = soc_sampling_function() energy = mechatronics.initial_energy(initial_soc) energy_expended = mechatronics.initial_energy(0) energy_gained = mechatronics.initial_energy(0) link = location_sampling_function(s) position = EntityPosition(link.link_id, link.start) vehicle_state = Idle.build(vehicle_id) driver_state = AutonomousAvailable(AutonomousDriverAttributes(vehicle_id)) vehicle = Vehicle( id=vehicle_id, mechatronics_id=mechatronics_id, energy=energy, energy_expended=energy_expended, energy_gained=energy_gained, position=position, vehicle_state=vehicle_state, driver_state=driver_state, total_seats=total_seats, membership=Membership(), ) add_result = add_vehicle_safe(s, vehicle) return add_result except Exception as e: return Failure(e) return _inner log.info( f"sampling vehicles {offset} through {offset + count - 1} ({count} vehicles) with mechatronics id {mechatronics_id}" ) # sample i vehicles, adding each to the sim # fail fast if an exception is encountered result: Result[SimulationState, Exception] = ft.reduce( lambda acc, i: acc.bind(_add_sample(i)), range(offset, offset + count), Success(sim), ) return result
[docs]def build_default_location_sampling_fn(seed: int = 0) -> Callable[[SimulationState], Link]: """ constructs a link sampling function that uniformly samples from the provided base locations :param bases: the available bases :param seed: random seed value :return: a link """ random.seed(seed) def _inner(sim: SimulationState) -> Link: if not isinstance(sim.road_network, OSMRoadNetwork): raise NotImplementedError( f"this sampling function is only implemented for the OSMRoadNetwork" ) if sim.road_network.link_helper is None: raise Exception("Expected link helper on OSMRoadNetwork but found None") links = list(sim.road_network.link_helper.links.values()) if len(links) == 0: raise AssertionError(f"must have at least one link to sample from") random_link = random.choice(links) return random_link return _inner
[docs]def build_default_soc_sampling_fn( lower_bound: Ratio = 1.0, upper_bound: Ratio = 1.0, seed: int = 0 ) -> Callable[[], Ratio]: """ constructs an SoC sampling function that uniformly samples between a lower and upper bound value :param lower_bound: the lower bound to sample from :param upper_bound: the upper bound to sample from :param seed: random seed value :return: an SoC value """ assert lower_bound <= upper_bound, ArithmeticError( f"lower bound {lower_bound} must be less than or equal to upper bound {upper_bound}" ) assert lower_bound >= 0, ArithmeticError( f"lower bound {lower_bound} must be in the range [0, 1]" ) assert lower_bound <= 1, ArithmeticError( f"lower bound {lower_bound} must be in the range [0, 1]" ) assert upper_bound >= 0, ArithmeticError( f"upper bound {upper_bound} must be in the range [0, 1]" ) assert upper_bound <= 1, ArithmeticError( f"upper bound {upper_bound} must be in the range [0, 1]" ) random.seed(seed) def _inner() -> Ratio: sampled_soc = random.uniform(lower_bound, upper_bound) return sampled_soc return _inner