from __future__ import annotations
from typing import Iterable, Optional, TYPE_CHECKING, Tuple
import h3
from returns.result import Success, Failure, ResultE
from nrel.hive.model.sim_time import SimTime
from nrel.hive.util.dict_ops import DictOps
from nrel.hive.util.exception import SimulationStateError
from nrel.hive.util.fp import apply_op_to_accumulator, throw_or_return
from nrel.hive.util.typealiases import RequestId, StationId, VehicleId, BaseId
if TYPE_CHECKING:
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.model.entity import Entity
from nrel.hive.model.base import Base
from nrel.hive.model.request import Request
from nrel.hive.model.station.station import Station
from nrel.hive.model.vehicle.vehicle import Vehicle
"""
a collection of operations to modify the SimulationState which are not
intended to be exposed to HIVE users
"""
def tick(sim: SimulationState) -> SimulationState:
    """
    move the simulation clock forward by one timestep

    :param sim: the simulation state
    :return: the simulation state with sim_time advanced by sim_timestep_duration_seconds
    """
    next_time = sim.sim_time + sim.sim_timestep_duration_seconds
    return sim._replace(sim_time=next_time)
def add_entity(sim: SimulationState, entity: Entity) -> SimulationState:
    """
    add one entity to the simulation, surfacing any failure as a raised error

    :param sim: the simulation state
    :param entity: the entity to add
    :return: the updated simulation state
    :raises: an error if the entity cannot be added
    """
    outcome = add_entity_safe(sim, entity)
    return throw_or_return(outcome)
def modify_entity(sim: SimulationState, entity: Entity) -> SimulationState:
    """
    modify one entity in the simulation, surfacing any failure as a raised error

    :param sim: the simulation state
    :param entity: the entity to modify
    :return: the updated simulation state
    :raises: an error if the entity cannot be modified
    """
    outcome = modify_entity_safe(sim, entity)
    return throw_or_return(outcome)
def add_entities(sim: SimulationState, entities: Iterable[Entity]) -> SimulationState:
    """
    add several entities to the simulation, surfacing any failure as a raised error

    :param sim: the simulation state
    :param entities: the entities to add
    :return: the updated simulation state
    :raises: an error if any of the entities cannot be added
    """
    outcome = add_entities_safe(sim, entities)
    return throw_or_return(outcome)
def modify_entities(sim: SimulationState, entities: Iterable[Entity]) -> SimulationState:
    """
    modify several entities in the simulation, surfacing any failure as a raised error

    :param sim: the simulation state
    :param entities: the entities to modify
    :return: the updated simulation state
    :raises: an error if any of the entities cannot be modified
    """
    outcome = modify_entities_safe(sim, entities)
    return throw_or_return(outcome)
def add_entity_safe(sim: SimulationState, entity: Entity) -> ResultE[SimulationState]:
    """
    route a general entity to the add operation that matches its class name

    :param sim: the simulation state
    :param entity: the entity to add; its class must be named Vehicle, Station, Base or Request
    :return: the result of the type-specific add operation, or an error if the
             entity's class name is not one of the supported names
    """
    # the entity classes are only imported under TYPE_CHECKING in this module,
    # so dispatch is keyed on the exact class name rather than isinstance checks
    handlers = {
        "Vehicle": add_vehicle_safe,
        "Station": add_station_safe,
        "Base": add_base_safe,
        "Request": add_request_safe,
    }
    handler = handlers.get(entity.__class__.__name__)
    if handler is None:
        return Failure(SimulationStateError(f"cannot add entity {entity} to simulation"))
    return handler(sim, entity)  # type: ignore
def add_entities_safe(sim: SimulationState, entities: Iterable[Entity]) -> ResultE[SimulationState]:
    """
    helper for adding multiple general entities to the simulation

    :param sim: the simulation state
    :param entities: the entities to add
    :return: the updated simulation state or an error
    """

    def _step_for(entity: Entity):
        # builds the state -> result step that adds this one entity
        return lambda sim: add_entity_safe(sim, entity)

    return apply_op_to_accumulator(_step_for, entities, sim)
