Source code for nrel.hive.model.roadnetwork.linktraversal

from __future__ import annotations

from typing import Optional, NamedTuple, Tuple, TYPE_CHECKING

from nrel.hive.util.h3_ops import H3Ops
from nrel.hive.util.units import Seconds, Kilometers, Kmph, hours_to_seconds

if TYPE_CHECKING:
    from nrel.hive.util.typealiases import LinkId, GeoId


[docs]class LinkTraversal(NamedTuple): """ represents either an intention to traverse some or all of a link or, an experience over some or all of a link """ link_id: LinkId start: GeoId end: GeoId distance_km: Kilometers # TODO: this should come from the road network in real time to support variable link speeds speed_kmph: Kmph
[docs] @classmethod def build( cls, link_id: LinkId, start: GeoId, end: GeoId, speed_kmph: Kmph, distance_km: Optional[Kilometers] = None, ) -> LinkTraversal: if not distance_km: distance_km = H3Ops.great_circle_distance(start, end) return LinkTraversal( link_id=link_id, start=start, end=end, distance_km=distance_km, speed_kmph=speed_kmph, )
@property def travel_time_seconds(self) -> Seconds: return hours_to_seconds(self.distance_km / self.speed_kmph)
[docs] def update_start(self, new_start: GeoId) -> LinkTraversal: """ changes the start GeoId of the (experienced) LinkTraversal. used to set trip positions on partially-traversed Links or to set a location of a stationary entity. :param new_start: the new start GeoId (should be a position along the h3_line between Link.start and Link.end) :return: the link with an updated start GeoId """ if new_start == self.start: return self else: return self._replace(start=new_start)
[docs] def update_end(self, new_end: GeoId) -> LinkTraversal: """ changes the end GeoId of the (experienced) LinkTraversal. used to set trip positions on partially-traversed Links or to set a location of a stationary entity. :param new_end: the new end GeoId (should be a position along the h3_line between Link.start and Link.end) :return: the link with an updated end GeoId """ if new_end == self.end: return self else: return self._replace(end=new_end)
[docs]class LinkTraversalResult(NamedTuple): """ the result from a vehicle moving over a link traversal. :param traversed: represents any part of the link that was traversed. :param remaining: represents any part of the link that remains to be traversed :param remaining_time_seconds: represents any time the agent has left to traverse additional links """ traversed: Optional[LinkTraversal] remaining: Optional[LinkTraversal] remaining_time_seconds: Seconds
[docs]def traverse_up_to( link: LinkTraversal, available_time_seconds: Seconds ) -> Tuple[Optional[Exception], Optional[LinkTraversalResult]]: """ using the ground truth road network, and some agent Link traversal, attempt to traverse the link, based on travel time calculations from the Link's PropertyLink attributes. :param link: the plan the agent has to traverse a subset of a road network link :param available_time_seconds: the remaining time the agent has in this time step :return: the updated traversal, or, an exception. on update, if there is any remaining traversal, return an updated Link. if no traversal remains, return None. regardless, return the agent's remaining time after traversing if there was any error, return the exception instead. """ if link is None: return ( AttributeError(f"attempting to traverse link which does not exist"), None, ) elif link.start == link.end: # already done! result = LinkTraversalResult( traversed=None, remaining=None, remaining_time_seconds=available_time_seconds, ) return None, result else: # traverse up to available_time_hours across this link if link.travel_time_seconds <= available_time_seconds: # we can complete this link, so we return (remaining) Link = None result = LinkTraversalResult( traversed=link, remaining=None, remaining_time_seconds=(available_time_seconds - link.travel_time_seconds), ) return None, result else: # we do not have enough time to finish traversing this link, so, just traverse part of it, # leaving no remaining time. # find the point in this link to split into two sub-links mid_geoid = H3Ops.point_along_link(link, available_time_seconds) # create two sub-links, one for the part that was traversed, and one for the remaining part traversed = LinkTraversal.build( link.link_id, link.start, mid_geoid, speed_kmph=link.speed_kmph ) remaining = LinkTraversal.build( link.link_id, mid_geoid, link.end, speed_kmph=link.speed_kmph ) result = LinkTraversalResult( traversed=traversed, remaining=remaining, remaining_time_seconds=0, ) return None, result