from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass, replace
from typing import Tuple, Optional, TYPE_CHECKING
from nrel.hive.runner.environment import Environment
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.vehicle_state.idle import Idle
from nrel.hive.state.vehicle_state.vehicle_state import (
VehicleState,
VehicleStateInstanceId,
)
from nrel.hive.state.vehicle_state.vehicle_state_ops import charge
from nrel.hive.state.vehicle_state.vehicle_state_type import VehicleStateType
from nrel.hive.util.exception import SimulationStateError
from nrel.hive.util.typealiases import StationId, VehicleId, ChargerId
if TYPE_CHECKING:
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.runner.environment import Environment
log = logging.getLogger(__name__)
[docs]@dataclass(frozen=True)
class ChargingStation(VehicleState):
"""
a vehicle is charging at a station with a specific charger_id type
"""
vehicle_id: VehicleId
station_id: StationId
charger_id: ChargerId
instance_id: VehicleStateInstanceId
[docs] @classmethod
def build(
cls,
vehicle_id: VehicleId,
station_id: StationId,
charger_id: ChargerId,
) -> ChargingStation:
"""
build a charging station state
:param vehicle_id: the vehicle id
:param station_id: the station id
:param charger_id: the charger id
:return: a charging station state
"""
return cls(
vehicle_id=vehicle_id,
station_id=station_id,
charger_id=charger_id,
instance_id=uuid.uuid4(),
)
@property
def vehicle_state_type(cls) -> VehicleStateType:
return VehicleStateType.CHARGING_STATION
[docs] def enter(
self, sim: "SimulationState", env: Environment
) -> Tuple[Optional[Exception], Optional["SimulationState"]]:
"""
entering a charge event requires attaining a charger_id from the station
:param sim: the simulation state
:param env: the simulation environment
:return: an exception due to failure or an optional updated simulation
"""
self = replace(self, instance_id=uuid.uuid4())
# ok, we want to enter a charging state.
# we attempt to claim a charger_id from the station of this self.charger_id type
# what if we can't? is that an Exception, or, is that simply rejected?
vehicle = sim.vehicles.get(self.vehicle_id)
station = sim.stations.get(self.station_id)
context = f"vehicle {self.vehicle_id} entering charging station state at station {self.station_id} with charger {self.charger_id}"
if not vehicle:
return (
SimulationStateError(f"vehicle not found; context: {context}"),
None,
)
elif not station:
return (
SimulationStateError(f"station not found; context: {context}"),
None,
)
mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
charger_err, charger = station.get_charger_instance(self.charger_id)
if mechatronics is None:
return (
SimulationStateError(f"unknown mechatronics id {vehicle.mechatronics_id}"),
None,
)
if vehicle.geoid != station.geoid:
return None, None
elif not station.membership.grant_access_to_membership(vehicle.membership):
msg = f"vehicle {vehicle.id} doesn't have access to station {station.id}"
return SimulationStateError(msg), None
elif charger_err is not None:
return charger_err, None
elif charger is None:
return None, None
elif not mechatronics.valid_charger(charger):
msg = f"vehicle {vehicle.id} of type {vehicle.mechatronics_id} can't use charger {charger.id}"
return SimulationStateError(msg), None
else:
error, updated_station = station.checkout_charger(self.charger_id)
if error is not None:
return error, None
elif updated_station is None:
return None, None
else:
error, updated_sim = simulation_state_ops.modify_station(sim, updated_station)
if error:
response = SimulationStateError(
f"failure during ChargingStation.enter for vehicle {self.vehicle_id}"
)
response.__cause__ = error
return response, None
elif updated_sim is None:
return None, None
else:
return VehicleState.apply_new_vehicle_state(updated_sim, self.vehicle_id, self)
[docs] def update(
self, sim: "SimulationState", env: Environment
) -> Tuple[Optional[Exception], Optional["SimulationState"]]:
return VehicleState.default_update(sim, env, self)
[docs] def exit(
self,
next_state: VehicleState,
sim: "SimulationState",
env: "Environment",
) -> Tuple[Optional[Exception], Optional["SimulationState"]]:
"""
exiting a charge event requires returning the charger_id to the station
:param sim: the simulation state
:param env: the simulation environment
:return: an exception due to failure or an optional updated simulation
"""
vehicle = sim.vehicles.get(self.vehicle_id)
station = sim.stations.get(self.station_id)
context = f"vehicle {self.vehicle_id} exiting charging station state at station {self.station_id} with charger {self.charger_id}"
if not vehicle:
return (
SimulationStateError(f"vehicle not found; context: {context}"),
None,
)
elif not station:
return (
SimulationStateError(f"station not found; context: {context}"),
None,
)
else:
error, updated_station = station.return_charger(self.charger_id)
if error:
response = SimulationStateError(
f"failure returning charger during ChargingStation.exit for vehicle {self.vehicle_id} at station {self.station_id}"
)
response.__cause__ = error
return response, None
elif updated_station is None:
return None, None
return simulation_state_ops.modify_station(sim, updated_station)
[docs] def _has_reached_terminal_state_condition(
self, sim: "SimulationState", env: Environment
) -> bool:
"""
test if charging is finished
:param sim: the simulation state
:param env: the simulation environment
:return: True if the vehicle is fully charged
"""
vehicle = sim.vehicles.get(self.vehicle_id)
if not vehicle:
log.error(f"could not find vehicle {self.vehicle_id} in sim state")
return False
else:
mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
if mechatronics is None:
log.error(f"could not find mechatronics {vehicle.mechatronics_id} in environemnt")
return False
is_full = mechatronics.is_full(vehicle)
return is_full
[docs] def _default_terminal_state(
self, sim: "SimulationState", env: Environment
) -> Tuple[Optional[Exception], Optional[VehicleState]]:
"""
give the default state to transition to after having met a terminal condition
:param sim: the simulation state
:param env: the simulation environment
:return: an exception due to failure or the next_state after finishing a task
"""
next_state = Idle.build(self.vehicle_id)
return None, next_state