Source code for nrel.hive.state.vehicle_state.servicing_ops

from __future__ import annotations

from dataclasses import replace
from typing import Optional, Tuple, TYPE_CHECKING, NamedTuple

from nrel.hive.model.request import Request
from nrel.hive.model.roadnetwork.route import Route
from nrel.hive.model.vehicle.trip_phase import TripPhase
from nrel.hive.model.vehicle.vehicle import Vehicle
from nrel.hive.reporting.vehicle_event_ops import (
    report_pickup_request,
    report_dropoff_request,
)
from nrel.hive.runner import Environment
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.state.simulation_state.simulation_state_ops import modify_vehicle
from nrel.hive.state.vehicle_state.vehicle_state_type import VehicleStateType
from nrel.hive.util import RequestId, TupleOps, SimulationStateError, VehicleId

if TYPE_CHECKING:
    from nrel.hive.state.vehicle_state.servicing_pooling_trip import ServicingPoolingTrip


[docs]class ActivePoolingTrip(NamedTuple): request_id: RequestId trip_phase: TripPhase route: Route
[docs]def get_active_pooling_trip( vehicle_state: ServicingPoolingTrip, ) -> Tuple[Optional[Exception], Optional[ActivePoolingTrip]]: """ helper to grab the currently-active trip :param vehicle_state: the servicing state to inspect :return: the active pooling trip we are servicing """ first_trip = TupleOps.head_optional(vehicle_state.trip_plan) first_route = TupleOps.head_optional(vehicle_state.routes) if first_trip is None: error = SimulationStateError( f"attempting to grab the active request in ServicingPoolingTrip for vehicle {vehicle_state.vehicle_id} but request plan is empty" ) return error, None elif first_route is None: error = SimulationStateError( f"attempting to grab the active request in ServicingPoolingTrip for vehicle {vehicle_state.vehicle_id} but route plan is empty" ) return error, None else: request_id, trip_phase = first_trip active_pooling_trip = ActivePoolingTrip(request_id, trip_phase, first_route) return None, active_pooling_trip
[docs]def complete_trip_phase( sim: SimulationState, env: Environment, vehicle: Vehicle, active_trip: ActivePoolingTrip, ) -> Tuple[Optional[Exception], Optional[SimulationState]]: """ performs the action associated with closing out a trip phase. invariant: the Vehicle must be in a ServicingPoolingTrip state! :param sim: the simulation state :param env: the simulation environment :param vehicle: the vehicle :param active_trip: the active pooling trip phase to close out :return: an error, or, the updated simulation state """ if not vehicle.vehicle_state.vehicle_state_type == VehicleStateType.SERVICING_POOLING_TRIP: return ( Exception( f"vehicle state is not ServicingPoolingTrip but {type(vehicle.vehicle_state)}" ), None, ) else: vehicle_state: ServicingPoolingTrip = vehicle.vehicle_state # type: ignore updated_trip_plan = TupleOps.tail(vehicle_state.trip_plan) updated_routes = TupleOps.tail(vehicle_state.routes) if active_trip.trip_phase == TripPhase.PICKUP: # perform pickup operation and update remaining route plan request = sim.requests.get(active_trip.request_id) if request is None: return ( SimulationStateError(f"request {active_trip.request_id} not found"), None, ) else: err2, sim2 = pick_up_trip(sim, env, vehicle.id, request.id) if err2 is not None: response = SimulationStateError( f"failure completing trip phase for vehicle {vehicle.id} during TripPhase.PICKUP" ) response.__cause__ = err2 return response, None elif sim2 is None: return None, None else: # add this request to the boarded vehicles updated_boarded_requests = vehicle_state.boarded_requests.set(request.id, request) updated_num_passengers = vehicle_state.num_passengers + len(request.passengers) updated_departure_times = vehicle_state.departure_times.set( request.id, sim.sim_time ) updated_vehicle_state = replace( vehicle_state, trip_plan=updated_trip_plan, routes=updated_routes, boarded_requests=updated_boarded_requests, num_passengers=updated_num_passengers, departure_times=updated_departure_times, ) updated_vehicle = vehicle.modify_vehicle_state(updated_vehicle_state) result = modify_vehicle(sim2, updated_vehicle) return result elif active_trip.trip_phase == TripPhase.DROPOFF: # perform dropoff operation and update remaining route plan request = vehicle_state.boarded_requests.get(active_trip.request_id) if request is None: error = SimulationStateError( f"request {active_trip.request_id} should have boarded pooling with vehicle {vehicle.id} but not found" ) return error, None else: # result = drop_off_trip(sim, env, vehicle.id, request) # return result err2, sim2 = drop_off_trip(sim, env, vehicle.id, request) if err2 is not None: response = SimulationStateError( f"failure completing trip phase for vehicle {vehicle.id} during TripPhase.DROPOFF" ) response.__cause__ = err2 return response, None elif sim2 is None: return None, None else: # remove this request from the boarded vehicles updated_boarded_requests = vehicle_state.boarded_requests.delete(request.id) updated_num_passengers = vehicle_state.num_passengers - len(request.passengers) updated_vehicle_state = replace( vehicle_state, trip_plan=updated_trip_plan, routes=updated_routes, boarded_requests=updated_boarded_requests, num_passengers=updated_num_passengers, ) updated_vehicle = vehicle.modify_vehicle_state(updated_vehicle_state) result = modify_vehicle(sim2, updated_vehicle) return result else: return ( SimulationStateError(f"invalid trip phase {active_trip.trip_phase}"), None, )
[docs]def update_active_pooling_trip( sim: SimulationState, env: Environment, vehicle_id: VehicleId, ) -> Tuple[Optional[Exception], Optional[SimulationState]]: """ helper to update the route of the leading trip when no route changes are required :param vehicle_state: :return: """ context = f"updating active pooling trip for vehicle {vehicle_id}" vehicle = sim.vehicles.get(vehicle_id) if vehicle is None: return ( SimulationStateError(f"vehicle not found; context: {context}"), None, ) if not vehicle.vehicle_state.vehicle_state_type == VehicleStateType.SERVICING_POOLING_TRIP: return Exception("can only update vehicles in ServicingPoolingTrip states"), None else: pooling_state: ServicingPoolingTrip = vehicle.vehicle_state # type: ignore current_route = pooling_state.route if len(current_route) > 0: # vehicle still on current route, noop return None, sim else: # we reached the end of a route, so, we need to perform whatever trip phase # action is required and then update the vehicle state accordingly err1, active_trip = get_active_pooling_trip(pooling_state) if err1 is not None: response = SimulationStateError( f"failure during update_active_pooling_trip for vehicle {vehicle.id}" ) response.__cause__ = err1 return response, None elif active_trip is None: return None, None else: result = complete_trip_phase(sim, env, vehicle, active_trip) return result
[docs]def pick_up_trip( sim: SimulationState, env: Environment, vehicle_id: VehicleId, request_id: RequestId, ) -> Tuple[Optional[Exception], Optional[SimulationState]]: """ has a vehicle pick up a trip and receive payment for it. :param sim: the sim state :param env: the sim environment :param vehicle_id: the vehicle picking up the request :param request_id: the request to pick up :return: an error, or, the sim with the request picked up by the vehicle """ vehicle = sim.vehicles.get(vehicle_id) request = sim.requests.get(request_id) context = f"vehicle {vehicle_id} pickup trip for request {request_id}" if not vehicle: return ( SimulationStateError(f"vehicle not found; context: {context}"), None, ) elif not request: return ( SimulationStateError(f"request not found; context: {context}"), None, ) else: updated_vehicle = vehicle.receive_payment(request.value) ( mod_error, maybe_sim_with_vehicle, ) = simulation_state_ops.modify_vehicle(sim, updated_vehicle) if mod_error: response = SimulationStateError(f"failure during pick_up_trip for vehicle {vehicle.id}") response.__cause__ = mod_error return response, None elif maybe_sim_with_vehicle is None: return None, None else: try: report = report_pickup_request(updated_vehicle, request, maybe_sim_with_vehicle) env.reporter.file_report(report) except: # previous state may not be DispatchTrip (may not have expected attributes) pass return simulation_state_ops.remove_request(maybe_sim_with_vehicle, request_id)
[docs]def drop_off_trip( sim: SimulationState, env: Environment, vehicle_id: VehicleId, request: Request, ) -> Tuple[Optional[Exception], Optional[SimulationState]]: """ handles the dropping off of passengers, which is really mostly a validation step followed by a logging step. :param sim: the sim state :param env: the sim environment :param vehicle_id: vehicle dropping off the trip :param trip: the trip to drop off :return: an exception due to failure or the simulation state """ vehicle = sim.vehicles.get(vehicle_id) context = f"vehicle {vehicle_id} dropoff trip for request {request.id}" if not vehicle: return ( SimulationStateError(f"vehicle not found; context: {context}"), None, ) else: # confirm each passenger has reached their destination for passenger in request.passengers: if passenger.destination != vehicle.geoid: locations = f"{passenger.destination} != {vehicle.geoid}" message = f"vehicle {vehicle_id} dropping off passenger {passenger.id} but location is wrong: {locations}" return SimulationStateError(message), None report = report_dropoff_request(vehicle, sim, request) env.reporter.file_report(report) return None, sim