from __future__ import annotations
import functools as ft
import logging
from typing import Dict, Tuple, Callable, NamedTuple, Optional, TYPE_CHECKING
import sys
import h3
import numpy as np
from scipy.optimize import linear_sum_assignment
from nrel.hive.model.roadnetwork.route import (
route_distance_km,
route_travel_time_seconds,
)
from nrel.hive.model.station.station import Station
from nrel.hive.model.vehicle.mechatronics.powercurve import powercurve_ops
from nrel.hive.model.vehicle.vehicle import Vehicle
from nrel.hive.runner import Environment
from nrel.hive.state.simulation_state.simulation_state import SimulationState
from nrel.hive.state.vehicle_state.charge_queueing import ChargeQueueing
from nrel.hive.state.vehicle_state.charging_station import ChargingStation
from nrel.hive.util.h3_ops import H3Ops
from nrel.hive.util.tuple_ops import TupleOps
if TYPE_CHECKING:
from nrel.hive.util.units import Ratio, Seconds
from nrel.hive.util.typealiases import *
from nrel.hive.model.entity import EntityABC
log = logging.getLogger(__name__)
MAX_DIST = 999999999.0
MAX_TIME = 999999999.0
[docs]class AssignmentSolution(NamedTuple):
"""
each call of find_assignment produces an AssignmentSolution which has any
assignments (a pair of two ids), along with the total cost of this assignment.
"""
solution: Tuple[Tuple[EntityId, EntityId], ...] = ()
solution_cost: float = 0.0
[docs] def add(self, pair: Tuple[EntityId, EntityId], cost: float) -> AssignmentSolution:
return self._replace(
solution=(pair,) + self.solution,
solution_cost=self.solution_cost + cost,
)
[docs]def find_assignment(
assignees: Tuple[EntityABC, ...],
targets: Tuple[EntityABC, ...],
cost_fn: Callable[[EntityABC, EntityABC], float],
) -> AssignmentSolution:
"""
:param assignees: entities we are assigning to. assumed to have an id field.
:param targets: the different entities that each assignee can be assigned to. assumed to have an id field.
:param cost_fn: computes the cost of choosing a specific assignee (slot 1) with a specific target (slot 2)
:return: a collection of pairs of (AssigneeId, TargetId) indicating the solution, along with it's cost
"""
if len(assignees) == 0 or len(targets) == 0:
return AssignmentSolution()
else:
initial_cost = float("inf")
table = np.full((len(assignees), len(targets)), initial_cost)
# evaluate the cost of all possible assignments between each assignee/target pair
upper_bound = float("-inf")
for i in range(len(assignees)):
for j in range(len(targets)):
cost = cost_fn(assignees[i], targets[j])
upper_bound = cost if cost > upper_bound and cost != float("inf") else upper_bound
table[i][j] = cost
# linear_sum_assignment borks with infinite values; this 2nd step replaces
# float("inf") values with an upper-bound value which is 1 beyond our highest-observed value
upper_bound += 1
table[table == float("inf")] = upper_bound
# apply the Kuhn-Munkres algorithm
rows, cols = linear_sum_assignment(table)
# interpret the row/column assignments back to EntityIds and compute the total cost of this assignment
def _add_to_solution(assignment_solution: AssignmentSolution, i: int) -> AssignmentSolution:
this_pair = (assignees[rows[i]].id, targets[cols[i]].id)
this_cost = table[rows[i]][cols[i]]
return assignment_solution.add(this_pair, this_cost)
solution = ft.reduce(_add_to_solution, range(len(rows)), AssignmentSolution())
return solution
[docs]def h3_distance_cost(a: EntityABC, b: EntityABC) -> float:
"""
cost function based on the h3_distance between two entities
:param a: one entity, expected to have a geoid
:param b: another entity, expected to have a geoid
:return: the h3_distance (number of cells between)
"""
distance = h3.h3_distance(a.geoid, b.geoid)
return distance
[docs]def great_circle_distance_cost(a: EntityABC, b: EntityABC) -> float:
"""
cost function based on the great circle distance between two entities.
reverts h3 geoid to a lat/lon pair and calculates the haversine distance.
:param a: one entity, expected to have a geoid
:param b: another entity, expected to have a geoid
:return: the haversine (great circle) distance in lat/lon
"""
distance = H3Ops.great_circle_distance(a.geoid, b.geoid)
return distance
[docs]def nearest_shortest_queue_distance(
vehicle: Vehicle, env: Environment
) -> Callable[[Station], float]:
"""
set up a shortest queue distance function which will rank station alternatives based on
the availability of on-shift charging and a simple heuristic based on Euclidean distance
and smallest queue size.
:param vehicle: the vehicle
:param env: simulation environment
:return: a station distance function
"""
def fn(station: Station) -> float:
result = nearest_shortest_queue_ranking(vehicle, station, env, MAX_DIST)
if result is None:
return MAX_DIST #
else:
_, rank = result
return rank
return fn
[docs]def nearest_shortest_queue_ranking(
vehicle: Vehicle, station: Station, env: Environment, max_dist=999999999.0
) -> Tuple[Optional[ChargerId], float]:
"""
sort ordering that prioritizes short vehicle queues where possible, using h3_distance
as the base distance metric and extending that value by the proportion of available chargers
:param vehicle: the vehicle
:param station: a station
:param env: simulation environment
:param max_dist: upper-bound on distance values
:return: the distance metric for this station, a function of it's queue sizes and h3 distance
"""
distance = h3.h3_distance(vehicle.geoid, station.geoid)
def _inner(
acc: Tuple[Optional[ChargerId], float], charger_id: ChargerId
) -> Tuple[Optional[ChargerId], float]:
vehicle_mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
if vehicle_mechatronics is None:
log.error(f"mechatronics {vehicle.mechatronics_id} not found for vehicle {vehicle.id}")
return (None, 0.0)
charger = env.chargers.get(charger_id)
if charger is None:
log.error(f"charger id {charger_id} not found in environment")
return (None, 0.0)
total_chargers = station.get_total_chargers(charger_id)
if (
not vehicle_mechatronics.valid_charger(charger)
or total_chargers is None
or total_chargers == 0
):
# vehicle can't use this charger so we skip it, or,
# station doesn't actually have this charger (an error condition really)
return acc
else:
prev_best_charger_id, prev_best_distance_metric = acc
enqueued_for_charger_id = station.enqueued_vehicle_count_for_charger(charger_id)
if enqueued_for_charger_id is None:
log.error(f"charger id {charger_id} not found at station {station.id}")
enqueued_for_charger_id = 0
queue_factor = enqueued_for_charger_id / total_chargers
this_distance_metric = distance + distance * queue_factor
if prev_best_distance_metric < this_distance_metric:
return acc
else:
return charger_id, this_distance_metric
# find the lowest nearest_shortest_queue distance metric
# amongst the possible on-shift charging options at this station
initial: Tuple[Optional[str], float] = (None, max_dist)
best_charger_id, best_charger_rank = ft.reduce(
_inner, station.on_shift_access_chargers, initial
)
return (
None if best_charger_id is None else best_charger_id,
best_charger_rank,
)
[docs]def shortest_time_to_charge_distance(
vehicle: Vehicle, sim: SimulationState, env: Environment, target_soc: Ratio
) -> Callable[[Station], float]:
"""
ranks this station by an estimate of the time which would pass until this agent reaches a target charge level
this function returns a distance function which accepts a Station and returns Seconds
:param vehicle: a vehicle
:param sim: the simulation state
:param env: the simulation environment
:param target_soc: the SoC we are attempting to reach in this charge session
:return: the distance metric for this vehicle/station pair (lower is better)
"""
def fn(station: Station) -> float:
result = shortest_time_to_charge_ranking(sim, env, vehicle, station, target_soc)
dist = 999999999.0 if result is None else result[1]
return dist
return fn
[docs]def shortest_time_to_charge_ranking(
sim: SimulationState,
env: Environment,
vehicle: Vehicle,
station: Station,
target_soc: Ratio,
) -> Optional[Tuple[ChargerId, float]]:
"""
given a station charging alternative, determine the time it would take to charge
using the best charger type available
:param sim: simulation state
:param env: the simulation environment
:param vehicle: the vehicle
:param station: the station to rank
:param target_soc: target vehicle charging SoC percentage
:return: a ranking (estimated travel + queue + charge time) for accessing the best-ranked charger
"""
vehicle_mechatronics = env.mechatronics.get(vehicle.mechatronics_id)
remaining_range = (
vehicle_mechatronics.range_remaining_km(vehicle) if vehicle_mechatronics else 0.0
)
route = sim.road_network.route(vehicle.position, station.position)
distance_km = route_distance_km(route)
if not vehicle_mechatronics or remaining_range < distance_km:
# vehicle does not have remaining range to reach this station
# return a signal that demotes this Station alternative to the bottom of the ranking
return None
else:
def _veh_at_station(v: Vehicle) -> bool:
return (
isinstance(v.vehicle_state, ChargingStation)
and v.vehicle_state.station_id == station.id
)
def _veh_enqueued(v: Vehicle) -> bool:
return (
isinstance(v.vehicle_state, ChargeQueueing)
and v.vehicle_state.station_id == station.id
)
def _simulate_charge_session(c: ChargerId):
def _sim(v: Vehicle) -> Seconds:
_mech = env.mechatronics.get(v.mechatronics_id)
_charger = env.chargers.get(c)
if not _mech or not _charger:
return 0
else:
max_iter = (
int(env.config.sim.end_time - sim.sim_time)
/ sim.sim_timestep_duration_seconds
)
time_est = powercurve_ops.time_to_full(
v,
_mech,
_charger,
target_soc,
sim.sim_timestep_duration_seconds,
min_delta_energy_change=env.config.sim.min_delta_energy_change,
max_iterations=int(max_iter),
)
return time_est
return _sim
def _sort_enqueue_time(v: Vehicle) -> Tuple[int, str]:
if isinstance(v.vehicle_state, ChargeQueueing):
enqueue_time = int(v.vehicle_state.enqueue_time)
else:
log.error(
"calling _sort_enqueue_time on a vehicle state that is not ChargeQueueing"
)
enqueue_time = 0
return (enqueue_time, v.id)
def _greedy_assignment(
vehicles_at_station: Tuple[Vehicle, ...], charger_id: ChargerId
) -> Seconds:
"""
computes the time estimated that a slot opens up for this vehicle to begin charging
:param _charging: a sorted list of remaining charge time estimates
:param _enqueued: a sorted list of charge time estimates for enqueued vehicles
:param _charger_id: the id of the charger these vehicles are competing for
:param time_passed: the amount of time that has been estimated
:param depth: recursion depth
:return: the time in the future we should expect to begin charging, determined by a greedy assignment
"""
def _using_charger(charging_vehicle: Vehicle) -> bool:
if isinstance(charging_vehicle.vehicle_state, ChargingStation):
if charging_vehicle.vehicle_state.charger_id == charger_id:
return True
return False
def _waiting_for_charger(enqueued_vehicle: Vehicle) -> bool:
if isinstance(enqueued_vehicle.vehicle_state, ChargeQueueing):
if enqueued_vehicle.vehicle_state.charger_id == charger_id:
return True
return False
# collect all estimated remaining charge times for charging vehicles and sort them
charge_times = list(
sorted(
map(
_simulate_charge_session(charger_id),
filter(_using_charger, vehicles_at_station),
)
)
)
# collect estimated remaining charge times for vehicles enqueued for this charger
# leave them sorted by enqueue time
enqueue_times = list(
map(
_simulate_charge_session(charger_id),
filter(_waiting_for_charger, vehicles_enqueued),
)
)
def _simulating():
return len(charge_times) > 0 and len(enqueue_times) > 0
total_chargers = station.get_total_chargers(charger_id)
if total_chargers is None:
log.warn(f"charger id {charger_id} not found at station {station.id}")
return 0
else:
time_passed = 0
while _simulating():
# advance time
next_delta = charge_times.pop(0)
time_passed += next_delta
# remove charging agents who are done, shift times by delta
charging_times = list(
filter(lambda t: t > 0, map(lambda t: t - next_delta, charge_times))
)
vacancies = (total_chargers - len(charging_times)) > 0
if vacancies:
# move enqueued vehicles into the station
charging_times.extend(enqueue_times[0:vacancies])
enqueue_times = enqueue_times[vacancies:]
return time_passed
# collect all vehicles that are either charging or enqueued at this station
vehicles_at_station = sim.get_vehicles(filter_function=_veh_at_station)
vehicles_enqueued = sim.get_vehicles(
filter_function=_veh_enqueued,
sort_key=_sort_enqueue_time,
)
estimates: Dict[ChargerId, int] = {}
for charger_id in sorted(station.state.keys()):
charger_state = station.state.get(charger_id)
charger = charger_state.charger if charger_state is not None else None
if charger is None or not vehicle_mechatronics.valid_charger(charger):
# vehicle can't use this charger so we skip it
continue
# compute the charge time for the vehicle we are ranking
max_iter = (
int(env.config.sim.end_time - sim.sim_time) / sim.sim_timestep_duration_seconds
)
this_vehicle_charge_time = powercurve_ops.time_to_full(
vehicle,
vehicle_mechatronics,
charger,
target_soc,
sim.sim_timestep_duration_seconds,
min_delta_energy_change=env.config.sim.min_delta_energy_change,
max_iterations=int(max_iter),
)
# compute the estimated wait time to access a charger for the vehicle we are ranking
wait_estimate_for_charger = _greedy_assignment(vehicles_at_station, charger_id)
# combine wait time with charge time
overall_time_est = this_vehicle_charge_time + wait_estimate_for_charger
estimates.update({charger_id: overall_time_est})
if not estimates:
# there are no chargers the vehicle can use.
return None
best_charger_id = min(estimates, key=estimates.__getitem__)
# return the best "distance" aka shortest estimated time to finish charging
best_overall_time = estimates[best_charger_id]
dispatch_time_seconds = route_travel_time_seconds(route)
return best_charger_id, dispatch_time_seconds + best_overall_time