from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Tuple, Optional, TYPE_CHECKING
from uuid import uuid4
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.vehicle_state.reserve_base import ReserveBase
from nrel.hive.state.vehicle_state.vehicle_state import (
VehicleState,
VehicleStateInstanceId,
)
from nrel.hive.state.vehicle_state.vehicle_state_ops import charge
from nrel.hive.state.vehicle_state.vehicle_state_type import VehicleStateType
from nrel.hive.util.exception import SimulationStateError
from nrel.hive.util.typealiases import BaseId, VehicleId, ChargerId
if TYPE_CHECKING:
    # these names are only referenced in type annotations in this module,
    # so they are imported for the type checker and skipped at runtime
    from nrel.hive.state.simulation_state.simulation_state import SimulationState
    from nrel.hive.runner.environment import Environment

# module-level logger, named after this module
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class ChargingBase(VehicleState):
    """
    a vehicle is charging at a base using a charger of a specific charger_id type

    entering this state checks out a parking stall from the base and a charger from the
    station attached to that base; exiting this state returns both.
    """

    # id of the vehicle that is charging
    vehicle_id: VehicleId
    # id of the base where the vehicle is charging
    base_id: BaseId
    # id of the charger type requested from the station attached to the base
    charger_id: ChargerId
    # identifier of this particular state instance (build() generates it with uuid4)
    instance_id: VehicleStateInstanceId

    @classmethod
    def build(cls, vehicle_id: VehicleId, base_id: BaseId, charger_id: ChargerId) -> ChargingBase:
        """
        build a ChargingBase state with a freshly generated instance id

        :param vehicle_id: the vehicle that will charge
        :param base_id: the base where the vehicle will charge
        :param charger_id: the charger type to use
        :return: a new ChargingBase state
        """
        return ChargingBase(
            vehicle_id=vehicle_id,
            base_id=base_id,
            charger_id=charger_id,
            instance_id=uuid4(),
        )

    @property
    def vehicle_state_type(self) -> VehicleStateType:
        """
        :return: the VehicleStateType tag for this state (CHARGING_BASE)
        """
        return VehicleStateType.CHARGING_BASE

    @staticmethod
    def _wrap_error(message: str, cause: Exception) -> SimulationStateError:
        """
        build a SimulationStateError with the given message whose __cause__ is the
        underlying error, so the original failure is not lost

        :param message: the message for the new error
        :param cause: the error that triggered the failure
        :return: the new error, chained to its cause
        """
        response = SimulationStateError(message)
        response.__cause__ = cause
        return response

    def enter(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        entering a charge event requires attaining a charger_id from the station situated at the base

        :param sim: the simulation state
        :param env: the simulation environment
        :return: an exception due to failure or an optional updated simulation, or (None, None) if not possible
        """
        # look everything up first; each lookup tolerates a missing predecessor
        vehicle = sim.vehicles.get(self.vehicle_id)
        base = sim.bases.get(self.base_id)
        station_id = base.station_id if base is not None else None
        station = sim.stations.get(station_id) if station_id is not None else None
        mechatronics = (
            env.mechatronics.get(vehicle.mechatronics_id) if vehicle is not None else None
        )

        context = f"vehicle {self.vehicle_id} entering charging base state at base {self.base_id} with charger {self.charger_id}"

        # validate that all required entities exist and the vehicle may use this base
        if not vehicle:
            msg = f"vehicle not found; context {context}"
            return SimulationStateError(msg), None
        if not base:
            msg = f"base not found; context: {context}"
            return SimulationStateError(msg), None
        if not base.station_id:
            msg = f"base does not have attached station; context: {context}"
            return SimulationStateError(msg), None
        if not station:
            msg = f"station {base.station_id} at base not found; context: {context}"
            return SimulationStateError(msg), None
        if not mechatronics:
            msg = f"vehicle {vehicle.id} has invalid mechatronics id; context: {context}"
            return SimulationStateError(msg), None
        if not base.membership.grant_access_to_membership(vehicle.membership):
            msg = f"vehicle doesn't have access to base; context: {context}"
            return SimulationStateError(msg), None

        # actually claim the parking stall
        updated_base = base.checkout_stall()
        if not updated_base:
            # no stall available for charging
            return None, None

        # grab the charger from the station
        charger_err, charger = station.get_charger_instance(self.charger_id)
        if charger_err is not None:
            return charger_err, None
        if charger is None:
            return None, None
        if not mechatronics.valid_charger(charger):
            msg = f"vehicle of type {vehicle.mechatronics_id} can't use charger; context: {context}"
            return SimulationStateError(msg), None

        # check out this charger from the station
        error, updated_station = station.checkout_charger(self.charger_id)
        if error is not None:
            return error, None
        if not updated_station:
            log.warning(
                f"vehicle {self.vehicle_id} can't checkout {self.charger_id} from {station.id}"
            )
            return None, None

        failure = f"failure during ChargingBase.enter for vehicle {self.vehicle_id}"

        # update the base + station state
        err1, sim2 = simulation_state_ops.modify_base(sim, updated_base)
        if err1:
            return self._wrap_error(failure, err1), None
        if sim2 is None:
            return (
                Exception("sim state should not be none if there was no error"),
                None,
            )

        err2, sim3 = simulation_state_ops.modify_station(sim2, updated_station)
        if err2:
            return self._wrap_error(failure, err2), None
        if sim3 is None:
            return (
                Exception("sim state should not be none if there was no error"),
                None,
            )

        # finally, record this state on the vehicle
        return VehicleState.apply_new_vehicle_state(sim3, self.vehicle_id, self)

    def update(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        delegate to the default update behavior defined on VehicleState

        :param sim: the simulation state
        :param env: the simulation environment
        :return: whatever VehicleState.default_update returns for this state
        """
        return VehicleState.default_update(sim, env, self)

    def exit(
        self, next_state: VehicleState, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[SimulationState]]:
        """
        exiting a charge event requires returning the stall to the base and the
        charger_id to the station at the base

        :param next_state: the state being transitioned to (not used by this implementation)
        :param sim: the simulation state
        :param env: the simulation environment
        :return: an exception due to failure or an optional updated simulation
        """
        context = f"vehicle {self.vehicle_id} exiting charging base state at base {self.base_id}"
        failure = f"failure during ChargingBase.exit for vehicle {self.vehicle_id}"

        vehicle = sim.vehicles.get(self.vehicle_id)
        base = sim.bases.get(self.base_id)
        if base is None:
            return (
                SimulationStateError(f"base not found; context: {context}"),
                None,
            )

        station = sim.stations.get(base.station_id) if base.station_id else None
        if not vehicle:
            return (
                SimulationStateError(f"vehicle not found; context: {context}"),
                None,
            )
        if not station:
            return (
                SimulationStateError(f"station for base not found; context: {context}"),
                None,
            )

        # give the parking stall back to the base
        err1, updated_base = base.return_stall()
        if err1:
            return self._wrap_error(failure, err1), None
        if updated_base is None:
            return Exception("base should not be none if there was no error"), None

        err2, sim2 = simulation_state_ops.modify_base(sim, updated_base)
        if err2:
            return self._wrap_error(failure, err2), None
        if sim2 is None:
            return Exception("sim should not be none if there was no error"), None

        # give the charger back to the station
        err3, updated_station = station.return_charger(self.charger_id)
        if err3:
            return self._wrap_error(failure, err3), None
        if updated_station is None:
            return Exception("station should not be none if there was no error"), None

        return simulation_state_ops.modify_station(sim2, updated_station)

    def _has_reached_terminal_state_condition(self, sim: SimulationState, env: Environment) -> bool:
        """
        test if charging is finished

        :param sim: the simulation state
        :param env: the simulation environment
        :return: True if the vehicle is fully charged; False if it is not, or if the
                 vehicle or its mechatronics cannot be found
        """
        vehicle = sim.vehicles.get(self.vehicle_id)
        if not vehicle:
            return False

        mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
        if mechatronics is None:
            return False

        return mechatronics.is_full(vehicle)

    def _default_terminal_state(
        self, sim: SimulationState, env: Environment
    ) -> Tuple[Optional[Exception], Optional[VehicleState]]:
        """
        give the default state to transition to after having met a terminal condition

        :param sim: the simulation state
        :param env: the simulation environment
        :return: an exception due to failure or the next_state after finishing a task;
                 here always (None, ReserveBase) for the same vehicle and base
        """
        next_state = ReserveBase.build(self.vehicle_id, self.base_id)
        return None, next_state