def modify_entity_safe(sim: SimulationState, entity: Entity) -> ResultE[SimulationState]:
    """
    route a general entity to the modify operation that matches its class name

    :param sim: the simulation state
    :param entity: the entity to modify; its class must be named Vehicle, Station, Base or Request
    :return: the result of the type-specific modify operation, or an error if the
             entity's class name is not one of the supported names
    """
    # the entity classes are only imported under TYPE_CHECKING in this module,
    # so dispatch is keyed on the exact class name rather than isinstance checks
    handlers = {
        "Vehicle": modify_vehicle_safe,
        "Station": modify_station_safe,
        "Base": modify_base_safe,
        "Request": modify_request_safe,
    }
    handler = handlers.get(entity.__class__.__name__)
    if handler is None:
        return Failure(SimulationStateError(f"cannot modify entity {entity} to simulation"))
    return handler(sim, entity)  # type: ignore
def modify_entities_safe(
    sim: SimulationState, entities: Iterable[Entity]
) -> ResultE[SimulationState]:
    """
    helper for modifying multiple general entities in the simulation

    :param sim: the simulation state
    :param entities: the entities to modify
    :return: the updated simulation state or an error
    """

    def _step_for(entity: Entity):
        # builds the state -> result step that modifies this one entity
        return lambda sim: modify_entity_safe(sim, entity)

    return apply_op_to_accumulator(_step_for, entities, sim)
def add_request_safe(sim: SimulationState, request: Request) -> ResultE[SimulationState]:
    """
    register a new request with the simulation

    the request is stored by id, indexed by its geoid, and indexed by the parent
    hex of that geoid at the simulation's search resolution.

    :param sim: the simulation state
    :param request: the request to add
    :return: the updated simulation state, or an error if the request origin is
             not within the road network geofence
    """
    # only the origin is validated against the geofence in this function
    if not sim.road_network.geoid_within_geofence(request.origin):
        return Failure(
            SimulationStateError(f"origin {request.origin} not within road network geofence")
        )

    parent_hex = h3.h3_to_parent(request.geoid, sim.sim_h3_search_resolution)
    requests = DictOps.add_to_dict(sim.requests, request.id, request)
    r_locations = DictOps.add_to_collection_dict(sim.r_locations, request.geoid, request.id)
    r_search = DictOps.add_to_collection_dict(sim.r_search, parent_hex, request.id)
    return Success(sim._replace(requests=requests, r_locations=r_locations, r_search=r_search))
def remove_request_safe(sim: SimulationState, request_id: RequestId) -> ResultE[SimulationState]:
    """
    removes a request from this simulation.
    called once a Request has been fully serviced and is no longer
    alive in the simulation.

    :param sim: the simulation state
    :param request_id: id of the request to delete
    :return: the updated simulation state, or an error if the request is not
             in the simulation
    """
    request = sim.requests.get(request_id)
    if request is None:
        error = SimulationStateError(
            f"attempting to remove request {request_id} which is not in simulation"
        )
        return Failure(error)

    # the request must be dropped from the entity table and from both spatial
    # indices (exact geoid and parent hex at the search resolution)
    search_geoid = h3.h3_to_parent(request.geoid, sim.sim_h3_search_resolution)
    updated_requests = DictOps.remove_from_dict(sim.requests, request.id)
    updated_r_locations = DictOps.remove_from_collection_dict(
        sim.r_locations, request.geoid, request.id
    )
    updated_r_search = DictOps.remove_from_collection_dict(
        sim.r_search, search_geoid, request.id
    )
    updated_sim = sim._replace(
        requests=updated_requests,
        r_locations=updated_r_locations,
        r_search=updated_r_search,
    )
    return Success(updated_sim)
