Source code for nrel.hive.dispatcher.instruction.instruction_ops

from __future__ import annotations

import functools as ft
import logging
from typing import Tuple, Optional, FrozenSet, TYPE_CHECKING

from nrel.hive.model.vehicle.trip_phase import TripPhase
from nrel.hive.model.vehicle.vehicle import RequestId

if TYPE_CHECKING:
    from nrel.hive.state.simulation_state.simulation_state import SimulationState
    from nrel.hive.state.vehicle_state.servicing_pooling_trip import ServicingPoolingTrip

log = logging.getLogger(__name__)


def trip_plan_covers_previous(
    previous_state: ServicingPoolingTrip,
    new_trip_plan: Tuple[Tuple[RequestId, TripPhase], ...],
) -> bool:
    """
    checks that the incoming trip plan covers the previous plan, i.e. that every
    request id appearing in the previous state's trip plan also appears somewhere
    in the new trip plan.

    only request ids are compared; trip phases and step ordering are ignored here.

    :param previous_state: previous pooling trip state
    :param new_trip_plan: the proposed trip plan
    :return: True, if the new plan mentions every request id of the previous plan
             (trivially True when the previous plan is empty); False if at least
             one previous request id is missing from the new plan
    """
    prev_req_ids = {plan_step[0] for plan_step in previous_state.trip_plan}
    plan_req_ids = {plan_step[0] for plan_step in new_trip_plan}
    # "covers" means no previous request has been dropped from the new plan
    return prev_req_ids.issubset(plan_req_ids)
def trip_plan_ordering_is_valid(
    new_trip_plan: Tuple[Tuple[RequestId, TripPhase], ...],
    previous_state: ServicingPoolingTrip,
) -> bool:
    """
    walks the proposed trip plan step by step and verifies that every dropoff is
    for a request currently counted as boarded, and that nobody is left on the
    vehicle once the last step has been processed.

    the walk is seeded with every request id found in the previous state's trip
    plan (or with nothing when previous_state is falsy), so those requests can be
    dropped off without a pickup step in the new plan.

    :param new_trip_plan: the proposed trip plan
    :param previous_state: previous pooling trip state
    :return: True, if the plan is valid
    """
    # requests carried over from the previous state count as boarded from the start
    on_board: FrozenSet[str] = frozenset()
    if previous_state:
        on_board = frozenset(plan_step[0] for plan_step in previous_state.trip_plan)

    for r_id, t in new_trip_plan:
        if t == TripPhase.PICKUP:
            on_board = on_board | {r_id}
        elif t == TripPhase.DROPOFF:
            if r_id not in on_board:
                # dropoff for a request that is not on the vehicle
                return False
            on_board = on_board - {r_id}
        else:
            # unknown trip phase: report it once and reject the whole plan
            log.error(f"trip for request {r_id} has invalid TripPhase {t}")
            return False

    # valid only if every boarded request has been dropped off
    return not on_board
def trip_plan_all_requests_allow_pooling(
    sim: SimulationState, trip_plan: Tuple[Tuple[RequestId, TripPhase], ...]
) -> Optional[str]:
    """
    confirm that each request in the trip plan allows pooling

    each distinct request id in the plan is looked up in sim.requests; ids that are
    missing from the sim and ids whose request has a falsy allows_pooling are
    collected separately and reported together.

    :param sim: the sim state
    :param trip_plan: the proposed trip plan from dispatch
    :return: None if all requests do allow pooling (including when the trip plan
             is empty), otherwise, a specific error
    """
    # de-duplicate request ids (each typically appears once per phase) while keeping
    # first-appearance order, so the reported ids come out in a deterministic order
    unique_req_ids = tuple(dict.fromkeys(plan_step[0] for plan_step in trip_plan))

    sim_error_req_ids: Tuple[str, ...] = ()
    pool_error_req_ids: Tuple[str, ...] = ()
    for r_id in unique_req_ids:
        req = sim.requests.get(r_id)
        if req is None:
            sim_error_req_ids += (r_id,)
        elif not req.allows_pooling:
            # accumulate onto the pooling errors, never onto the "not in sim" ones
            pool_error_req_ids += (r_id,)

    problems = []
    if sim_error_req_ids:
        problems.append(f"reqs not in sim: {sim_error_req_ids}")
    if pool_error_req_ids:
        problems.append(f"reqs which don't allow pooling: {pool_error_req_ids}")

    return "; ".join(problems) if problems else None
# def create_dispatch_pooling_trip(sim: 'SimulationState',
#                                  vehicle: Vehicle,
#                                  trip_plan: Tuple[Tuple[RequestId, TripPhase], ...]
#                                  ) -> Tuple[Optional[Exception], Optional[ServicingPoolingTrip]]:
#     """
#     create a vehicle state representing a new pooling trip plan.
#
#     this pooling state has been validated and so within this scope while constructing the route plan
#     we trust that the trip plan accounts for all trips already boarded.
#
#     :param sim: the sim state
#     :param vehicle: vehicle being re-routed
#     :param trip_plan: the proposed trip plan from dispatch
#     :return: the dispatch pooling trip state
#     """
#
#     # create each route for the route plan
#     def _create_route(acc: Tuple[Link, Tuple[Route, ...]],
#                       plan_step: Tuple[RequestId, TripPhase]) -> Tuple[Link, Tuple[Route, ...]]:
#         prev_link, solution = acc
#         req_id, t = plan_step
#         request = sim.requests.get(req_id)
#         if request is None:
#             log.error(f"attempting to build pooling trip with {req_id} which is not in the simulation")
#             return acc
#         else:
#             next_link = request.origin_link if t == TripPhase.PICKUP else request.destination_link
#             next_route = sim.road_network.route(prev_link, next_link)
#             next_routes = solution + (next_route, )
#             return next_link, next_routes
#
#     initial = (vehicle.geoid, ())
#     _, route_plan = ft.reduce(_create_route, trip_plan, initial)
#
#     req_ids, _ = tuple(zip(*trip_plan))
#     req_ids_unique = frozenset(req_ids)
#     reqs = immutables.Map({r_id: sim.requests.get(r_id) for r_id in req_ids_unique})
#     num_passengers = sum([len(r.passengers) for r in reqs.values()])
#
#     state = ServicingPoolingTrip.build(
#         vehicle_id=vehicle.id,
#         trip_plan=trip_plan,
#         trips=reqs,
#         routes=route_plan,
#         num_passengers=num_passengers
#     )
#     return None, state