Source code for batsim_py.monitors


from abc import ABC, abstractmethod
from enum import Enum
from itertools import groupby
import time as tm

import pandas as pd
from procset import ProcSet

from .events import SimulatorEvent
from .events import JobEvent
from .events import HostEvent
from .jobs import JobState
from .jobs import Job
from .resources import HostState
from .resources import Host
from .resources import PowerStateType
from .simulator import SimulatorHandler


[docs]class Monitor(ABC): """ Simulation Monitor base class. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler): if simulator.is_running: raise RuntimeError('Monitors cannot be created when the simulation ' 'is already running.') self.simulator = simulator @property @abstractmethod def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ raise NotImplementedError
[docs] @abstractmethod def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ raise NotImplementedError
[docs] @abstractmethod def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ raise NotImplementedError
[docs]class JobMonitor(Monitor): """ Simulator Job Monitor class. This monitor collects statistics about each submitted job. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__info: dict = { 'job_id': [], 'workload_name': [], 'profile': [], 'submission_time': [], 'requested_number_of_resources': [], 'requested_time': [], 'success': [], 'final_state': [], 'starting_time': [], 'execution_time': [], 'finish_time': [], 'waiting_time': [], 'turnaround_time': [], 'stretch': [], 'allocated_resources': [], 'consumed_energy': [], } simulator.subscribe(SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) simulator.subscribe(JobEvent.COMPLETED, self.update_info) simulator.subscribe(JobEvent.REJECTED, self.update_info) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info)
def on_simulation_begins(self, sender) -> None: self.__info = {k: [] for k in self.__info.keys()} def update_info(self, sender: Job) -> None: alloc = ProcSet(*sender.allocation) if sender.allocation else None success = int(sender.state == JobState.COMPLETED_SUCCESSFULLY) self.__info['job_id'].append(sender.name) self.__info['workload_name'].append(sender.workload) self.__info['profile'].append(sender.profile.name) self.__info['submission_time'].append(sender.subtime) self.__info['requested_number_of_resources'].append(sender.res) self.__info['requested_time'].append(sender.walltime) self.__info['success'].append(success) self.__info['final_state'].append(str(sender.state)) self.__info['starting_time'].append(sender.start_time) self.__info['execution_time'].append(sender.runtime) self.__info['finish_time'].append(sender.stop_time) self.__info['waiting_time'].append(sender.waiting_time) self.__info['turnaround_time'].append(sender.turnaround_time) self.__info['stretch'].append(sender.stretch) self.__info['allocated_resources'].append(alloc) self.__info['consumed_energy'].append(-1)
[docs]class SchedulerMonitor(Monitor): """ Simulator Scheduler Monitor class. This monitor collects statistics about scheduler performance. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__info: dict = { 'makespan': 0, 'max_slowdown': 0, 'max_stretch': 0, 'max_waiting_time': 0, 'max_turnaround_time': 0, 'mean_slowdown': 0, 'mean_pp_slowdown': 0, 'mean_stretch': 0, 'mean_waiting_time': 0, 'mean_turnaround_time': 0, 'nb_jobs': 0, 'nb_jobs_finished': 0, 'nb_jobs_killed': 0, 'nb_jobs_rejected': 0, 'nb_jobs_success': 0, } self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( SimulatorEvent.SIMULATION_ENDS, self.on_simulation_ends) self.simulator.subscribe(JobEvent.COMPLETED, self.on_job_completed) self.simulator.subscribe(JobEvent.SUBMITTED, self.on_job_submitted) self.simulator.subscribe(JobEvent.REJECTED, self.on_job_rejected,) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info, orient='index').T
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.__info = {k: 0 for k in self.__info.keys()} self.simulator = sender def on_simulation_ends(self, sender: SimulatorHandler) -> None: self.__info['makespan'] = sender.current_time nb_finished = max(1, self.__info['nb_jobs_finished']) self.__info['mean_waiting_time'] /= nb_finished self.__info['mean_slowdown'] /= nb_finished self.__info['mean_pp_slowdown'] /= nb_finished self.__info['mean_stretch'] /= nb_finished self.__info['mean_turnaround_time'] /= nb_finished def on_job_submitted(self, sender: Job) -> None: self.__info['nb_jobs'] += 1 def on_job_rejected(self, sender: Job) -> None: self.__info['nb_jobs_rejected'] += 1 def on_job_completed(self, sender: Job) -> None: assert self.simulator self.__info['makespan'] = self.simulator.current_time self.__info['mean_slowdown'] += sender.slowdown self.__info['mean_pp_slowdown'] += sender.per_processor_slowdown self.__info['mean_stretch'] += sender.stretch self.__info['mean_waiting_time'] += sender.waiting_time self.__info['mean_turnaround_time'] += sender.turnaround_time self.__info['max_waiting_time'] = max( sender.waiting_time, self.__info['max_waiting_time']) self.__info['max_stretch'] = max( sender.stretch, self.__info['max_stretch']) self.__info['max_slowdown'] = max( sender.slowdown, self.__info['max_slowdown']) self.__info['max_turnaround_time'] = max( sender.turnaround_time, self.__info['max_turnaround_time']) self.__info['nb_jobs_finished'] += 1 if sender.state == JobState.COMPLETED_SUCCESSFULLY: self.__info['nb_jobs_success'] += 1 elif sender.state == JobState.COMPLETED_KILLED or sender.state == JobState.COMPLETED_WALLTIME_REACHED or sender.state == JobState.COMPLETED_FAILED: self.__info['nb_jobs_killed'] += 1
[docs]class HostMonitor(Monitor): """ Simulator Host Monitor class. This monitor collects statistics about the time each host spent on each of its possible states. Moreover, it computes the total energy consumed and the total number of host state switches. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__last_state: dict = {} self.__info: dict = { 'time_idle': 0, 'time_computing': 0, 'time_switching_off': 0, 'time_switching_on': 0, 'time_sleeping': 0, 'consumed_joules': 0, 'energy_waste': 0, 'nb_switches': 0, 'nb_computing_machines': 0, } self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( SimulatorEvent.SIMULATION_ENDS, self.on_simulation_ends) self.simulator.subscribe( HostEvent.STATE_CHANGED, self.on_host_state_changed) self.simulator.subscribe( HostEvent.COMPUTATION_POWER_STATE_CHANGED, self.on_host_state_changed) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info, orient='index').T
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.simulator = sender assert self.simulator.platform t_now = self.simulator.current_time self.__last_state = {} for h in self.simulator.platform.hosts: self.__last_state[h.id] = (t_now, h.power, h.state, h.pstate) self.__info = {k: 0 for k in self.__info.keys()} self.__info['nb_computing_machines'] = self.simulator.platform.size def on_simulation_ends(self, sender: SimulatorHandler) -> None: assert self.simulator and self.simulator.platform for h in self.simulator.platform.hosts: self.update_info(h) def on_host_state_changed(self, sender: Host) -> None: self.update_info(sender) def update_info(self, h: Host) -> None: assert self.simulator t_start, power, state, pstate = self.__last_state[h.id] # Update Info time_spent = self.simulator.current_time - t_start energy_consumption = time_spent * (power or 0) energy_wasted = 0 if state == HostState.IDLE: self.__info['time_idle'] += time_spent energy_wasted = energy_consumption elif state == HostState.COMPUTING: self.__info['time_computing'] += time_spent elif state == HostState.SWITCHING_OFF: self.__info['time_switching_off'] += time_spent energy_wasted = energy_consumption elif state == HostState.SWITCHING_ON: self.__info['time_switching_on'] += time_spent energy_wasted = energy_consumption elif state == HostState.SLEEPING: self.__info['time_sleeping'] += time_spent self.__info['consumed_joules'] += energy_consumption self.__info['energy_waste'] += energy_wasted if h.pstate and pstate.id != h.pstate.id: self.__info['nb_switches'] += 1 # Update Last State t_now = self.simulator.current_time self.__last_state[h.id] = (t_now, h.power, h.state, h.pstate)
[docs]class SimulationMonitor(Monitor): """ Simulator Monitor class. This monitor collects statistics about the simulation, which includes statistics about the scheduler performance and about each job. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__sched_monitor = SchedulerMonitor(simulator) self.__hosts_monitor = HostMonitor(simulator) self.__sim_start_time = self.__simulation_time = -1. self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( SimulatorEvent.SIMULATION_ENDS, self.on_simulation_ends) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ info = dict(self.__sched_monitor.info, **self.__hosts_monitor.info) info['simulation_time'] = self.__simulation_time return info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.info, orient='index').T
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.__sim_start_time = tm.time() def on_simulation_ends(self, sender: SimulatorHandler) -> None: self.__simulation_time = tm.time() - self.__sim_start_time
[docs]class HostStateSwitchMonitor(Monitor): """ Simulator Host State Monitor class. This monitor collects statistics about host state switches. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__last_host_state: dict = {} self.__info: dict = { 'time': [], 'nb_sleeping': [], 'nb_switching_on': [], 'nb_switching_off': [], 'nb_idle': [], 'nb_computing': [], 'nb_unavailable': [] } self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( HostEvent.STATE_CHANGED, self.on_host_state_changed) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info)
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.simulator = sender assert self.simulator.platform self.__last_host_state = {} self.__info = {k: [0] for k in self.__info.keys()} # Update initial state self.__info['time'][-1] = self.simulator.current_time for h in self.simulator.platform.hosts: key = self.__get_key_from_state(h.state) self.__last_host_state[h.id] = key self.__info[key][-1] += 1 def on_host_state_changed(self, sender: Host) -> None: assert self.simulator if self.__info['time'][-1] != self.simulator.current_time: # Update new info from last one for k in self.__info.keys(): self.__info[k].append(self.__info[k][-1]) self.__info['time'][-1] = self.simulator.current_time # Update info old_key = self.__last_host_state[sender.id] new_key = self.__get_key_from_state(sender.state) self.__info[old_key][-1] -= 1 self.__info[new_key][-1] += 1 self.__last_host_state[sender.id] = new_key def __get_key_from_state(self, state: HostState) -> str: if state == HostState.IDLE: return "nb_idle" elif state == HostState.COMPUTING: return "nb_computing" elif state == HostState.SLEEPING: return "nb_sleeping" elif state == HostState.SWITCHING_OFF: return "nb_switching_off" elif state == HostState.SWITCHING_ON: return "nb_switching_on" elif state == HostState.UNAVAILABLE: return "nb_unavailable" else: raise NotImplementedError
[docs]class HostPowerStateSwitchMonitor(Monitor): """ Simulator Host Power State Monitor class. This monitor collects statistics about host power state switches. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__last_pstate_id: dict = {} self.__info: dict = {'time': [], 'machine_id': [], 'new_pstate': []} self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( HostEvent.COMPUTATION_POWER_STATE_CHANGED, self.on_host_power_state_changed) self.simulator.subscribe( HostEvent.STATE_CHANGED, self.on_host_power_state_changed) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info)
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.simulator = sender assert self.simulator.platform self.__info = {k: [] for k in self.__info.keys()} hosts = [h for h in self.simulator.platform.hosts if h.pstate] self.__last_pstate_id = { h.id: h.pstate.id for h in hosts} # type:ignore # Record initial state for p, hs in groupby(hosts, key=lambda h: h.pstate.id): # type:ignore self.__info['time'].append(self.simulator.current_time) procset = ProcSet(*[h.id for h in hs]) self.__info['machine_id'].append(str(procset)) self.__info['new_pstate'].append(p) def on_host_power_state_changed(self, sender: Host) -> None: if sender.id in self.__last_pstate_id and sender.pstate and self.__last_pstate_id[sender.id] != sender.pstate.id: assert sender.pstate assert self.simulator self.__last_pstate_id[sender.id] = sender.pstate.id if sender.pstate.type == PowerStateType.SWITCHING_OFF: new_pstate_id = -2 elif sender.pstate.type == PowerStateType.SWITCHING_ON: new_pstate_id = -1 else: new_pstate_id = sender.pstate.id if self.__info['time'][-1] == self.simulator.current_time and self.__info['new_pstate'][-1] == new_pstate_id: # Update last record procset = ProcSet.from_str(self.__info['machine_id'][-1]) procset.update(ProcSet(sender.id)) self.__info['machine_id'][-1] = str(procset) else: # Record new state self.__info['time'].append(self.simulator.current_time) self.__info['machine_id'].append(str(sender.id)) self.__info['new_pstate'].append(new_pstate_id)
[docs]class EnergyEventType(Enum): """ Energy event types used in the ConsumedEnergyMonitor. """ POWER_STATE_TYPE = "p" JOB_STARTED_TYPE = "s" JOB_COMPLETED_TYPE = "e"
[docs]class ConsumedEnergyMonitor(Monitor): """ Simulator Energy Monitor class. This monitor collects energy statistics. Args: simulator: The simulator handler. Raises: RuntimeError: In case of simulation is already running. """ def __init__(self, simulator: SimulatorHandler) -> None: super().__init__(simulator) self.__last_state: dict = {} self.__info: dict = { 'time': [], 'energy': [], 'event_type': [], 'wattmin': [], 'epower': [] } self.simulator.subscribe( SimulatorEvent.SIMULATION_BEGINS, self.on_simulation_begins) self.simulator.subscribe( JobEvent.STARTED, self.on_job_started) self.simulator.subscribe( JobEvent.COMPLETED, self.on_job_completed) self.simulator.subscribe(HostEvent.STATE_CHANGED, self.on_host_state_changed) self.simulator.subscribe( HostEvent.COMPUTATION_POWER_STATE_CHANGED, self.on_host_state_changed) @property def info(self) -> dict: """ Get all the collected statistics. Returns: A dict containing statistics. """ return self.__info
[docs] def to_csv(self, fn: str) -> None: """ Dump info to a CSV file. """ self.to_dataframe().to_csv(fn, index=False)
[docs] def to_dataframe(self) -> pd.DataFrame: """ Convert info into a DataFrame. """ return pd.DataFrame.from_dict(self.__info)
def on_simulation_begins(self, sender: SimulatorHandler) -> None: self.simulator = sender assert self.simulator.platform t_now = self.simulator.current_time self.__info = {k: [] for k in self.__info.keys()} self.__last_state = { h.id: (t_now, h.power, h.pstate) for h in self.simulator.platform.hosts } def on_job_started(self, sender: Job) -> None: self.update_info(event_type=EnergyEventType.JOB_STARTED_TYPE) def on_job_completed(self, sender: Job) -> None: self.update_info(event_type=EnergyEventType.JOB_COMPLETED_TYPE) def on_host_state_changed(self, sender: Host) -> None: self.update_info(event_type=EnergyEventType.POWER_STATE_TYPE) def update_info(self, event_type: EnergyEventType) -> None: assert self.simulator and self.simulator.platform consumed_energy = wattmin = epower = 0. for h in self.simulator.platform.hosts: t_start, power, pstate = self.__last_state[h.id] t_now = self.simulator.current_time wattage = power or 0 epower += wattage consumed_energy += wattage * (t_now - t_start) wattmin += pstate.watt_full if pstate else 0 # Update Last State self.__last_state[h.id] = (t_now, h.power, h.pstate) consumed_energy += self.__info['energy'][-1] if self.__info['energy'] else 0 self.__info["time"].append(self.simulator.current_time) self.__info["energy"].append(consumed_energy) self.__info["event_type"].append(event_type.value) self.__info["wattmin"].append(wattmin) self.__info["epower"].append(epower)