from __future__ import annotations
import logging
from dataclasses import dataclass, replace
from typing import Tuple, TYPE_CHECKING, Optional
from uuid import uuid4
import immutables
from nrel.hive.model.request import Request
from nrel.hive.model.roadnetwork.route import Route
from nrel.hive.model.sim_time import SimTime
from nrel.hive.model.vehicle.trip_phase import TripPhase
from nrel.hive.runner.environment import Environment
from nrel.hive.state.vehicle_state import servicing_ops
from nrel.hive.state.vehicle_state.idle import Idle
from nrel.hive.state.vehicle_state.servicing_ops import (
get_active_pooling_trip,
update_active_pooling_trip,
)
from nrel.hive.state.vehicle_state.vehicle_state import (
VehicleState,
VehicleStateInstanceId,
)
from nrel.hive.state.vehicle_state.vehicle_state_ops import move
from nrel.hive.state.vehicle_state.vehicle_state_type import VehicleStateType
from nrel.hive.util import SimulationStateError, TupleOps
from nrel.hive.util.typealiases import RequestId, VehicleId
if TYPE_CHECKING:
from nrel.hive.state.simulation_state.simulation_state import SimulationState
log = logging.getLogger(__name__)
[docs]@dataclass(frozen=True)
class ServicingPoolingTrip(VehicleState):
"""
a pooling trip is in service, for the given trips in the given trip_order.
"""
vehicle_id: VehicleId
trip_plan: Tuple[Tuple[RequestId, TripPhase], ...]
boarded_requests: immutables.Map[RequestId, Request]
departure_times: immutables.Map[RequestId, SimTime]
routes: Tuple[Route, ...]
num_passengers: int
instance_id: VehicleStateInstanceId
[docs] @classmethod
def build(
cls,
vehicle_id: VehicleId,
trip_plan: Tuple[Tuple[RequestId, TripPhase], ...],
boarded_requests: immutables.Map[RequestId, Request],
departure_times: immutables.Map[RequestId, SimTime],
routes: Tuple[Route, ...],
num_passengers: int,
) -> ServicingPoolingTrip:
return ServicingPoolingTrip(
vehicle_id=vehicle_id,
trip_plan=trip_plan,
boarded_requests=boarded_requests,
departure_times=departure_times,
routes=routes,
num_passengers=num_passengers,
instance_id=uuid4(),
)
@property
def vehicle_state_type(cls) -> VehicleStateType:
return VehicleStateType.SERVICING_POOLING_TRIP
@property
def route(cls) -> Route:
"""
makes this uniform with other "move" states to have a "route" property
:return:
"""
return cls.routes[0] if len(cls.routes) > 0 else ()
[docs] def update_route(self, route: Route) -> ServicingPoolingTrip:
tail = TupleOps.tail(self.routes)
updated_routes = TupleOps.prepend(route, tail)
return replace(self, routes=updated_routes)
[docs] def update(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
return VehicleState.default_update(sim, env, self)
[docs] def enter(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
"""
transition from DispatchTrip into a pooling trip service leg. this first frame of servicing
a pooling trip should be happening at the start location of the first request in the pool,
which should already be boarded (so, the first trip_phase is not to pick that request up).
:param sim: the simulation state
:param env: the simulation environment
:return: an error, or, the sim with state entered
"""
vehicle = sim.vehicles.get(self.vehicle_id)
first_trip_plan_step, remaining_trip_plan = TupleOps.head_tail(self.trip_plan)
first_req_id, first_trip_phase = first_trip_plan_step
first_req = sim.requests.get(first_req_id)
context = f"vehicle {self.vehicle_id} entering servicing pooling trip state"
if vehicle is None:
return (
SimulationStateError(f"vehicle note found; context: {context}"),
None,
)
if first_req is None:
return (
SimulationStateError(f"request {first_req_id} not found; context: {context}"),
None,
)
elif not vehicle.vehicle_state.vehicle_state_type == VehicleStateType.DISPATCH_POOLING_TRIP:
# the only supported transition into ServicingPoolingTrip comes from DispatchTrip
prev_state = vehicle.vehicle_state.__class__.__name__
msg = f"ServicingPoolingTrip called for vehicle {vehicle.id} but previous state ({prev_state}) is not DispatchTrip as required"
error = SimulationStateError(msg)
return error, None
elif len(self.trip_plan) == 0:
msg = f"vehicle {self.vehicle_id} attempting to enter a ServicingPoolingTrip state without any trip plan"
error = SimulationStateError(msg)
return error, None
else:
# pick up first request
pickup_error, pickup_sim = servicing_ops.pick_up_trip(
sim, env, self.vehicle_id, first_req_id
)
if pickup_error:
pool_pickup_error = SimulationStateError(
f"failed to pick up first trip in ServicingPoolingTrip {self}"
)
pool_pickup_error.__cause__ = pickup_error
return pool_pickup_error, None
elif pickup_sim is None:
return None, None
else:
# enter ServicingPoolingTrip state with first request boarded
vehicle_state_with_first_trip = replace(
self,
boarded_requests=immutables.Map({first_req_id: first_req}),
departure_times=immutables.Map({first_req_id: sim.sim_time}),
num_passengers=len(first_req.passengers),
trip_plan=remaining_trip_plan,
)
result = VehicleState.apply_new_vehicle_state(
pickup_sim, self.vehicle_id, vehicle_state_with_first_trip
)
return result
[docs] def exit(
self, next_state: VehicleState, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
"""
exit when there is no remaining trip_phase to complete
:param sim: the sim state
:param env: the sim environment
:return: None, None - cannot invoke "exit" on ServicingPoolingTrip
"""
if len(self.trip_plan) == 0:
return None, sim
elif next_state.vehicle_state_type == VehicleStateType.DISPATCH_POOLING_TRIP:
# a pooling replanning can interrupt a ServicingPoolingTrip in process
return None, sim
else:
return None, None
[docs] def _has_reached_terminal_state_condition(self, sim: SimulationState, env: Environment) -> bool:
"""
ignored: this should be handled in the update phase when the length of the final route is zero.
:param sim: the simulation state
:param env: the simulation environment
:return: true if our trip is done
"""
return len(self.trip_plan) == 0
[docs] def _default_terminal_state(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[VehicleState]]:
"""
give the default state to transition to after having met a terminal condition
:param sim: the simulation state
:param env: the simulation environment
:return: an exception due to failure or the next_state after finishing a task
"""
next_state = Idle.build(self.vehicle_id)
return None, next_state