from __future__ import annotations
from abc import abstractmethod, ABC
from typing import Tuple, Optional, TYPE_CHECKING
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.util import SimulationStateError
if TYPE_CHECKING:
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.runner.environment import Environment
from nrel.hive.util.typealiases import ScheduleId, BaseId, VehicleId
from nrel.hive.dispatcher.instruction.instruction import Instruction
[docs]class DriverState(ABC):
"""
superclass for all driver state instances
"""
@property
@abstractmethod
def schedule_id(cls) -> Optional[ScheduleId]:
pass
@property
@abstractmethod
def available(cls):
pass
@property
def is_human_driver(cls) -> bool:
return cls.schedule_id is not None
@property
@abstractmethod
def allows_pooling(cls) -> bool:
pass
@property
@abstractmethod
def home_base_id(cls) -> Optional[BaseId]:
pass
[docs] @abstractmethod
def update(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
pass
[docs] @abstractmethod
def generate_instruction(
self,
sim: SimulationState,
env: Environment,
previous_instructions: Optional[Tuple[Instruction, ...]] = None,
) -> Optional[Instruction]:
"""
allows the driver state to issue an optional instruction for the vehicle considering all the
previous instructions generated by the dispatcher
:param sim:
:param env:
:param previous_instructions:
:return:
"""
return None
[docs] def enter(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
"""
there are no operations associated with entering a DriverState
:param sim: the simulation state
:param env: the simulation environment
:return: always the unmodified simulation state
"""
return None, sim
[docs] def exit(
self, sim: SimulationState, env: Environment
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
"""
there are no operations associated with exiting a DriverState
:param sim: the simulation state
:param env: the simulation environment
:return: always the unmodified simulation state
"""
return None, sim
[docs] @classmethod
def apply_new_driver_state(
mcs,
sim: SimulationState,
vehicle_id: VehicleId,
new_state: DriverState,
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
"""
helper for updating a Vehicle with a new DriverState
:param sim: the simulation state
:param vehicle_id: the id of the vehicle to update
:param new_state: the state to apply to the vehicle
:return: the updated sim, or, an error
"""
vehicle = sim.vehicles.get(vehicle_id)
if not vehicle:
state_name = new_state.__class__.__name__
return (
SimulationStateError(
f"vehicle {vehicle_id} not found; context: applying new {state_name} driver state"
),
None,
)
else:
updated_vehicle = vehicle.modify_driver_state(new_state)
return simulation_state_ops.modify_vehicle(sim, updated_vehicle)
[docs] @classmethod
def build(
mcs,
vehicle_id: VehicleId,
schedule_id: Optional[ScheduleId],
base_id: Optional[BaseId],
allows_pooling: bool,
) -> DriverState:
"""
constructs a new DriverState based on the provided arguments
:param vehicle_id: the Vehicle associated with this DriverState
:param schedule_id: if provided, sets the DriverState as a HumanUnavailable driver
:param base_id: used for HumanAvailable and HumanUnavailable
:param allows_pooling: set true if this agent will pick up pooling requests
:return: the driver state instance created
"""
from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_available import (
AutonomousAvailable,
)
from nrel.hive.state.driver_state.autonomous_driver_state.autonomous_driver_attributes import (
AutonomousDriverAttributes,
)
from nrel.hive.state.driver_state.human_driver_state.human_driver_attributes import (
HumanDriverAttributes,
)
from nrel.hive.state.driver_state.human_driver_state.human_driver_state import (
HumanUnavailable,
)
if not schedule_id:
return AutonomousAvailable(AutonomousDriverAttributes(vehicle_id))
else:
if not base_id:
raise Exception("cannot build a vehicle with schedule but not a home base id")
return HumanUnavailable(
HumanDriverAttributes(vehicle_id, schedule_id, base_id, allows_pooling)
)