import atexit
from collections import defaultdict
import math
from shutil import which
import signal
import subprocess
import sys
import tempfile
from typing import Dict
from typing import Callable, Union
from typing import Union
from typing import Sequence
from typing import List
from typing import DefaultDict
from typing import Optional
from typing import Iterator
from typing import overload
from typing import Literal
import numpy as np
from .events import SimulatorEvent
from .events import JobEvent
from .events import HostEvent
from .jobs import Job
from .protocol import NotifyBatsimEvent
from .protocol import BatsimNotifyType
from .protocol import SimulationBeginsBatsimEvent
from .protocol import NetworkHandler
from .protocol import BatsimMessage
from .protocol import ResourcePowerStateChangedBatsimEvent
from .protocol import BatsimEventType
from .protocol import JobCompletedBatsimEvent
from .protocol import JobSubmittedBatsimEvent
from .protocol import BatsimRequest
from .protocol import CallMeLaterBatsimRequest
from .protocol import KillJobBatsimRequest
from .protocol import ExecuteJobBatsimRequest
from .protocol import RejectJobBatsimRequest
from .protocol import SetResourceStateBatsimRequest
from .resources import Platform
from .resources import Host
from .utils import get_free_tcp_address
# Type aliases used by the simulator handler's public API.

# Objects that can be handed to a listener as the sender of an event.
EventSenders = Union[Host, Job, 'SimulatorHandler']
# Listener signatures: each listener receives the event sender as its only
# argument and its return value is ignored.
JobListener = Callable[[Job], None]
HostListener = Callable[[Host], None]
SimulatorListener = Callable[['SimulatorHandler'], None]
Listener = Union[JobListener, HostListener, SimulatorListener]
# Any event kind a listener can subscribe to.
Event = Union[JobEvent, HostEvent, SimulatorEvent]
# Registry mapping an event to the listeners subscribed to it.
Listeners = DefaultDict[Event, List[Listener]]
# A callback receives the current simulation time when it is fired.
Callback = Callable[[float], None]
# Registry mapping a simulation time to the callbacks scheduled for it.
Callbacks = DefaultDict[float, List[Callback]]
# Verbosity levels accepted by `SimulatorHandler.start`; the chosen value is
# forwarded to the Batsim `-v` option.
BatsimVerbosity = Literal["quiet", "network-only", "information", "debug"]
class Reservation:
    """ Describes the reservation of a host.

    Args:
        host: The reserved host.
        release_time: The remaining time until the host is released.

    Attributes:
        host: The reserved host.
        release_time: The remaining time until the host is released.
    """

    def __init__(self, host: Host, release_time: float) -> None:
        self.host = host
        self.release_time = release_time