def remove_request(
    sim: SimulationState, request_id: RequestId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of remove_request_safe

    :param sim: the simulation state
    :param request_id: id of the request to delete
    :return: (error, None) if the removal failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = remove_request_safe(sim, request_id)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def modify_request_safe(sim: SimulationState, updated_request: Request) -> ResultE[SimulationState]:
    """
    replace a request already present in the simulation with an updated version

    :param sim: the simulation state
    :param updated_request: the revised request data
    :return: the updated simulation, or an error if the request is not already
             in the simulation or its origin/destination is outside the
             road network geofence
    """
    if not sim.requests.get(updated_request.id):
        return Failure(
            SimulationStateError(
                f"cannot update request {updated_request.id}, it was not already in the sim"
            )
        )

    within_geofence = sim.road_network.geoid_within_geofence
    if not within_geofence(updated_request.origin):
        return Failure(
            SimulationStateError(
                f"cannot modify request {updated_request.id}: origin not within road network"
            )
        )
    if not within_geofence(updated_request.destination):
        return Failure(
            SimulationStateError(
                f"cannot modify request {updated_request.id}: destination not within road network"
            )
        )

    dictionaries = DictOps.update_entity_dictionaries(
        updated_request,
        sim.requests,
        sim.r_locations,
        sim.r_search,
        sim.sim_h3_search_resolution,
    )
    # any falsy field in the helper's result falls back to the existing collection
    return Success(
        sim._replace(
            requests=dictionaries.entities or sim.requests,  # type: ignore
            r_locations=dictionaries.locations or sim.r_locations,
            r_search=dictionaries.search or sim.r_search,
        )
    )
def modify_request(
    sim: SimulationState, updated_request: Request
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of modify_request_safe

    :param sim: the simulation state
    :param updated_request: the revised request data
    :return: (error, None) if the update failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = modify_request_safe(sim, updated_request)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def add_vehicle_safe(sim: SimulationState, vehicle: Vehicle) -> ResultE[SimulationState]:
    """
    place a vehicle into the simulation

    the vehicle is stored by id, indexed by its geoid, and indexed by the parent
    hex of that geoid at the simulation's search resolution.

    :param sim: the simulation state
    :param vehicle: a vehicle
    :return: updated SimulationState, or a SimulationStateError if the vehicle
             is not within the road network geofence
    """
    if not sim.road_network.geoid_within_geofence(vehicle.geoid):
        return Failure(
            SimulationStateError(
                f"cannot add vehicle {vehicle.id} to sim: not within road network geofence"
            )
        )

    parent_hex = h3.h3_to_parent(vehicle.geoid, sim.sim_h3_search_resolution)
    v_locations = DictOps.add_to_collection_dict(sim.v_locations, vehicle.geoid, vehicle.id)
    v_search = DictOps.add_to_collection_dict(sim.v_search, parent_hex, vehicle.id)
    vehicles = DictOps.add_to_dict(sim.vehicles, vehicle.id, vehicle)
    return Success(sim._replace(vehicles=vehicles, v_locations=v_locations, v_search=v_search))
def modify_vehicle_safe(sim: SimulationState, updated_vehicle: Vehicle) -> ResultE[SimulationState]:
    """
    given an updated vehicle, update the SimulationState with that vehicle

    :param sim: the simulation state
    :param updated_vehicle: the vehicle after calling a transition function and .step()
    :return: the updated simulation, or an error if the vehicle is not already
             in the simulation or its new geoid is outside the road network geofence
    """
    # TODO: since the geofence is made up of hexes, it is possible to exit the geofence mid route when
    #  traveling between two protruding hexes. I think we can allow this since we guarantee that a request
    #  o-d pair will always be within the geofence.
    vehicle = sim.vehicles.get(updated_vehicle.id)
    if vehicle is None:
        error = SimulationStateError(
            f"cannot update vehicle {updated_vehicle.id}, it was not already in the sim"
        )
        return Failure(error)

    if not sim.road_network.geoid_within_geofence(updated_vehicle.geoid):
        # this is the modify path, so the message names that operation
        # (the previous text was copied from the add path)
        error = SimulationStateError(
            f"cannot modify vehicle {updated_vehicle.id}: not within road network"
        )
        return Failure(error)

    updated_dictionaries = DictOps.update_entity_dictionaries(
        updated_vehicle,
        sim.vehicles,
        sim.v_locations,
        sim.v_search,
        sim.sim_h3_search_resolution,
    )
    # any falsy field in the helper's result falls back to the existing collection
    updated_sim = sim._replace(
        vehicles=updated_dictionaries.entities or sim.vehicles,  # type: ignore
        v_locations=updated_dictionaries.locations or sim.v_locations,
        v_search=updated_dictionaries.search or sim.v_search,
    )
    return Success(updated_sim)
def modify_vehicle(
    sim: SimulationState, updated_vehicle: Vehicle
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of modify_vehicle_safe

    :param sim: the simulation state
    :param updated_vehicle: the revised vehicle data
    :return: (error, None) if the update failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = modify_vehicle_safe(sim, updated_vehicle)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def remove_vehicle_safe(sim: SimulationState, vehicle_id: VehicleId) -> ResultE[SimulationState]:
    """
    take a vehicle out of play (perhaps to simulate a broken vehicle or end of a shift)

    :param sim: the simulation state
    :param vehicle_id: the id of the vehicle
    :return: the updated simulation state, or an error if vehicle_id is not a
             VehicleId or no such vehicle is in the simulation
    """
    if not isinstance(vehicle_id, VehicleId):
        return Failure(
            SimulationStateError(
                f"remove_vehicle() takes a VehicleId (str), not a {type(vehicle_id)}"
            )
        )
    if vehicle_id not in sim.vehicles:
        return Failure(
            SimulationStateError(
                f"attempting to remove vehicle {vehicle_id} which is not in simulation"
            )
        )

    target = sim.vehicles[vehicle_id]
    parent_hex = h3.h3_to_parent(target.geoid, sim.sim_h3_search_resolution)
    vehicles = DictOps.remove_from_dict(sim.vehicles, vehicle_id)
    v_locations = DictOps.remove_from_collection_dict(sim.v_locations, target.geoid, vehicle_id)
    v_search = DictOps.remove_from_collection_dict(sim.v_search, parent_hex, vehicle_id)
    return Success(sim._replace(vehicles=vehicles, v_locations=v_locations, v_search=v_search))
def remove_vehicle(
    sim: SimulationState, vehicle_id: VehicleId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of remove_vehicle_safe

    :param sim: the simulation state
    :param vehicle_id: the id of the vehicle to remove
    :return: (error, None) if the removal failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = remove_vehicle_safe(sim, vehicle_id)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def pop_vehicle_safe(
    sim: SimulationState, vehicle_id: VehicleId
) -> ResultE[Tuple[SimulationState, Vehicle]]:
    """
    remove a vehicle from this SimulationState and hand the removed vehicle back
    together with the updated state.
    supports shipping this vehicle to another cluster node.

    :param sim: the simulation state
    :param vehicle_id: the id of the vehicle to pop
    :return: either a Tuple containing the updated state and the vehicle, or, an error
    """
    vehicle = sim.vehicles.get(vehicle_id)
    if not vehicle:
        return Failure(
            SimulationStateError(
                f"attempting to pop vehicle {vehicle_id} which is not in simulation"
            )
        )

    removal = remove_vehicle_safe(sim, vehicle_id)
    if not isinstance(removal, Failure):
        return Success((removal.unwrap(), vehicle))

    # keep the underlying removal error attached as the cause of the pop error
    wrapped = SimulationStateError(f"failure in pop_vehicle for vehicle {vehicle_id}")
    wrapped.__cause__ = removal.failure()
    return Failure(wrapped)
def pop_vehicle(
    sim: SimulationState, vehicle_id: VehicleId
) -> Tuple[Optional[Exception], Optional[Tuple[SimulationState, Vehicle]]]:
    """
    tuple-returning variant of pop_vehicle_safe

    :param sim: the simulation state
    :param vehicle_id: the id of the vehicle to pop
    :return: (error, None) if the pop failed, otherwise
             (None, (updated simulation state, popped vehicle))
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = pop_vehicle_safe(sim, vehicle_id)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def add_station_safe(sim: SimulationState, station: Station) -> ResultE[SimulationState]:
    """
    place a station into the simulation

    the station is stored by id, indexed by its geoid, and indexed by the parent
    hex of that geoid at the simulation's search resolution.

    :param sim: the simulation state
    :param station: the station to add
    :return: the updated SimulationState, or a SimulationStateError if the
             station is not within the road network geofence
    """
    if not sim.road_network.geoid_within_geofence(station.geoid):
        return Failure(
            SimulationStateError(
                f"cannot add station {station.id} to sim: not within road network geofence"
            )
        )

    parent_hex = h3.h3_to_parent(station.geoid, sim.sim_h3_search_resolution)
    s_locations = DictOps.add_to_collection_dict(sim.s_locations, station.geoid, station.id)
    s_search = DictOps.add_to_collection_dict(sim.s_search, parent_hex, station.id)
    stations = DictOps.add_to_dict(sim.stations, station.id, station)
    return Success(sim._replace(stations=stations, s_locations=s_locations, s_search=s_search))
def remove_station_safe(sim: SimulationState, station_id: StationId) -> ResultE[SimulationState]:
    """
    take a station out of the simulation. maybe they closed due to inclement weather.

    :param sim: the simulation state
    :param station_id: the id of the station to remove
    :return: the updated simulation state, or an error if the station does not exist
    """
    station = sim.stations.get(station_id)
    if not station:
        return Failure(
            SimulationStateError(f"cannot remove station {station_id}, it does not exist")
        )

    parent_hex = h3.h3_to_parent(station.geoid, sim.sim_h3_search_resolution)
    s_locations = DictOps.remove_from_collection_dict(sim.s_locations, station.geoid, station_id)
    s_search = DictOps.remove_from_collection_dict(sim.s_search, parent_hex, station_id)
    stations = DictOps.remove_from_dict(sim.stations, station_id)
    return Success(sim._replace(stations=stations, s_locations=s_locations, s_search=s_search))
def remove_station(
    sim: SimulationState, station_id: StationId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of remove_station_safe

    :param sim: the simulation state
    :param station_id: the id of the station to remove
    :return: (error, None) if the removal failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = remove_station_safe(sim, station_id)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def modify_station_safe(sim: SimulationState, updated_station: Station) -> ResultE[SimulationState]:
    """
    given an updated station, update the SimulationState with that station
    invariant: station locations will not be changed!

    :param sim: the simulation state
    :param updated_station: the revised station data
    :return: the updated simulation, or an error if the station is not already
             in the simulation, attempts to change its geoid, or is outside the
             road network geofence
    """
    station = sim.stations.get(updated_station.id)
    if station is None:
        error = SimulationStateError(
            f"cannot update station {updated_station.id}, it was not already in the sim"
        )
        return Failure(error)

    if station.geoid != updated_station.geoid:
        msg = f"station {station.id} attempting to move from {station.geoid} to {updated_station.geoid}, which is not permitted"
        return Failure(SimulationStateError(msg))

    if not sim.road_network.geoid_within_geofence(updated_station.geoid):
        # this is the modify path, so the message names that operation
        # (the previous text was copied from the add path)
        error = SimulationStateError(
            f"cannot modify station {updated_station.id}: not within road network geofence"
        )
        return Failure(error)

    # the geoid is unchanged, so only the entity table needs to be rewritten;
    # the location and search indices are left as they are
    updated_sim = sim._replace(
        stations=DictOps.add_to_dict(sim.stations, updated_station.id, updated_station)
    )
    return Success(updated_sim)
def modify_station(
    sim: SimulationState, updated_station: Station
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of modify_station_safe

    :param sim: the simulation state
    :param updated_station: the revised station data
    :return: (error, None) if the update failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = modify_station_safe(sim, updated_station)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def add_base_safe(sim: SimulationState, base: Base) -> ResultE[SimulationState]:
    """
    place a base into the simulation

    the base is stored by id, indexed by its geoid, and indexed by the parent
    hex of that geoid at the simulation's search resolution.

    :param sim: the simulation state
    :param base: the base to add
    :return: the updated SimulationState, or a SimulationStateError if the base
             is not within the road network geofence
    """
    if not sim.road_network.geoid_within_geofence(base.geoid):
        return Failure(
            SimulationStateError(
                f"cannot add base {base.id} to sim: not within road network geofence"
            )
        )

    parent_hex = h3.h3_to_parent(base.geoid, sim.sim_h3_search_resolution)
    b_locations = DictOps.add_to_collection_dict(sim.b_locations, base.geoid, base.id)
    b_search = DictOps.add_to_collection_dict(sim.b_search, parent_hex, base.id)
    bases = DictOps.add_to_dict(sim.bases, base.id, base)
    return Success(sim._replace(bases=bases, b_locations=b_locations, b_search=b_search))
def remove_base_safe(sim: SimulationState, base_id: BaseId) -> ResultE[SimulationState]:
    """
    take a base out of the simulation. all your base belong to us.

    :param sim: the simulation state
    :param base_id: the id of the base to remove
    :return: the updated simulation state, or an error if the base does not exist
    """
    base = sim.bases.get(base_id)
    if not base:
        return Failure(SimulationStateError(f"cannot remove base {base_id}, it does not exist"))

    parent_hex = h3.h3_to_parent(base.geoid, sim.sim_h3_search_resolution)
    b_locations = DictOps.remove_from_collection_dict(sim.b_locations, base.geoid, base_id)
    b_search = DictOps.remove_from_collection_dict(sim.b_search, parent_hex, base_id)
    bases = DictOps.remove_from_dict(sim.bases, base_id)
    return Success(sim._replace(bases=bases, b_locations=b_locations, b_search=b_search))
def remove_base(
    sim: SimulationState, base_id: BaseId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of remove_base_safe

    :param sim: the simulation state
    :param base_id: the id of the base to remove
    :return: (error, None) if the removal failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = remove_base_safe(sim, base_id)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def modify_base_safe(sim: SimulationState, updated_base: Base) -> ResultE[SimulationState]:
    """
    given an updated base, update the SimulationState with that base
    invariant: base locations will not be changed!

    :param sim: the simulation state
    :param updated_base: the revised base data
    :return: the updated simulation, or an error if the base is not already in
             the simulation, attempts to change its geoid, or is outside the
             road network geofence
    """
    base = sim.bases.get(updated_base.id)
    if base is None:
        error = SimulationStateError(
            f"cannot update base {updated_base.id}, it was not already in the sim"
        )
        return Failure(error)

    if base.geoid != updated_base.geoid:
        msg = f"base {base.id} attempting to move from {base.geoid} to {updated_base.geoid}, which is not permitted"
        return Failure(SimulationStateError(msg))

    if not sim.road_network.geoid_within_geofence(updated_base.geoid):
        # this is the modify path, so the message names that operation
        # (the previous text was copied from the add path)
        error = SimulationStateError(
            f"cannot modify base {updated_base.id}: not within road network geofence"
        )
        return Failure(error)

    # the geoid is unchanged, so only the entity table needs to be rewritten;
    # the location and search indices are left as they are
    updated_sim = sim._replace(
        bases=DictOps.add_to_dict(sim.bases, updated_base.id, updated_base)
    )
    return Success(updated_sim)
def modify_base(
    sim: SimulationState, updated_base: Base
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    tuple-returning variant of modify_base_safe

    :param sim: the simulation state
    :param updated_base: the revised base data
    :return: (error, None) if the update failed, otherwise (None, updated simulation state)
    """
    # TODO: refactor usage of this function to be the safe version
    outcome = modify_base_safe(sim, updated_base)
    if not isinstance(outcome, Failure):
        return None, outcome.unwrap()
    return outcome.failure(), None
def update_road_network(sim: SimulationState, sim_time: SimTime) -> SimulationState:
    """
    refresh the road network model for the given sim time

    :param sim: the simulation state
    :param sim_time: the current sim time
    :return: the simulation state carrying the road network returned by
             road_network.update(sim_time)
    """
    refreshed_network = sim.road_network.update(sim_time)
    return sim._replace(road_network=refreshed_network)