Source code for nrel.hive.state.simulation_state.simulation_state

from __future__ import annotations

from typing import (
    NamedTuple,
    Optional,
    cast,
    Tuple,
    Callable,
    TYPE_CHECKING,
    FrozenSet,
)

import immutables

from nrel.hive.model.roadnetwork.haversine_roadnetwork import HaversineRoadNetwork
from nrel.hive.model.sim_time import SimTime
from nrel.hive.state.simulation_state.at_location_response import AtLocationResponse
from nrel.hive.util import geo
from nrel.hive.util.dict_ops import DictOps
from nrel.hive.util.typealiases import (
    RequestId,
    VehicleId,
    BaseId,
    StationId,
    GeoId,
)

if TYPE_CHECKING:
    from nrel.hive.model.roadnetwork.roadnetwork import RoadNetwork
    from nrel.hive.util.units import Seconds
    from nrel.hive.model.base import Base
    from nrel.hive.model.request import Request
    from nrel.hive.model.station.station import Station
    from nrel.hive.model.vehicle.vehicle import Vehicle
    from nrel.hive.dispatcher.instruction.instruction import Instruction


[docs]class SimulationState(NamedTuple): """ resolution of '11' is within 25 meters/82 feet. this decides how granular the simulation will operate for internal operations. the user can still specify their own level of granularity for https://uber.github.io/h3/#/documentation/core-library/resolution-table """ # road network representation road_network: RoadNetwork = HaversineRoadNetwork() # simulation parameters sim_time: SimTime = SimTime(0) sim_timestep_duration_seconds: Seconds = 60 sim_h3_location_resolution: int = 15 sim_h3_search_resolution: int = 7 # objects of the simulation. # note: if you need to iterate on these collections, prefer the get methods # provided below which ensure entities are properly sorted. stations: immutables.Map[StationId, Station] = immutables.Map() bases: immutables.Map[BaseId, Base] = immutables.Map() vehicles: immutables.Map[VehicleId, Vehicle] = immutables.Map() requests: immutables.Map[RequestId, Request] = immutables.Map() # the instructions applied in the most recent state transition applied_instructions: immutables.Map[VehicleId, Instruction] = immutables.Map() # location collections - the lowest-level spatial representation in Hive v_locations: immutables.Map[GeoId, FrozenSet[VehicleId]] = immutables.Map() r_locations: immutables.Map[GeoId, FrozenSet[RequestId]] = immutables.Map() s_locations: immutables.Map[GeoId, FrozenSet[StationId]] = immutables.Map() b_locations: immutables.Map[GeoId, FrozenSet[StationId]] = immutables.Map() # search collections - a higher-level spatial representation used for ring search v_search: immutables.Map[GeoId, FrozenSet[VehicleId]] = immutables.Map() r_search: immutables.Map[GeoId, FrozenSet[RequestId]] = immutables.Map() s_search: immutables.Map[GeoId, FrozenSet[StationId]] = immutables.Map() b_search: immutables.Map[GeoId, FrozenSet[BaseId]] = immutables.Map()
[docs] def get_station_ids(self) -> Tuple[StationId, ...]: return tuple(sorted(self.stations.keys()))
[docs] def get_vehicle_ids(self) -> Tuple[VehicleId, ...]: return tuple(sorted(self.vehicles.keys()))
[docs] def get_base_ids(self) -> Tuple[BaseId, ...]: return tuple(sorted(self.bases.keys()))
[docs] def get_request_ids(self) -> Tuple[RequestId, ...]: return tuple(sorted(self.requests.keys()))
[docs] def get_stations( self, filter_function: Optional[Callable[[Station], bool]] = None, sort_key: Optional[Callable] = None, ) -> Tuple[Station, ...]: return DictOps.iterate_sim_coll(self.stations, filter_function, sort_key)
[docs] def get_bases( self, filter_function: Optional[Callable[[Base], bool]] = None, sort_key: Optional[Callable] = None, ) -> Tuple[Base, ...]: return DictOps.iterate_sim_coll(self.bases, filter_function, sort_key)
[docs] def get_vehicles( self, filter_function: Optional[Callable[[Vehicle], bool]] = None, sort_key: Optional[Callable] = None, ) -> Tuple[Vehicle, ...]: return DictOps.iterate_sim_coll(self.vehicles, filter_function, sort_key)
[docs] def get_requests( self, filter_function: Optional[Callable[[Request], bool]] = None, sort_key: Optional[Callable] = None, ) -> Tuple[Request, ...]: return DictOps.iterate_sim_coll(self.requests, filter_function, sort_key)
[docs] def at_geoid(self, geoid: GeoId) -> AtLocationResponse: """ returns a dictionary with the list of ids found at this location for all entities :deprecated: no longer used :param geoid: geoid to look up, should be at the self.sim_h3_location_resolution :return: an Optional AtLocationResponse """ vehicles = self.v_locations[geoid] if geoid in self.v_locations else frozenset() requests = self.r_locations[geoid] if geoid in self.r_locations else frozenset() station = self.s_locations[geoid] if geoid in self.s_locations else frozenset() base = self.b_locations[geoid] if geoid in self.b_locations else frozenset() result = cast( AtLocationResponse, { "vehicles": vehicles, "requests": requests, "station": station, "base": base, }, ) return result
[docs] def vehicle_at_request( self, vehicle_id: VehicleId, request_id: RequestId, override_resolution: Optional[int] = None, ) -> bool: """ tests whether vehicle is at the request within the scope of the given geoid resolution :param vehicle_id: the vehicle we are testing for proximity to a request :param request_id: the request we are testing for proximity to a vehicle :param override_resolution: a resolution to use for geo intersection test; if None, use self.sim_h3_location_resolution :return: bool """ vehicle = self.vehicles.get(vehicle_id) request = self.requests.get(request_id) if not vehicle or not request: return False else: return geo.same_simulation_location( vehicle.geoid, request.origin, self.sim_h3_location_resolution, override_resolution, )
[docs] def vehicle_at_station( self, vehicle_id: VehicleId, station_id: StationId, override_resolution: Optional[int] = None, ) -> bool: """ tests whether vehicle is at the station within the scope of the given geoid resolution :param vehicle_id: the vehicle we are testing for proximity to a request :param station_id: the station we are testing for proximity to a vehicle :param override_resolution: a resolution to use for geo intersection test; if None, use self.sim_h3_location_resolution :return: bool """ vehicle = self.vehicles.get(vehicle_id) station = self.stations.get(station_id) if not vehicle or not station: return False else: return geo.same_simulation_location( vehicle.geoid, station.geoid, self.sim_h3_location_resolution, override_resolution, )
[docs] def vehicle_at_base( self, vehicle_id: VehicleId, base_id: BaseId, override_resolution: Optional[int] = None, ) -> bool: """ tests whether vehicle is at the request within the scope of the given geoid resolution :param vehicle_id: the vehicle we are testing for proximity to a request :param base_id: the base we are testing for proximity to a vehicle :param override_resolution: a resolution to use for geo intersection test; if None, use self.sim_h3_location_resolution :return: bool """ vehicle = self.vehicles.get(vehicle_id) base = self.bases.get(base_id) if not vehicle or not base: return False else: return geo.same_simulation_location( vehicle.geoid, base.geoid, self.sim_h3_location_resolution, override_resolution, )