class SimulatorHandler:
    """ Simulator handler class.

    This class will handle the Batsim simulation process.

    Args:
        tcp_address: An address string consisting of three parts as follows:
            protocol://interface:port. Defaults to None. If no address
            is provided, a random one will be used.

    Raises:
        ImportError: In case of Batsim cannot be found.

    Examples:
        >>> handler = SimulatorHandler("tcp://localhost:28000")
    """

    def __init__(self, tcp_address: Optional[str] = None) -> None:
        if which('batsim') is None:
            raise ImportError('(HINT: you need to install Batsim. '
                              'Check the setup instructions here: '
                              'https://batsim.readthedocs.io/en/latest/.). '
                              'Run "batsim --version" to make sure it is working.')

        self.__network = NetworkHandler(tcp_address or get_free_tcp_address())
        self.__current_time: float = 0.
        self.__simulator: Optional[subprocess.Popen] = None
        self.__simulation_time: Optional[float] = None
        self.__platform: Platform = None  # type: ignore
        self.__no_more_jobs_to_submit = False
        self.__no_more_external_event_to_occur = False
        self.__batsim_requests: List[BatsimRequest] = []
        self.__jobs: List[Job] = []
        self.__callbacks: Callbacks = defaultdict(list)
        self.__subscriptions: Listeners = defaultdict(list)
        # Flag used by `proceed_time` to know whether it must keep advancing
        # until its own callback fires.
        self.__wait_callback = False

        # Batsim events handlers
        self.__batsim_event_handlers: dict = {
            BatsimEventType.SIMULATION_ENDS: self.__on_batsim_simulation_ends,
            BatsimEventType.SIMULATION_BEGINS: self.__on_batsim_simulation_begins,
            BatsimEventType.JOB_COMPLETED: self.__on_batsim_job_completed,
            BatsimEventType.JOB_SUBMITTED: self.__on_batsim_job_submitted,
            BatsimEventType.RESOURCE_STATE_CHANGED: self.__on_batsim_host_state_changed,
            BatsimEventType.REQUESTED_CALL: self.__on_batsim_requested_call,
            BatsimEventType.NOTIFY: self.__on_batsim_notify
        }

        atexit.register(self.__close_simulator)
        try:
            signal.signal(signal.SIGTERM, self.__on_sigterm)
        except ValueError:
            # Signal handlers can only be installed from the main thread.
            # The atexit hook above still terminates the simulator process,
            # so the handler remains usable when built from another thread.
            pass

    @property
    def address(self) -> str:
        """ The address string. """
        return self.__network.address

    @property
    def jobs(self) -> Sequence[Job]:
        """ A sequence with all the jobs in the system.

        This does not include jobs that already finished.
        """
        return list(self.__jobs)

    @property
    def queue(self) -> Sequence[Job]:
        """ A sequence of all jobs in the queue. """
        return [j for j in self.__jobs if j.is_submitted]

    @property
    def agenda(self) -> Iterator[Reservation]:
        """ Host reservations.

        Yields one reservation per platform host. The release time of a host
        is the largest remaining walltime among the jobs allocated on it, or
        infinity when one of those jobs has no walltime. Nothing is yielded
        while the platform is not loaded.
        """
        if self.__platform:
            for host in self.__platform.hosts:
                release_t = 0.
                for job_id in host.jobs:
                    job = next(j for j in self.__jobs if j.id == job_id)
                    if job.walltime:
                        runtime = 0.
                        if job.is_running:
                            assert job.start_time is not None
                            runtime = self.current_time - job.start_time
                        job_release_t = job.walltime - runtime
                    else:
                        # Without a walltime the release time is unknown.
                        job_release_t = np.inf
                    release_t = max(release_t, job_release_t)
                yield Reservation(host, release_t)

    @property
    def platform(self) -> Platform:
        """ The simulation platform."""
        return self.__platform

    @property
    def is_running(self) -> bool:
        """ Whether the simulation is running or not."""
        return self.__simulator is not None

    @property
    def current_time(self) -> int:
        """ The current simulation time (seconds). """
        return math.floor(self.__current_time)

    @property
    def is_submitter_finished(self) -> bool:
        """ Whether there are still some jobs to be submitted or not. """
        return self.__no_more_jobs_to_submit

    @staticmethod
    def __build_batsim_command(address: str,
                               platform: str,
                               workload: str,
                               verbosity: str,
                               export_dir: str,
                               allow_compute_sharing: bool,
                               allow_storage_sharing: bool,
                               external_events: Optional[str]) -> List[str]:
        """ Build the Batsim command line as an argument list.

        Each value is kept as a single argument, so paths that contain
        whitespace are passed to Batsim unmodified.
        """
        cmd = [
            'batsim', '-E', '--forward-profiles-on-submission',
            '--disable-schedule-tracing', '--disable-machine-state-tracing',
            '-s', address,
            '-p', platform,
            '-w', workload,
            '-v', verbosity,
            '-e', export_dir,
        ]
        if allow_compute_sharing:
            cmd.append('--enable-compute-sharing')
        if not allow_storage_sharing:
            cmd.append('--disable-storage-sharing')
        if external_events:
            cmd.extend(['--events', external_events])
        return cmd

    def start(self,
              platform: str,
              workload: str,
              verbosity: BatsimVerbosity = "quiet",
              simulation_time: Optional[float] = None,
              allow_compute_sharing=False,
              allow_storage_sharing=True,
              external_events: Optional[str] = None) -> None:
        """ Start the simulation process.

        Args:
            platform: The XML file defining the platform. It must follow the
                format expected by Batsim and SimGrid. Check their documentation
                to know how to define a platform
            workload: A JSON file defining the jobs and their profiles.
                The simulation process will only submit the jobs that are
                defined in this JSON. Moreover, Batsim is responsible for
                the submission process.
            verbosity: The Batsim verbosity level. Defaults to "quiet".
            simulation_time: The maximum simulation time. Defaults to None.
                If this argument is set, the simulation will stop only when this
                time is reached, no matter if there are jobs to be submitted or
                running. Otherwise, the simulation will only stop when all jobs
                in the workload were completed or rejected.
            allow_compute_sharing: Whether a host can be used by multiple jobs
                or not. Defaults to False.
            allow_storage_sharing: Whether a storage can be used by multiple jobs
                or not. Defaults to True.
            external_events: The file containing external events to simulate.
                Check the Batsim documentation to know what kind of external
                events are supported.

        Raises:
            RuntimeError: In case of simulation already running.
            ValueError: In case of `simulation_time` is less than or equals to
                zero or the verbosity is invalid.

        Examples:
            >>> handler = SimulatorHandler("tcp://localhost:28000")
            >>> handler.start("platform.xml", "workload.json", "information", 1440)
        """
        assert workload and platform

        if self.is_running:
            raise RuntimeError("The simulation is already running.")

        if verbosity not in ("quiet", "network-only", "information", "debug"):
            raise ValueError('This `verbosity` argument value is not accepted '
                             f'by Batsim, got {verbosity}')

        if simulation_time is not None and simulation_time <= 0:
            raise ValueError('Expected `simulation_time` to be greater '
                             f'than zero, got {simulation_time}.')

        self.__jobs = []
        self.__current_time = 0.
        self.__simulation_time = simulation_time
        self.__no_more_jobs_to_submit = False
        # Without an events file there is no external event left to wait for.
        self.__no_more_external_event_to_occur = not external_events

        # There isn't an option to avoid exporting batsim results
        tmp_dir = tempfile.gettempdir() + "/batsim"
        cmd = self.__build_batsim_command(self.__network.address,
                                          platform,
                                          workload,
                                          verbosity,
                                          tmp_dir,
                                          allow_compute_sharing,
                                          allow_storage_sharing,
                                          external_events)

        self.__simulator = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        self.__network.bind()
        self.__handle_batsim_events()

        if self.__simulation_time:
            self.__set_batsim_call_me_later(self.__simulation_time)

        self.__dispatch_event(SimulatorEvent.SIMULATION_BEGINS, self)

    def close(self) -> None:
        """ Close the simulation process.

        Does nothing when the simulation is not running. Otherwise the
        simulator process and the network are closed, pending requests and
        callbacks are discarded and the simulation-ends event is dispatched.
        """
        if self.is_running:
            self.__close_simulator()
            self.__network.close()
            self.__simulation_time = None
            self.__batsim_requests.clear()
            self.__callbacks.clear()
            self.__dispatch_event(SimulatorEvent.SIMULATION_ENDS, self)

    def proceed_time(self, time: int = 0) -> None:
        """ Proceed the simulation process to the next event or time.

        Args:
            time: The time to proceed (min is 1 sec). Defaults to 0. It's
                possible to proceed directly to the next event or to a specific
                time. If the time is unset (equals 0), the simulation will proceed
                to the next event. Otherwise, if a time 't' is provided, the
                simulation will proceed directly to it and only events that occur
                before 't' will be dispatched.

        Raises:
            ValueError: In case of invalid arguments value.
            RuntimeError: In case of the simulation is not running or
                a deadlock happened. The latter case occurs only when there are
                no more events to happen. Consequently, the simulation does not
                know what to do and a deadlock error is raised by Batsim.
        """
        def unflag(_):
            # Internal function called by the callback procedure once the
            # requested time is reached.
            self.__wait_callback = False

        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        nothing_left_to_do = (not self.__simulation_time
                              and self.is_submitter_finished
                              and not self.__jobs
                              and self.__no_more_external_event_to_occur)

        if not time:
            # Go to the next event.
            self.__wait_callback = False
        elif nothing_left_to_do:
            # There are no more actions to do. Go to the next event.
            self.__wait_callback = False
        else:
            # Setup a call me later request.
            self.__wait_callback = True
            self.set_callback(time + self.current_time, unflag)

        self.__goto_next_batsim_event()
        while self.is_running and self.__wait_callback:
            self.__goto_next_batsim_event()

    @overload
    def subscribe(self, event: JobEvent, call: JobListener) -> None: ...

    @overload
    def subscribe(self, event: HostEvent, call: HostListener) -> None: ...

    @overload
    def subscribe(self, event: SimulatorEvent,
                  call: SimulatorListener) -> None: ...

    def subscribe(self, event: Event, call: Listener) -> None:
        """ Subscribe to an event.

        Subscriptions can be registered whether or not the simulation is
        running.

        Args:
            event: The event to subscribe.
            call: The function to be called when the event is dispatched. It
                must accept the event sender as an argument.
        """
        assert callable(call)
        self.__subscriptions[event].append(call)

    def set_callback(self, at: int, call: Callable[[float], None]) -> None:
        """ Setup a callback.

        The simulation will call the function at the defined time.

        Args:
            at: The time which the function must be called (min is 1 sec).
            call: A function that receives the current simulation time as
                an argument.

        Raises:
            ValueError: In case of invalid arguments value.
            RuntimeError: In case of simulation is not running.
        """
        assert callable(call)

        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        if at <= self.current_time:
            raise ValueError('Expected `at` argument to be a number '
                             'greater than the current simulation time'
                             f', got {at}.')

        self.__callbacks[at].append(call)
        self.__set_batsim_call_me_later(at)

    def allocate(self,
                 job_id: str,
                 hosts_id: Sequence[int],
                 storage_mapping: Optional[Dict[str, int]] = None) -> None:
        """ Allocate resources for a job.

        To start computing, a job must allocate some resources first. When these
        resources are ready, the simulator will automatically start the job. If
        for some reason any allocated resource is not ready (because it's off or
        switching On/Off), the simulator will try to initialize the resource and
        will wait until it's ready to start the job.

        Args:
            job_id: The job id.
            hosts_id: A sequence of host ids to be allocated for the job.
            storage_mapping: A mapping of storage names to resource ids. If
                the job needs a storage resource, this argument is required.
                Otherwise, it can just be ignored.

        Raises:
            RuntimeError: In case of simulation is not running.
            LookupError: In case of job/resource not found or resource type
                is invalid.
        """
        assert job_id and hosts_id

        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        assert self.__platform, "For some reason the platform was not loaded"

        job = self.__get_job(job_id)

        # Allocate hosts
        for h_id in hosts_id:
            self.__platform.get_host(h_id)._allocate(job.id)

        # Allocate storages
        if storage_mapping:
            for s_id in set(storage_mapping.values()):
                self.__platform.get_storage(s_id)._allocate(job.id)

        # Allocate job
        job._allocate(hosts_id, storage_mapping)
        self.__dispatch_event(JobEvent.ALLOCATED, job)

        # Start job
        self.__start_runnable_jobs()

    def kill_job(self, job_id: str) -> None:
        """ Kill a job that is running.

        The kill request is sent to Batsim immediately and the events it
        answers with are handled before returning.

        Args:
            job_id: The id of the job to kill.

        Raises:
            RuntimeError: In case of simulation is not running or the job
                cannot be killed.
            LookupError: In case of job not found.
        """
        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        job = self.__get_job(job_id)

        if not job.is_running:
            raise RuntimeError(f"Invalid kill. Job {job.id} is not running")

        # Sync now with Batsim
        request = KillJobBatsimRequest(self.current_time, job.id)
        msg = BatsimMessage(self.current_time, [request])
        self.__network.send(msg)
        self.__handle_batsim_events()

    def reject_job(self, job_id: str) -> None:
        """ Rejects a job.

        Only jobs in the queue can be rejected. Once a job is rejected, it
        cannot be scheduled anymore.

        Args:
            job_id: The id of the job to reject.

        Raises:
            RuntimeError: In case of simulation is not running or the job
                cannot be rejected.
            LookupError: In case of job not found.
        """
        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        job = self.__get_job(job_id)
        job._reject()
        self.__jobs.remove(job)

        # Sync Batsim
        request = RejectJobBatsimRequest(self.current_time, job_id)
        self.__batsim_requests.append(request)

        self.__dispatch_event(JobEvent.REJECTED, job)

    def switch_on(self, hosts_id: Sequence[int]) -> None:
        """ Switch on hosts.

        Args:
            hosts_id: A sequence of host ids to be switched on.

        Raises:
            RuntimeError: In case of the simulation is not running or the host
                cannot switch on because it's in an invalid state or the power
                state is not defined.
            LookupError: In case of host not found.
        """
        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        assert self.__platform, "For some reason, the platform was not loaded."

        for h_id in hosts_id:
            host = self.__platform.get_host(h_id)
            host._switch_on()
            ending_pstate = host.get_default_pstate()

            # Sync Batsim
            self.__set_batsim_host_pstate(host.id, ending_pstate.id)
            self.__dispatch_event(HostEvent.STATE_CHANGED, host)

    def switch_off(self, hosts_id: Sequence[int]) -> None:
        """ Switch off hosts.

        Args:
            hosts_id: A sequence of host ids to be switched off.

        Raises:
            RuntimeError: In case of simulation is not running or a host cannot
                switch off because it's in an invalid state or the power state is
                not defined.
            LookupError: In case of host not found or the 'off' power state could
                not be found.
        """
        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        assert self.__platform, "For some reason, the platform was not loaded."

        for h_id in hosts_id:
            host = self.__platform.get_host(h_id)
            host._switch_off()
            ending_pstate = host.get_sleep_pstate()

            # Sync Batsim
            self.__set_batsim_host_pstate(host.id, ending_pstate.id)
            self.__dispatch_event(HostEvent.STATE_CHANGED, host)

    def switch_power_state(self, host_id: int, pstate_id: int) -> None:
        """ Switch the host computation power state.

        This is useful if you want to implement a DVFS policy.

        Args:
            host_id: The host id.
            pstate_id: The computation power state id.

        Raises:
            RuntimeError: In case of the simulation is not running or the
                power state is not defined or the host cannot switch.
            LookupError: In case of host or power state could not be found.
        """
        if not self.is_running:
            raise RuntimeError("The simulation is not running.")

        assert self.__platform, "For some reason, the platform was not loaded."

        host = self.__platform.get_host(host_id)
        host._set_computation_pstate(pstate_id)

        # Sync Batsim
        assert host.pstate
        self.__set_batsim_host_pstate(host_id, pstate_id)
        self.__dispatch_event(HostEvent.COMPUTATION_POWER_STATE_CHANGED,
                              host)

    def __get_job(self, job_id: str) -> Job:
        """ Return the job with the given id.

        Raises:
            LookupError: In case of job not found.
        """
        job = next((j for j in self.__jobs if j.id == job_id), None)
        if not job:
            raise LookupError("The job {} was not found.".format(job_id))
        return job

    def __dispatch_event(self, event: Event, sender: EventSenders) -> None:
        """ Dispatch a simulator event to every subscribed listener.

        The sender type must match the event kind: a job for job events, a
        host for host events and the handler itself for simulator events.
        """
        if isinstance(event, JobEvent):
            assert isinstance(sender, Job), "JobEvent sender must be a Job"
        elif isinstance(event, HostEvent):
            assert isinstance(sender, Host), "HostEvent sender must be a Host"
        else:
            msg = 'SimulatorEvent sender must be a SimulatorHandler'
            assert isinstance(sender, SimulatorHandler), msg

        for call in self.__subscriptions[event]:
            call(sender)  # type: ignore

    def __start_runnable_jobs(self) -> None:
        """ Start runnable jobs.

        This is an internal method used to start jobs that were allocated.
        A job can only start if its hosts are idle or computing. Thus, this
        method ensures that the hosts can compute the job and switches on the
        sleeping ones.
        """
        if not self.is_running:
            return

        assert self.__platform, "For some reason, the platform was not loaded."

        runnable_jobs = [j for j in self.__jobs if j.is_runnable]
        for job in runnable_jobs:
            assert job.allocation, "For some reason, the job was not allocated."

            is_ready = True
            hosts = [self.__platform.get_host(h) for h in job.allocation]

            # Check if all hosts are active and switch on sleeping hosts
            for host in hosts:
                if not host.is_idle and not host.is_computing:
                    is_ready = False
                if host.is_sleeping:
                    self.switch_on([host.id])

            if is_ready:
                job._start(self.current_time)
                for host in hosts:
                    if not host.is_computing:
                        host._start_computing()
                        self.__dispatch_event(HostEvent.STATE_CHANGED, host)

                self.__dispatch_event(JobEvent.STARTED, job)

                # Sync Batsim
                request = ExecuteJobBatsimRequest(self.current_time,
                                                  job.id,
                                                  job.allocation,
                                                  job.storage_mapping)
                self.__batsim_requests.append(request)

    def __goto_next_batsim_event(self) -> None:
        """ Go to the next Batsim event.

        Pending requests are sent, the answer is handled and the simulation
        is closed when the maximum simulation time was reached.
        """
        self.__send_requests()
        self.__handle_batsim_events()
        if self.__simulation_time and self.current_time >= self.__simulation_time:
            self.close()
        else:
            self.__start_runnable_jobs()

    def __close_simulator(self) -> None:
        """ Close the simulator process. """
        if self.__simulator:
            self.__simulator.terminate()
            # Wait for the process to exit and drain its output pipe.
            self.__simulator.communicate()
            self.__simulator = None

    def __set_batsim_call_me_later(self, at: float) -> None:
        """ Setup a call me later request. """
        at += 0.09  # Last batsim priority
        request = CallMeLaterBatsimRequest(self.current_time, at)
        # Do not queue twice a request for the same time.
        already_queued = any(
            isinstance(r, CallMeLaterBatsimRequest) and r.at == request.at
            for r in self.__batsim_requests)
        if not already_queued:
            self.__batsim_requests.append(request)

    def __set_batsim_host_pstate(self, host_id: int, pstate_id: int) -> None:
        """ Set Batsim host power state. """

        def get_old_request() -> Optional[SetResourceStateBatsimRequest]:
            """ Get the request with the same properties. """
            for r in self.__batsim_requests:
                if (r.timestamp == self.current_time
                        and isinstance(r, SetResourceStateBatsimRequest)
                        and r.state == pstate_id):
                    return r
            return None

        # We try to minimize the number of requests.
        request = get_old_request()
        if request:
            request.add_resource(host_id)
        else:
            request = SetResourceStateBatsimRequest(
                self.current_time, [host_id], pstate_id)
            self.__batsim_requests.append(request)

    def __handle_batsim_events(self) -> None:
        """ Handle Batsim events.

        The simulation time follows each event timestamp while it is being
        handled and is finally set to the message time.
        """
        msg = self.__network.recv()
        for event in msg.events:
            self.__current_time = event.timestamp
            if event.type in self.__batsim_event_handlers:
                assert isinstance(event.type, BatsimEventType)
                self.__batsim_event_handlers[event.type](event)

        self.__current_time = msg.now

    def __send_requests(self) -> None:
        """ Send Batsim requests. """
        msg = BatsimMessage(self.current_time, self.__batsim_requests)
        self.__network.send(msg)
        self.__batsim_requests.clear()

    def __on_batsim_simulation_begins(self, event: SimulationBeginsBatsimEvent) -> None:
        """ Handle batsim simulation begins event. """
        self.__platform = event.platform

    def __on_batsim_simulation_ends(self, _) -> None:
        """ Handle batsim simulation ends event. """
        if self.__simulator:
            # Acknowledge the event, then give Batsim up to 5 seconds to
            # exit on its own before closing.
            ack = BatsimMessage(self.current_time, [])
            self.__network.send(ack)
            self.__simulator.wait(5)
            self.close()

    def __on_batsim_host_state_changed(self, event: ResourcePowerStateChangedBatsimEvent) -> None:
        """ Handle batsim host state changed event.

        When a host is switched on/off, the batsim simulates the transition costs
        and tells the scheduler only when the host is sleeping or idle. Thus,
        Batsim is the responsible to tell when the host finished its transition.
        """
        assert self.__platform, "For some reason, the platform was not loaded."

        for h_id in event.resources:
            h = self.__platform.get_host(h_id)
            assert h.pstate

            if h.is_switching_off:
                h._set_off()
                self.__dispatch_event(HostEvent.STATE_CHANGED, h)
            elif h.is_switching_on:
                h._set_on()
                self.__dispatch_event(HostEvent.STATE_CHANGED, h)
            elif (h.is_idle or h.is_computing) and h.pstate.id != event.state:
                h._set_computation_pstate(event.state)
                self.__dispatch_event(
                    HostEvent.COMPUTATION_POWER_STATE_CHANGED, h)

            assert h.pstate.id == event.state, ('For some reason, the internal '
                                                'platform differs from the '
                                                'Batsim platform, got pstate '
                                                '{} while batsim got pstate {}.'
                                                ''.format(h.pstate.id, event.state))

        self.__start_runnable_jobs()

    def __on_batsim_requested_call(self, _) -> None:
        """ Handle batsim answer to call me back request. """
        # Iterate over a snapshot of the keys because entries are popped.
        for t in list(self.__callbacks.keys()):
            if t <= self.__current_time:  # batsim time
                for call in self.__callbacks.pop(t):
                    call(self.current_time)  # local time

    def __on_batsim_job_submitted(self, event: JobSubmittedBatsimEvent) -> None:
        """ Handle batsim job submitted event. """
        self.__jobs.append(event.job)
        event.job._submit(self.current_time)
        self.__dispatch_event(JobEvent.SUBMITTED, event.job)

    def __on_batsim_job_completed(self, event: JobCompletedBatsimEvent) -> None:
        """ Handle batsim job completed event.

        The job is terminated, its hosts and storages are released and the
        job is removed from the system before the listeners are notified.
        """
        job = next((j for j in self.__jobs if j.id == event.job_id), None)
        assert job, "The job {} was not found.".format(event.job_id)
        assert job.allocation and self.__platform

        job._terminate(self.current_time, event.job_state)

        for h_id in job.allocation:
            host = self.__platform.get_host(h_id)
            host._release(job.id)
            self.__dispatch_event(HostEvent.STATE_CHANGED, host)

        if job.storage_mapping:
            for s_id in set(job.storage_mapping.values()):
                self.__platform.get_storage(s_id)._release(job.id)

        self.__jobs.remove(job)
        self.__dispatch_event(JobEvent.COMPLETED, job)
        self.__start_runnable_jobs()

    def __on_batsim_notify(self, event: NotifyBatsimEvent) -> None:
        """ Handle batsim notify event.

        Covers the end of the job submission, the end of the external events
        and the machine available/unavailable notifications.
        """
        if event.notify_type == BatsimNotifyType.NO_MORE_STATIC_JOB_TO_SUBMIT:
            self.__no_more_jobs_to_submit = True
        elif event.notify_type == BatsimNotifyType.NO_MORE_EXTERNAL_EVENT_TO_OCCUR:
            self.__no_more_external_event_to_occur = True
        elif event.notify_type == BatsimNotifyType.EVENT_MACHINE_UNAVAILABLE:
            assert event.resources
            for res_id in event.resources:
                res = self.__platform.get(res_id)
                res._set_unavailable()
                if isinstance(res, Host):
                    self.__dispatch_event(HostEvent.STATE_CHANGED, res)
        elif event.notify_type == BatsimNotifyType.EVENT_MACHINE_AVAILABLE:
            assert event.resources
            for res_id in event.resources:
                res = self.__platform.get(res_id)
                res._set_available()
                if isinstance(res, Host):
                    self.__dispatch_event(HostEvent.STATE_CHANGED, res)

    def __on_sigterm(self, signum, frame) -> None:
        """ Close simulation on sigterm. """
        self.__close_simulator()
        sys.exit(signum)