Source code for nrel.hive.model.vehicle.vehicle

from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Dict

import h3
import immutables

from nrel.hive.model.energy.energytype import EnergyType
from nrel.hive.model.entity import Entity
from nrel.hive.model.entity_position import EntityPosition
from nrel.hive.model.membership import Membership
from nrel.hive.model.roadnetwork.roadnetwork import RoadNetwork
from nrel.hive.runner.environment import Environment
from nrel.hive.state.driver_state.driver_state import DriverState
from nrel.hive.state.vehicle_state.idle import Idle
from nrel.hive.state.vehicle_state.vehicle_state import VehicleState
from nrel.hive.util.typealiases import *
from nrel.hive.util.units import Kilometers, Currency


[docs]@dataclass(frozen=True) class Vehicle(Entity): """ Tuple that represents a vehicle in the simulation. :param id: A unique vehicle id. :param mechatronics_id: A id of the mechatronics component of the vehicle. :param energy: The energy of the vehicle :param link: The current location of the vehicle :param vehicle_state: The state that the vehicle is in. :param balance: How much revenue the vehicle has accumulated. :param distance_traveled_km: A accumulator to track how far a vehicle has traveled. """ # core vehicle properties id: VehicleId # location position: EntityPosition membership: Membership # mechatronic properties mechatronics_id: MechatronicsId energy: immutables.Map[EnergyType, float] energy_gained: immutables.Map[EnergyType, float] energy_expended: immutables.Map[EnergyType, float] # vehicle planning/operational properties vehicle_state: VehicleState driver_state: DriverState total_seats: int # available_seats: int # vehicle analytical properties balance: Currency = 0.0 distance_traveled_km: Kilometers = 0.0 @property def geoid(self): return self.position.geoid
[docs] @classmethod def from_row( cls, row: Dict[str, str], road_network: RoadNetwork, environment: Environment, ) -> Vehicle: """ reads a csv row from file to generate a Vehicle :param environment: :param row: a row of a .csv which matches hive.util.pattern.vehicle_regex. this string will be stripped of whitespace characters (no spaces allowed in names!) :param road_network: the road network, used to find the vehicle's location in the sim :return: a vehicle, or, an IOError if failure occurred. """ if "vehicle_id" not in row: raise IOError("cannot load a vehicle without a 'vehicle_id'") elif "lat" not in row: raise IOError("cannot load a vehicle without a 'lat'") elif "lon" not in row: raise IOError("cannot load a vehicle without a 'lon'") elif "mechatronics_id" not in row: raise IOError("cannot load a vehicle without a 'mechatronics_id'") else: try: vehicle_id = row["vehicle_id"] lat = float(row["lat"]) lon = float(row["lon"]) mechatronics_id = row["mechatronics_id"] mechatronics = environment.mechatronics.get(mechatronics_id) if not mechatronics: found = set(environment.mechatronics.keys()) raise IOError( f"was not able to find mechatronics '{mechatronics_id}' for vehicle {vehicle_id} in environment: found {found}" ) energy = mechatronics.initial_energy(float(row["initial_soc"])) energy_expended = mechatronics.initial_energy(0.0) energy_gained = mechatronics.initial_energy(0.0) schedule_id = row.get( "schedule_id" ) # if None, it signals an autonomous vehicle, otherwise, human with schedule home_base_id = row.get("home_base_id") if schedule_id and not schedule_id in environment.schedules.keys(): raise IOError( f"was not able to find schedule '{schedule_id}' in environment for vehicle {vehicle_id}" ) allows_pooling = ( bool(row["allows_pooling"]) if row.get("allows_pooling") is not None else False ) available_seats = int(row.get("available_seats", 0)) driver_state = DriverState.build( vehicle_id, schedule_id, home_base_id, allows_pooling ) geoid = h3.geo_to_h3(lat, lon, road_network.sim_h3_resolution) start_position = road_network.position_from_geoid(geoid) if start_position is None: raise IOError( f"vehicle {vehicle_id} cannot be positioned on the road network; check the input lat/lon" ) return Vehicle( id=vehicle_id, mechatronics_id=mechatronics_id, energy=energy, energy_expended=energy_expended, energy_gained=energy_gained, membership=Membership(), position=start_position, vehicle_state=Idle.build(vehicle_id), driver_state=driver_state, total_seats=available_seats, # available_seats=available_seats ) except ValueError as err: raise IOError(f"failure reading vehicle row {row}") from err
[docs] def __repr__(self) -> str: return f"Vehicle({self.id},{self.vehicle_state})"
[docs] def modify_energy(self, energy: immutables.Map[EnergyType, float]) -> Vehicle: """ modify the energy level of the vehicle. should only be used by the mechatronics ops :param energy: :return: """ return replace(self, energy=energy)
[docs] def modify_vehicle_state(self, vehicle_state: VehicleState) -> Vehicle: """ modify the state of the vehicle. should only be use by the vehicle state ops :param vehicle_state: :return: """ return replace(self, vehicle_state=vehicle_state)
[docs] def modify_driver_state(self, driver_state: DriverState) -> Vehicle: """ modify the state of the vehicle's driver. should only be used by the driver state ops :param driver_state: :return: """ return replace(self, driver_state=driver_state)
[docs] def modify_position(self, position: EntityPosition) -> Vehicle: """ modify the link of the vehicle. should only be used by the road network ops :param position: :return: """ return replace(self, position=position)
[docs] def send_payment(self, amount: Currency) -> Vehicle: """ updates the Vehicle's balance based on sending a payment :param amount: the amount to pay :return: the updated Vehicle """ return replace(self, balance=self.balance - amount)
[docs] def receive_payment(self, amount: Currency) -> Vehicle: """ updates the Vehicle's balance based on receiving a payment :param amount: the amount to be paid :return: the updated Vehicle """ return replace(self, balance=self.balance + amount)
[docs] def tick_distance_traveled_km(self, delta_d_km: Kilometers) -> Vehicle: """ adds distance to vehicle :param delta_d_km: :return: """ return replace(self, distance_traveled_km=self.distance_traveled_km + delta_d_km)
[docs] def tick_energy_expended(self, delta_energy: immutables.Map[EnergyType, float]) -> Vehicle: """ adds energy expenditure to vehicle :param delta_energy: :return: """ energy_expended = {k: self.energy_expended[k] + delta_energy[k] for k in self.energy.keys()} return replace(self, energy_expended=immutables.Map(energy_expended))
[docs] def tick_energy_gained(self, delta_energy: immutables.Map[EnergyType, float]) -> Vehicle: """ adds energy gain to vehicle :param delta_energy: :return: """ energy_gained = {k: self.energy_gained[k] + delta_energy[k] for k in self.energy.keys()} return replace(self, energy_gained=immutables.Map(energy_gained))
[docs] def set_membership(self, member_ids: Tuple[str, ...]) -> Vehicle: """ sets the membership(s) of the vehicle :param member_ids: a Tuple containing updated membership(s) of the vehicle :return: """ return replace(self, membership=Membership.from_tuple(member_ids))
[docs] def add_membership(self, membership_id: MembershipId) -> Vehicle: """ adds the membership to the vehicle :param membership_id: a membership for the vehicle :return: """ updated_membership = self.membership.add_membership(membership_id) return replace(self, membership=updated_membership)