Source code for nrel.hive.reporting.handler.time_step_stats_handler

from __future__ import annotations

import logging
import os
from collections import Counter
from collections.abc import Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Dict, FrozenSet, Optional, List

import numpy as np
from immutables import Map

from nrel.hive.reporting.handler.handler import Handler
from nrel.hive.reporting.report_type import ReportType
from nrel.hive.state.vehicle_state.vehicle_state_type import VehicleStateType
from nrel.hive.util.io import to_csv, to_csv_dicts

if TYPE_CHECKING:
    from nrel.hive.config import HiveConfig
    from nrel.hive.model.vehicle.vehicle import Vehicle
    from nrel.hive.runner.runner_payload import RunnerPayload
    from nrel.hive.reporting.reporter import Report
    from nrel.hive.util.typealiases import MembershipId

log = logging.getLogger(__name__)


class TimeStepStatsHandler(Handler):
    """
    Report handler that aggregates statistics at each log step, for the whole
    simulation and/or per fleet, and writes them to csv files on close.
    """

    # key under which rows are stored for a fleet id of None
    _NO_FLEET_KEY = "none"

    def __init__(
        self,
        config: HiveConfig,
        scenario_output_directory: Path,
        fleet_ids: FrozenSet[Optional[MembershipId]],
        file_name: Optional[str] = "time_step_stats",
    ):
        """
        set up the in-memory row stores and the output paths.

        :param config: read for the sim start time, the time step duration and the
            two global logging flags (log_time_step_stats, log_fleet_time_step_stats)
        :param scenario_output_directory: directory the output paths are joined onto
        :param fleet_ids: fleet ids to keep per-fleet rows for; a None entry is stored
            under the key "none"
        :param file_name: prefix used for the names of the written csv files
        """
        self.file_name = file_name

        self.start_time = config.sim.start_time
        self.timestep_duration_seconds = config.sim.timestep_duration_seconds
        self.vehicle_state_names = tuple(vs.name for vs in VehicleStateType)

        # always defined, so the getters return empty results instead of raising
        # AttributeError when the corresponding logging flag is switched off
        self.data: List[dict] = []
        self.fleets_data: Dict[MembershipId, List[dict]] = {}

        self.log_time_step_stats = bool(config.global_config.log_time_step_stats)
        if self.log_time_step_stats:
            self.time_step_stats_outpath = scenario_output_directory.joinpath(
                f"{file_name}_all.csv"
            )

        self.log_fleet_time_step_stats = bool(
            config.global_config.log_fleet_time_step_stats and len(fleet_ids) > 0
        )
        if self.log_fleet_time_step_stats:
            self.fleets_timestep_stats_outpath = scenario_output_directory.joinpath(
                "fleet_time_step_stats/"
            )
            for fleet_id in fleet_ids:
                key = self._NO_FLEET_KEY if fleet_id is None else fleet_id
                self.fleets_data[key] = []

    def get_time_step_stats(self) -> list:
        """
        return the time step level statistics collected so far.

        :return: a list with one row (dict of column name to value) per handled log step;
            empty when time step stats logging is disabled
        """
        return self.data

    def get_fleet_time_step_stats(
        self,
    ) -> Map[MembershipId, Optional[Sequence]]:
        """
        return an immutable map of time step stat data by membership id.

        :return: the immutable map containing time step stats data by membership id;
            a fleet with no rows is mapped to None
        """
        result = Map(
            {fleet_id: rows if rows else None for fleet_id, rows in self.fleets_data.items()}
        )
        return result

    def handle(self, reports: List[Report], runner_payload: RunnerPayload):
        """
        called at each log step. aggregates various statistics to the time bin level

        :param reports: reports for gathering statistics
        :param runner_payload: provides the simulation state (``s``) and environment (``e``)
        :return:
        """
        if not (self.log_time_step_stats or self.log_fleet_time_step_stats):
            # nothing is recorded, so there is nothing to compute
            return

        sim_state = runner_payload.s
        env = runner_payload.e

        # get the time step
        sim_time = sim_state.sim_time
        time_step = int(
            (sim_time.as_epoch_time() - self.start_time.as_epoch_time())
            / self.timestep_duration_seconds
        )

        # sort reports by type
        reports_by_type: Dict[ReportType, List[Report]] = {}
        for report in reports:
            reports_by_type.setdefault(report.report_type, []).append(report)
        move_reports = reports_by_type.get(ReportType.VEHICLE_MOVE_EVENT, [])
        charge_reports = reports_by_type.get(ReportType.VEHICLE_CHARGE_EVENT, [])

        # get number of assigned requests in this time step
        assigned_requests_count = len(
            sim_state.get_requests(filter_function=lambda r: r.dispatched_vehicle is not None)
        )
        # get number of active requests in this time step (unassigned)
        active_requests_count = len(sim_state.requests) - assigned_requests_count
        # get number of canceled requests in this time step
        canceled_requests_count = len(reports_by_type.get(ReportType.CANCEL_REQUEST_EVENT, []))

        if self.log_time_step_stats:
            self.data.append(
                self._build_stats_row(
                    time_step=time_step,
                    sim_time=sim_time,
                    env=env,
                    # materialised because the row builder iterates it several times
                    vehicles=tuple(sim_state.get_vehicles()),
                    move_reports=move_reports,
                    charge_reports=charge_reports,
                    assigned_requests=assigned_requests_count,
                    active_requests=active_requests_count,
                    canceled_requests=canceled_requests_count,
                )
            )

        if self.log_fleet_time_step_stats:
            for fleet_id, fleet_rows in self.fleets_data.items():
                # get vehicles in this fleet
                fleet_vehicles = sim_state.get_vehicles(
                    filter_function=self._vehicle_filter(fleet_id, env)
                )

                # get vehicle reports in this fleet
                report_filter = self._report_filter(fleet_id, env)
                fleet_move_reports = [r for r in move_reports if report_filter(r)]
                fleet_charge_reports = [r for r in charge_reports if report_filter(r)]

                fleet_rows.append(
                    self._build_stats_row(
                        time_step=time_step,
                        sim_time=sim_time,
                        env=env,
                        vehicles=fleet_vehicles,
                        move_reports=fleet_move_reports,
                        charge_reports=fleet_charge_reports,
                        assigned_requests=self._count_fleet_assigned_requests(fleet_vehicles),
                        # active and canceled counts are the simulation-wide values;
                        # they are not broken down by fleet
                        active_requests=active_requests_count,
                        canceled_requests=canceled_requests_count,
                    )
                )

    def close(self, runner_payload: RunnerPayload):
        """
        saves all time step stat data as csv files to the scenario output directory.

        :param runner_payload: not used by this handler
        :return:
        """
        if self.log_time_step_stats:
            to_csv_dicts(
                self.get_time_step_stats(),
                self.time_step_stats_outpath,
            )
            log.info(f"time step stats written to {self.time_step_stats_outpath}")

        if self.log_fleet_time_step_stats:
            # tolerate a directory left over from an earlier run or an earlier close()
            self.fleets_timestep_stats_outpath.mkdir(parents=True, exist_ok=True)
            for fleet_id, fleet_data in self.get_fleet_time_step_stats().items():
                if fleet_data is not None:
                    outpath = self.fleets_timestep_stats_outpath.joinpath(
                        f"{self.file_name}_{fleet_id}.csv"
                    )
                    # NOTE(review): the overall file is written with to_csv_dicts while the
                    # per-fleet files use to_csv, although both receive a list of row
                    # dicts -- confirm that to_csv accepts this shape
                    to_csv(fleet_data, outpath)
                    log.info(f"fleet id: {fleet_id} time step stats written to {outpath}")

    @classmethod
    def _vehicle_filter(cls, membership_id: MembershipId, env) -> Callable[[Vehicle], bool]:
        """
        build the predicate selecting the vehicles that belong to a fleet.

        :param membership_id: a key of ``fleets_data`` ("none" for the no-fleet bucket)
        :param env: the environment, read for ``fleet_ids``
        :return: a predicate over vehicles
        """
        if membership_id == cls._NO_FLEET_KEY:
            # built once per predicate rather than once per vehicle
            known_fleet_ids = set(env.fleet_ids)
            return lambda v: not any(known_fleet_ids & set(v.membership.memberships))
        # bind the argument, not a loop variable of the caller
        return lambda v: membership_id in v.membership.memberships

    @classmethod
    def _report_filter(cls, membership_id: MembershipId, env) -> Callable[[Report], bool]:
        """
        build the predicate selecting the vehicle reports that belong to a fleet.

        :param membership_id: a key of ``fleets_data`` ("none" for the no-fleet bucket)
        :param env: the environment, read for ``fleet_ids``
        :return: a predicate over reports carrying a "vehicle_memberships" entry
        """
        if membership_id == cls._NO_FLEET_KEY:
            known_fleet_ids = set(env.fleet_ids)
            return lambda r: not any(known_fleet_ids & set(r.report["vehicle_memberships"]))
        return lambda r: membership_id in r.report["vehicle_memberships"]

    @staticmethod
    def _count_fleet_assigned_requests(vehicles: Sequence[Vehicle]) -> int:
        """
        count the requests assigned to the given vehicles: one per vehicle in the
        DISPATCH_TRIP state plus the trip plan length of each vehicle in the
        DISPATCH_POOLING_TRIP state.

        :param vehicles: the vehicles of one fleet
        :return: the number of assigned requests
        """
        assigned = 0
        for vehicle in vehicles:
            state_type = vehicle.vehicle_state.vehicle_state_type
            if state_type == VehicleStateType.DISPATCH_TRIP:
                assigned += 1
            elif state_type == VehicleStateType.DISPATCH_POOLING_TRIP:
                assigned += len(vehicle.vehicle_state.trip_plan)  # type: ignore
        return assigned

    def _build_stats_row(
        self,
        time_step: int,
        sim_time,
        env,
        vehicles: Sequence[Vehicle],
        move_reports: Sequence[Report],
        charge_reports: Sequence[Report],
        assigned_requests: int,
        active_requests: int,
        canceled_requests: int,
    ) -> dict:
        """
        build one statistics row for a group of vehicles and their reports.

        keys are inserted in a fixed order so every row has the same column layout.

        :param time_step: index of the time step
        :param sim_time: the simulation time, written via ``as_iso_time()``
        :param env: the environment, read for ``mechatronics`` and ``chargers``
        :param vehicles: the vehicles to summarise (sized and re-iterable)
        :param move_reports: vehicle move reports of these vehicles
        :param charge_reports: vehicle charge reports of these vehicles
        :param assigned_requests: value for the "assigned_requests" column
        :param active_requests: value for the "active_requests" column
        :param canceled_requests: value for the "canceled_requests" column
        :return: the statistics row
        """
        # count the number of vehicles in each vehicle state
        state_counts = Counter(v.vehicle_state.vehicle_state_type.name for v in vehicles)

        row: dict = {
            "time_step": time_step,
            "sim_time": sim_time.as_iso_time(),
        }

        # get average SOC of vehicles; None when there is nothing to average
        if len(vehicles) > 0:
            row["avg_soc_percent"] = 100 * np.mean(
                [env.mechatronics[v.mechatronics_id].fuel_source_soc(v) for v in vehicles]
            )
        else:
            row["avg_soc_percent"] = None

        # get the total vkt (0 when there are no move reports)
        row["vkt"] = sum(float(r.report["distance_km"]) for r in move_reports)

        row["assigned_requests"] = assigned_requests
        row["active_requests"] = active_requests
        row["canceled_requests"] = canceled_requests

        # get count of requests currently being serviced by a vehicle
        pooling_boarded = sum(
            len(v.vehicle_state.boarded_requests)  # type: ignore
            for v in vehicles
            if v.vehicle_state.vehicle_state_type == VehicleStateType.SERVICING_POOLING_TRIP
        )
        row["servicing_requests"] = (
            state_counts[VehicleStateType.SERVICING_TRIP.name] + pooling_boarded
        )

        # add the number of vehicles in each vehicle state
        row["vehicles"] = len(vehicles)
        for state in self.vehicle_state_names:
            row[f"vehicles_{state.lower()}"] = state_counts[state]

        driver_counts = Counter(v.driver_state.available for v in vehicles)
        row["drivers_available"] = driver_counts[True]
        row["drivers_unavailable"] = driver_counts[False]

        # count charge reports per charger id; ids without a report count as 0
        charger_counts = Counter(r.report["charger_id"] for r in charge_reports)
        for charger in env.chargers.keys():
            row[f"charger_{charger.lower()}"] = charger_counts[charger]

        return row