Source code for nrel.hive.dispatcher.instruction_generator.dispatcher

from __future__ import annotations

import functools as ft
import logging
from dataclasses import dataclass
from typing import Tuple, TYPE_CHECKING, Optional

from nrel.hive.dispatcher.instruction_generator import assignment_ops
from nrel.hive.state.vehicle_state.charging_base import ChargingBase

if TYPE_CHECKING:
    from nrel.hive.state.simulation_state.simulation_state import SimulationState
    from nrel.hive.runner.environment import Environment
    from nrel.hive.dispatcher.instruction.instruction import Instruction
    from nrel.hive.model.vehicle.vehicle import Vehicle
    from nrel.hive.model.request.request import Request
    from nrel.hive.config.dispatcher_config import DispatcherConfig
    from nrel.hive.util.typealiases import MembershipId

from nrel.hive.dispatcher.instruction_generator.instruction_generator import InstructionGenerator
from nrel.hive.dispatcher.instruction.instructions import DispatchTripInstruction

log = logging.getLogger(__name__)


[docs]@dataclass(frozen=True) class Dispatcher(InstructionGenerator): """ A managers algorithm that assigns vehicles greedily to most expensive request. """ config: DispatcherConfig
[docs] def generate_instructions( self, simulation_state: SimulationState, environment: Environment, ) -> Tuple[Dispatcher, Tuple[Instruction, ...]]: """ Generate fleet targets for the dispatcher to execute based on the simulation state. :param environment: :param simulation_state: The current simulation state :return: the updated Dispatcher along with instructions """ base_charging_range_km_threshold = ( environment.config.dispatcher.base_charging_range_km_threshold ) def _solve_assignment( inst_acc: Tuple[DispatchTripInstruction, ...], membership_id: Optional[MembershipId], ) -> Tuple[DispatchTripInstruction, ...]: def _is_valid_for_dispatch(vehicle: Vehicle) -> bool: vehicle_state_str = vehicle.vehicle_state.__class__.__name__.lower() if vehicle_state_str not in environment.config.dispatcher.valid_dispatch_states: return False elif not vehicle.driver_state.available: return False elif ( membership_id is not None and not vehicle.membership.grant_access_to_membership_id(membership_id) ): return False mechatronics = environment.mechatronics.get(vehicle.mechatronics_id) if mechatronics is None: log.error(f"mechatonrics not found for vehicle {vehicle.id}") return False range_remaining_km = mechatronics.range_remaining_km(vehicle) # if we are at a base, do we have enough remaining range to leave the base? if ( isinstance(vehicle.vehicle_state, ChargingBase) and range_remaining_km < base_charging_range_km_threshold ): return False # do we have enough remaining range to allow us to match? return bool( range_remaining_km > environment.config.dispatcher.matching_range_km_threshold ) def _valid_request(r: Request) -> bool: not_already_dispatched = not r.dispatched_vehicle valid_access = ( r.membership.grant_access_to_membership_id(membership_id) if membership_id is not None else True ) return not_already_dispatched and valid_access # collect the vehicles and requests for the assignment algorithm available_vehicles = simulation_state.get_vehicles( filter_function=_is_valid_for_dispatch, ) unassigned_requests = simulation_state.get_requests( sort_key=lambda r: (-r.value, r.id), filter_function=_valid_request, ) # select assignment of vehicles to requests solution = assignment_ops.find_assignment( available_vehicles, unassigned_requests, assignment_ops.h3_distance_cost, ) instructions = ft.reduce( lambda acc, pair: ( *acc, DispatchTripInstruction(pair[0], pair[1]), ), solution.solution, inst_acc, ) return instructions if len(environment.fleet_ids) > 0: fleet_ids = environment.fleet_ids else: fleet_ids = frozenset([None]) initial_instructions: Tuple[DispatchTripInstruction, ...] = tuple() all_instructions = ft.reduce( _solve_assignment, fleet_ids, initial_instructions, ) return self, all_instructions