# Source code for nrel.hive.state.driver_state.driver_state

from __future__ import annotations

from abc import abstractmethod, ABC
from typing import Tuple, Optional, TYPE_CHECKING

from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.util import SimulationStateError

if TYPE_CHECKING:
    from nrel.hive.state.simulation_state.simulation_state import SimulationState
    from nrel.hive.runner.environment import Environment
    from nrel.hive.util.typealiases import ScheduleId, BaseId, VehicleId
    from nrel.hive.dispatcher.instruction.instruction import Instruction


class DriverState(ABC):
    """
    superclass for all driver state instances

    concrete driver states must provide the abstract properties and methods
    declared here; ``enter`` and ``exit`` have no-op default implementations,
    and the two classmethods are helpers for installing a driver state on a
    vehicle and for constructing an initial driver state.
    """

    @property
    @abstractmethod
    def schedule_id(self) -> Optional[ScheduleId]:
        """
        the schedule associated with this driver, if any

        :return: the schedule id, or None; ``is_human_driver`` treats any
                 non-None value as marking a human driver
        """
        pass

    @property
    @abstractmethod
    def available(self):
        """
        the availability of this driver

        :return: defined by the concrete driver state (presumably a bool --
                 the base class declares no return type; confirm in subclasses)
        """
        pass

    @property
    def is_human_driver(self) -> bool:
        """
        :return: True when this driver state has a schedule id (is not None)
        """
        return self.schedule_id is not None

    @property
    @abstractmethod
    def allows_pooling(self) -> bool:
        """
        :return: whether this driver allows pooling, as defined by the
                 concrete driver state
        """
        pass

    @property
    @abstractmethod
    def home_base_id(self) -> Optional[BaseId]:
        """
        :return: the home base id of this driver, or None
        """
        pass

    @abstractmethod
    def update(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        update hook for this driver state; must be implemented by subclasses

        :param sim: the simulation state
        :param env: the simulation environment
        :return: an (error, simulation state) pair, as declared by the signature
        """
        pass

    @abstractmethod
    def generate_instruction(
        self,
        sim: SimulationState,
        env: Environment,
        previous_instructions: Optional[Tuple[Instruction, ...]] = None,
    ) -> Optional[Instruction]:
        """
        allows the driver state to issue an optional instruction for the vehicle considering
        all the previous instructions generated by the dispatcher

        this method is abstract, so subclasses must override it; the base
        implementation issues no instruction.

        :param sim: the simulation state
        :param env: the simulation environment
        :param previous_instructions: instructions generated so far, if any
        :return: an instruction, or None (the base implementation always returns None)
        """
        return None

    def enter(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        there are no operations associated with entering a DriverState

        :param sim: the simulation state
        :param env: the simulation environment
        :return: always (None, sim), i.e. no error and the unmodified simulation state
        """
        return None, sim

    def exit(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        there are no operations associated with exiting a DriverState

        :param sim: the simulation state
        :param env: the simulation environment
        :return: always (None, sim), i.e. no error and the unmodified simulation state
        """
        return None, sim

    @classmethod
    def apply_new_driver_state(
        cls,
        sim: SimulationState,
        vehicle_id: VehicleId,
        new_state: DriverState,
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        helper for updating a Vehicle with a new DriverState

        :param sim: the simulation state
        :param vehicle_id: the id of the vehicle to update
        :param new_state: the state to apply to the vehicle
        :return: the result of modifying the vehicle in the simulation state, or,
                 (SimulationStateError, None) when the vehicle is not found
        """
        vehicle = sim.vehicles.get(vehicle_id)
        if not vehicle:
            # report the failure as a value rather than raising, matching the
            # (error, state) return convention of this class
            state_name = new_state.__class__.__name__
            error = SimulationStateError(
                f"vehicle {vehicle_id} not found; context: applying new {state_name} driver state"
            )
            return error, None

        updated_vehicle = vehicle.modify_driver_state(new_state)
        return simulation_state_ops.modify_vehicle(sim, updated_vehicle)

    @classmethod
    def build(
        cls,
        vehicle_id: VehicleId,
        schedule_id: Optional[ScheduleId],
        base_id: Optional[BaseId],
        allows_pooling: bool,
    ) -> DriverState:
        """
        constructs a new DriverState based on the provided arguments

        :param vehicle_id: the Vehicle associated with this DriverState
        :param schedule_id: if provided, sets the DriverState as a HumanUnavailable driver;
                            a falsy value (None or empty) yields an AutonomousAvailable driver
        :param base_id: home base id; required whenever a schedule_id is provided
        :param allows_pooling: set true if this agent will pick up pooling requests
                               (only passed on to human driver attributes)
        :return: the driver state instance created
        :raises Exception: when a schedule_id is provided without a base_id
        """
        # imported locally rather than at module level (presumably to avoid a
        # circular import with the concrete driver state modules -- confirm)
        from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_available import (
            AutonomousAvailable,
        )
        from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_driver_attributes import (
            AutonomousDriverAttributes,
        )
        from nrel.hive.state.driver_state.human_driver_state.human_driver_attributes import (
            HumanDriverAttributes,
        )
        from nrel.hive.state.driver_state.human_driver_state.human_driver_state import (
            HumanUnavailable,
        )

        if not schedule_id:
            return AutonomousAvailable(AutonomousDriverAttributes(vehicle_id))

        if not base_id:
            raise Exception("cannot build a vehicle with schedule but not a home base id")

        attributes = HumanDriverAttributes(vehicle_id, schedule_id, base_id, allows_pooling)
        return HumanUnavailable(attributes)