Source code for nrel.hive.runner.local_simulation_runner

from __future__ import annotations

import functools as ft
import logging
from typing import NamedTuple, TYPE_CHECKING, Callable, Optional

from tqdm import tqdm

from nrel.hive.runner.runner_payload import RunnerPayload

# module-level logger, named after this module
log = logging.getLogger(__name__)

# Environment is imported for static type checking only; because of
# `from __future__ import annotations`, annotations that mention it are
# never evaluated at runtime, so no runtime import is needed.
if TYPE_CHECKING:
    from nrel.hive.runner.environment import Environment


class LocalSimulationRunner(NamedTuple):
    """
    Runs a simulation in the current process, either across the whole configured
    time range (``run``) or one time step at a time (``step``).
    """

    @classmethod
    def run(
        cls,
        runner_payload: RunnerPayload,
    ) -> RunnerPayload:
        """
        advances the simulation once for every tick of the configured time range
        and returns the payload produced by the last tick

        the ticks run from the configured start_time (inclusive) to end_time
        (exclusive) in increments of timestep_duration_seconds, and are wrapped
        in a tqdm iterator

        :param runner_payload: the initial state of the simulation
        :return: the payload after the final tick; the input payload itself if
                 the time range is empty
        """
        sim_config = runner_payload.e.config.sim
        ticks = tqdm(
            range(
                int(sim_config.start_time),
                int(sim_config.end_time),
                sim_config.timestep_duration_seconds,
            )
        )
        advance = _run_step_in_context(runner_payload.e)

        # left fold over the ticks: each tick feeds the previous payload back in
        current_payload = runner_payload
        for tick in ticks:
            current_payload = advance(current_payload, tick)
        return current_payload

    @classmethod
    def step(cls, runner_payload: RunnerPayload) -> Optional[RunnerPayload]:
        """
        advances the simulation by exactly one step, unless the simulation time
        has already reached (or passed) the configured end_time

        :param runner_payload: the current state of the simulation
        :return: the payload after one step, or None when sim_time >= end_time
        """
        # guard: nothing left to simulate
        if runner_payload.s.sim_time >= runner_payload.e.config.sim.end_time:
            return None

        advance = _run_step_in_context(runner_payload.e)
        return advance(runner_payload)
def _run_step_in_context(env: Environment) -> Callable:
    """
    builds a single-step function that closes over the given environment

    :param env: the environment whose reporter is flushed after every step
    :return: a function ``(payload, t=-1) -> payload`` that applies the payload's
             update and flushes the reporter with the result
    """

    def _run_step(payload: RunnerPayload, t: int = -1) -> RunnerPayload:
        """
        performs one update of the payload

        :param payload: the payload to advance
        :param t: accepted so the function can be used as a reducer over a
                  sequence of time values; it is not read
        :return: the payload returned by ``payload.u.apply_update``
        """
        # the payload carries its own update object; it produces the next payload
        next_payload = payload.u.apply_update(payload)
        # hand the new payload to the environment's reporter before returning it
        env.reporter.flush(next_payload)
        return next_payload

    return _run_step