from __future__ import annotations
from typing import Tuple, Optional, NamedTuple, TYPE_CHECKING
import immutables
from nrel.hive.model.entity_position import EntityPosition
from nrel.hive.model.roadnetwork.route import empty_route
from nrel.hive.model.roadnetwork.routetraversal import traverse, RouteTraversal
from nrel.hive.model.vehicle.vehicle import Vehicle
from nrel.hive.reporting.vehicle_event_ops import (
vehicle_move_event,
vehicle_charge_event,
)
from nrel.hive.state.simulation_state import simulation_state_ops
from nrel.hive.state.vehicle_state.out_of_service import OutOfService
from nrel.hive.util.exception import SimulationStateError
from nrel.hive.util.typealiases import StationId, ChargerId
from nrel.hive.util.typealiases import VehicleId
if TYPE_CHECKING:
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.runner.environment import Environment
def charge(
    sim: SimulationState,
    env: Environment,
    vehicle_id: VehicleId,
    station_id: StationId,
    charger_id: ChargerId,
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    charges a vehicle at a station charger for one simulation time step.

    energy is added to the vehicle through its mechatronics, the vehicle sends and
    the station receives a payment for the energy transacted, the station's
    dispensed energy is ticked, and a vehicle charge event is filed with the reporter.

    :param sim: the simulation state
    :param env: the simulation environment
    :param vehicle_id: the vehicle that is charging
    :param station_id: the station where we are charging
    :param charger_id: the charger_id we are using
    :return: (error, None) if any lookup or update fails; otherwise the result of
             writing the updated station into the simulation. (None, None) is
             returned when the vehicle update produces neither an error nor a sim.
    """
    # single-line context so error messages do not carry stray quotes or newlines
    context = (
        f"vehicle {vehicle_id} attempting to charge at station {station_id} "
        f"with charger {charger_id}"
    )

    vehicle = sim.vehicles.get(vehicle_id)
    mechatronics = env.mechatronics.get(vehicle.mechatronics_id) if vehicle else None
    station = sim.stations.get(station_id)

    # the station must exist before we can ask it for a charger instance
    if station is None:
        return (
            SimulationStateError(f"station not found; context {context}"),
            None,
        )

    charger_err, charger = station.get_charger_instance(charger_id)

    # validation order: vehicle, mechatronics, charger lookup, charger, battery state
    if not vehicle:
        return (
            SimulationStateError(f"vehicle not found; context: {context}"),
            None,
        )
    if not mechatronics:
        return (
            SimulationStateError(
                f"invalid mechatronics_id {vehicle.mechatronics_id}; context: {context}"
            ),
            None,
        )
    if charger_err is not None:
        return charger_err, None
    if not charger:
        return (
            SimulationStateError(f"invalid charger_id; context: {context}"),
            None,
        )
    if mechatronics.is_full(vehicle):
        return (
            SimulationStateError(
                f"vehicle is full but still attempting to charge; context {context}"
            ),
            None,
        )

    charged_vehicle, _ = mechatronics.add_energy(
        vehicle, charger, sim.sim_timestep_duration_seconds
    )

    # determine price of charge event
    kwh_transacted = (
        charged_vehicle.energy[charger.energy_type] - vehicle.energy[charger.energy_type]
    )  # kwh
    charger_price = station.get_price(charger_id)  # Currency
    # a missing (or zero) price means the energy is dispensed free of charge
    charging_price = kwh_transacted * charger_price if charger_price else 0.0

    # perform updates
    updated_vehicle = charged_vehicle.send_payment(charging_price)
    updated_station = station.receive_payment(charging_price).tick_energy_dispensed(
        immutables.Map({charger.energy_type: kwh_transacted})
    )

    veh_error, sim_with_vehicle = simulation_state_ops.modify_vehicle(sim, updated_vehicle)
    if veh_error:
        response = SimulationStateError(f"failure during charge for vehicle {vehicle.id}")
        response.__cause__ = veh_error
        return response, None
    if sim_with_vehicle is None:
        return None, None

    # the report is filed before the station is written back to the simulation
    report = vehicle_charge_event(
        vehicle,
        updated_vehicle,
        sim_with_vehicle,
        updated_station,
        charger,
        mechatronics,
    )
    env.reporter.file_report(report)

    return simulation_state_ops.modify_station(sim_with_vehicle, updated_station)
class MoveResult(NamedTuple):
    """
    immutable record holding a simulation state together with optional
    previous/next vehicles and a route traversal.

    NOTE(review): not referenced by the functions visible in this module section;
    confirm external usage before changing or removing.
    """

    # the simulation state carried by this result (required)
    sim: SimulationState
    # presumably the vehicle before the move; None when not provided
    prev_vehicle: Optional[Vehicle] = None
    # presumably the vehicle after the move; None when not provided
    next_vehicle: Optional[Vehicle] = None
    # default instance is created once at class definition and shared by all results
    route_traversal: RouteTraversal = RouteTraversal()
def _go_out_of_service_on_empty(
    sim: SimulationState, env: Environment, vehicle_id: VehicleId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    transitions a vehicle into the OutOfService state after it has run out of energy.

    the caller is expected to have already confirmed that the vehicle is empty;
    no energy check is performed here.

    :param sim: the sim before the move event
    :param env: the sim environment
    :param vehicle_id: the vehicle that moved and ran out of energy
    :return: whatever entering the OutOfService state yields: an optional error,
             or an optional sim with the out of service vehicle
    """
    # TODO: ways we can improve this:
    #  - find the exact point in the route where a vehicle runs out of
    #    energy and move it there before transitioning
    #    to out of service.
    #  - report stranded passengers if we're servicing a trip when this happens.
    return OutOfService.build(vehicle_id).enter(sim, env)
def move(
    sim: SimulationState, env: Environment, vehicle_id: VehicleId
) -> Tuple[Optional[Exception], Optional[SimulationState]]:
    """
    moves a vehicle along the route held by its vehicle state for one time step.

    if the vehicle is empty after consuming energy for the traversed part of the
    route, it is transitioned to OutOfService instead of being moved.

    :param sim: the simulation state
    :param env: the simulation environment
    :param vehicle_id: the vehicle moving
    :return: an error, or a sim with the moved vehicle, or (None, None) if no changes
    """
    context = f"vehicle {vehicle_id} attempting to move"

    vehicle = sim.vehicles.get(vehicle_id)
    if not vehicle:
        return (
            SimulationStateError(f"vehicle not found; context {context}"),
            None,
        )

    mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
    if not mechatronics:
        return (
            SimulationStateError(f"cannot find {vehicle.mechatronics_id} in environment"),
            None,
        )

    # only vehicle states that carry a route can be moved
    if not hasattr(vehicle.vehicle_state, "route"):
        return (
            SimulationStateError(f"vehicle state does not have route; context {context}"),
            None,
        )
    route = vehicle.vehicle_state.route

    error, traverse_result = traverse(
        route_estimate=route,
        duration_seconds=int(sim.sim_timestep_duration_seconds),
        road_network=sim.road_network,
    )
    if error:
        return error, None
    if traverse_result is None:
        return None, None

    experienced_route = traverse_result.experienced_route
    if not experienced_route:
        # vehicle did not traverse so we set an empty route
        vehicle_to_update = vehicle
        next_route = empty_route()
    else:
        less_energy_vehicle = mechatronics.consume_energy(vehicle, experienced_route)
        if mechatronics.is_empty(less_energy_vehicle):
            # impossible to move, let's transition to OutOfService.
            # the pre-move sim is passed on, so the traversal is not applied.
            return _go_out_of_service_on_empty(sim, env, vehicle_id)

        # place the vehicle at the end of the last link it traversed
        last_link_traversed = experienced_route[-1]
        vehicle_position = EntityPosition(last_link_traversed.link_id, last_link_traversed.end)
        vehicle_to_update = less_energy_vehicle.modify_position(
            position=vehicle_position
        ).tick_distance_traveled_km(traverse_result.traversal_distance_km)
        next_route = traverse_result.remaining_route

    # both paths finish by writing the new route into the vehicle state
    if not hasattr(vehicle_to_update.vehicle_state, "update_route"):
        return (
            SimulationStateError(
                f"vehicle state does not have update_route method; context {context}"
            ),
            None,
        )
    updated_vehicle_state = vehicle_to_update.vehicle_state.update_route(route=next_route)
    updated_vehicle = vehicle_to_update.modify_vehicle_state(updated_vehicle_state)

    report = vehicle_move_event(sim, vehicle, updated_vehicle, traverse_result, env)
    env.reporter.file_report(report)

    error, moved_sim = simulation_state_ops.modify_vehicle(sim, updated_vehicle)
    if error:
        response = SimulationStateError(f"failure during move for vehicle {vehicle.id}")
        response.__cause__ = error
        return response, None
    return None, moved_sim