Source code for batsim_py.jobs

from abc import ABC
from enum import Enum
from typing import Optional
from typing import Sequence
from typing import Dict


[docs]class JobState(Enum): """ Batsim Job State Types """ UNKNOWN = 0 SUBMITTED = 1 RUNNING = 2 COMPLETED_SUCCESSFULLY = 3 COMPLETED_FAILED = 4 COMPLETED_WALLTIME_REACHED = 5 COMPLETED_KILLED = 6 REJECTED = 7 ALLOCATED = 8 def __str__(self) -> str: return self.name
[docs]class JobProfile(ABC): """ Batsim Job Profile base class. Args: name: The profile name. Must be unique within a workload. """ def __init__(self, name: str) -> None: self.__name = str(name) @property def name(self) -> str: """The profile's name. """ return self.__name
[docs]class DelayJobProfile(JobProfile): """ Batsim Delay Job Profile class. This class describes a job that will sleep during the defined time. Args: name: The profile name. Must be unique within a workload. delay: The sleep time. Raises: ValueError: In case of delay is not greater than zero. Examples: A job that will sleep for 100 seconds. >>> profile = DelayJobProfile(name="delay", delay=100) """ def __init__(self, name: str, delay: float) -> None: if delay <= 0: raise ValueError('Expected `delay` argument to be a number ' 'greater than zero, got {}'.format(delay)) super().__init__(name) self.__delay = float(delay) @property def delay(self) -> float: """The sleep time. """ return self.__delay
[docs]class ParallelJobProfile(JobProfile): """ Batsim Parallel Job Profile class. This class describes a job that performs a set of computations and communications on the allocated hosts. Args: name: The profile name. Must be unique within a workload. cpu: A list that defines the amount of flops to be computed on each allocated host. com: A list [host x host] that defines the amount of bytes to be transferred between the allocated hosts. Raises: ValueError: In case of `cpu` is not provided or `com` has an invalid size or their values are less than zero. Examples: Two hosts computing 10E6 flops each with local communication only. >>> profile = ParallelJobProfile(name="parallel", cpu=[10e6, 10e6], com=[5e6, 0, 0, 5e6]) Two hosts computing 2E6 flops each with host 1 sending 5E6 bytes to host 2. >>> profile = ParallelJobProfile(name="parallel", cpu=[2e6, 2e6], com=[0, 5e6, 0, 0]) One host computing 2E6 flops with host 1 sending 5E6 bytes to host 2 and host 2 sending 10E6 bytes to host 1. >>> profile = ParallelJobProfile(name="parallel", cpu=[2e6, 0], com=[0, 5e6, 10e6, 0]) """ def __init__(self, name: str, cpu: Sequence[float], com: Sequence[float]) -> None: if not cpu: raise ValueError('Expected `cpu` argument to be a non ' 'empty sequence, got {}.'.format(cpu)) if len(com) != len(cpu) * len(cpu): raise ValueError('Expected `com` argument to be a ' 'list of size [host x host], got {}'.format(com)) if not all(c >= 0 for c in cpu): raise ValueError('Expected `cpu` argument to have values ' 'not less than zero.') if not all(c >= 0 for c in com): raise ValueError('Expected `com` argument to have values ' 'not less than zero.') super().__init__(name) self.__cpu = list(cpu) self.__com = list(com) @property def cpu(self) -> Sequence[float]: """The amount of flops to be computed on each host.""" return self.__cpu @property def com(self) -> Sequence[float]: """The amount of bytes to be transferred between hosts.""" return self.__com
[docs]class ParallelHomogeneousJobProfile(JobProfile): """ Batsim Parallel Homogeneous Job Profile class. This class describes a job that performs the same computation and communication on all allocated hosts. Args: name: The profile name. Must be unique within a workload. cpu: The amount of flops to be computed on each allocated host. com: The amount of bytes to be sent and received between each pair of hosts. The loopback communication of each machine defaults to 0. Raises: ValueError: In case of `com` or `cpu` are not greater than zero. Examples: >>> profile = ParallelHomogeneousJobProfile("name", cpu=10e6, com=5e6) """ def __init__(self, name: str, cpu: float, com: float) -> None: if cpu == 0 and com == 0: raise ValueError('Expected `cpu` or `com` argument ' ' to be defined.') if cpu < 0: raise ValueError('Expected `cpu` argument to be ' 'not less than zero.') if com < 0: raise ValueError('Expected `com` argument to be ' 'not less than zero.') super().__init__(name) self.__cpu = float(cpu) self.__com = float(com) @property def cpu(self) -> float: """The amount of flops to be computed on each host.""" return self.__cpu @property def com(self) -> float: """The amount of bytes to be transferred between hosts.""" return self.__com
[docs]class ParallelHomogeneousTotalJobProfile(JobProfile): """ Batsim Parallel Homogeneous Total Job Profile class. This class describes a job that equally distributes the total amount of computation and communication between the allocated hosts. Args: name: The profile name. Must be unique within a workload. cpu: The total amount of flops to be computed on all hosts. com: The total amount of bytes to be sent and received on each pair of hosts. Raises: ValueError: In case of `com` or `cpu` are not greater than zero. Examples: >>> profile = ParallelHomogeneousTotalJobProfile("name", cpu=10e6, com=5e6) """ def __init__(self, name: str, cpu: float, com: float) -> None: if cpu == 0 and com == 0: raise ValueError('Expected `cpu` or `com` arguments ' ' to be defined.') if cpu < 0: raise ValueError('Expected `cpu` argument to be ' 'not less than zero.') if com < 0: raise ValueError('Expected `com` argument to be ' 'not less than zero.') super().__init__(name) self.__cpu = float(cpu) self.__com = float(com) @property def cpu(self) -> float: """The total amount of flops to be computed.""" return self.__cpu @property def com(self) -> float: """The total amount of bytes to be sent and received.""" return self.__com
[docs]class ComposedJobProfile(JobProfile): """ Batsim Composed Job Profile class. This class describes a job that executes a sequence of profiles. Args: name: The profile name. Must be unique within a workload. profiles: The profile names to execute sequentially. repeat: The number of times to repeat the sequence. Defaults to 1. Raises: ValueError: In case of `repeat` argument is less than 1 or profiles list is empty. Examples: >>> composed = ComposedJobProfile("composed", profiles=["p1", "p2"], repeat=2) """ def __init__(self, name: str, profiles: Sequence[str], repeat: int = 1) -> None: if repeat <= 0: raise ValueError('Expected `repeat` argument to be greater' 'than 0, got {}'.format(repeat)) if not profiles: raise ValueError('Expected `profiles` argument to be a non ' 'empty sequence, got {}.'.format(profiles)) super().__init__(name) self.__repeat = int(repeat) self.__profiles = list(profiles) @property def repeat(self) -> int: """The number of times to repeat the profile sequence.""" return self.__repeat @property def profiles(self) -> Sequence[str]: """The sequence of profiles to execute.""" return self.__profiles
[docs]class ParallelHomogeneousPFSJobProfile(JobProfile): """ Batsim Homogeneous Job with IO to/from PFS Profile class. This class describes a job that represents an IO transfer between the allocated hosts and a storage resource. Args: name: The profile name. Must be unique within a workload. bytes_to_read: The amount of bytes to read. bytes_to_write: The amount of bytes to write. storage: The storage resource label. Raises: ValueError: In case of `bytes_to_read` or `bytes_to_write` are not greater than zero or `storage` is not a valid string. Examples: >>> pfs = ParallelHomogeneousPFSJobProfile("pfs", bytes_to_read=10e6, bytes_to_write=1e6, storage="pfs") """ def __init__(self, name: str, bytes_to_read: float, bytes_to_write: float, storage: str) -> None: if bytes_to_read == 0 and bytes_to_write == 0: raise ValueError('Expected `bytes_to_read` or `bytes_to_write` ' 'arguments to be defined.') if bytes_to_read < 0: raise ValueError('Expected `bytes_to_read` argument to be ' 'not less than zero, got {}'.format(bytes_to_read)) if bytes_to_write < 0: raise ValueError('Expected `bytes_to_write` argument to be ' 'not less than zero, got {}'.format(bytes_to_write)) if not storage: raise ValueError('Expected `storage` argument to be a ' 'valid string, got {}'.format(storage)) super().__init__(name) self.__bytes_to_read = float(bytes_to_read) self.__bytes_to_write = float(bytes_to_write) self.__storage = str(storage) @property def bytes_to_read(self) -> float: """ The amount of bytes to read. """ return self.__bytes_to_read @property def bytes_to_write(self) -> float: """ The amount of bytes to write. """ return self.__bytes_to_write @property def storage(self) -> str: """ The storage label. """ return self.__storage
[docs]class DataStagingJobProfile(JobProfile): """ Batsim Data Staging Job Profile class. This class describes a job that represents an IO transfer between two storage resources. Args: name: The profile name. Must be unique within a workload. nb_bytes: The amount of bytes to be transferred. src: The sending storage label (source). dest: The receiving storage label (destination). Raises: ValueError: In case of `nb_bytes` argument is not greater than or equal to zero and `src` or `dest` arguments are not valid strings. Examples: >>> data = DataStagingJobProfile("data", nb_bytes=10e6, src="pfs", dest="nfs") """ def __init__(self, name: str, nb_bytes: float, src: str, dest: str) -> None: if nb_bytes <= 0: raise ValueError('Expected `nb_bytes` argument to be a ' 'positive number, got {}'.format(nb_bytes)) if not src: raise ValueError('Expected `src` argument to be a ' 'valid string, got {}'.format(src)) if not dest: raise ValueError('Expected `dest` argument to be a ' 'valid string, got {}'.format(dest)) super().__init__(name) self.__nb_bytes = float(nb_bytes) self.__src = str(src) self.__dest = str(dest) @property def dest(self) -> str: """ The receiving storage label.""" return self.__dest @property def src(self) -> str: """ The sending storage label. """ return self.__src @property def nb_bytes(self) -> float: """ The amount of bytes to be transferred. """ return self.__nb_bytes
[docs]class Job: """ This class describes a rigid job. Args: name: The job name. Must be unique within a workload. workload: The job workload name. res: The number of resources requested. profile: The job profile to be executed. subtime: The submission time. walltime: The execution time limit (maximum execution time). Defaults to None. user_id: The user id. Defaults to None. Raises: ValueError: In case of invalid arguments value. """ WORKLOAD_SEPARATOR: str = "!" def __init__(self, name: str, workload: str, res: int, profile: JobProfile, subtime: float, walltime: Optional[float] = None, user_id: Optional[int] = None) -> None: if not name: raise ValueError('Expected `name` argument to be a ' 'valid string, got {}'.format(name)) if not workload: raise ValueError('Expected `workload` argument to be a ' 'valid string, got {}'.format(workload)) if res <= 0: raise ValueError('Expected `res` argument to be greater ' 'than zero, got {}'.format(res)) if subtime < 0: raise ValueError('Expected `subtime` argument to be greater ' 'than or equal to zero, got {}'.format(subtime)) if walltime is not None and walltime <= 0: raise ValueError('Expected `walltime` argument to be greater ' 'than zero, got {}'.format(walltime)) assert isinstance(profile, JobProfile) self.__id = f"{workload}{self.WORKLOAD_SEPARATOR}{name}" self.__res = int(res) self.__profile = profile self.__subtime = float(subtime) self.__walltime = walltime self.__user_id = user_id self.__storage_mapping: Optional[Dict[str, int]] = None self.metadata: dict = {} self.__state: JobState = JobState.UNKNOWN self.__allocation: Optional[Sequence[int]] = None self.__start_time: Optional[float] = None # will be set on start self.__stop_time: Optional[float] = None # will be set on terminate def __repr__(self) -> str: return f"Job_{self.id}" @property def id(self) -> str: """ The job ID. """ return self.__id @property def name(self) -> str: """ The job name. """ return self.id.split(self.WORKLOAD_SEPARATOR)[1] @property def workload(self) -> str: """ The job workload name. """ return self.id.split(self.WORKLOAD_SEPARATOR)[0] @property def subtime(self) -> float: """ The job submission time. """ return self.__subtime @property def storage_mapping(self) -> Optional[Dict[str, int]]: """ The job storage mapping. """ return self.__storage_mapping @property def res(self) -> int: """ The number of resources requested. """ return self.__res @property def profile(self) -> JobProfile: """ The job profile. """ return self.__profile @property def walltime(self) -> Optional[float]: """ The job maximum execution time. """ return self.__walltime @property def user_id(self) -> Optional[int]: """ The user id. """ return self.__user_id @property def state(self) -> JobState: """ The current job state. """ return self.__state @property def allocation(self) -> Optional[Sequence[int]]: """ The allocated resource ids. """ if self.__allocation: return list(self.__allocation) else: return None @property def is_running(self) -> bool: """ Whether this job is running or not. """ return self.__state == JobState.RUNNING @property def is_runnable(self) -> bool: """ Whether this job is able to start or not. """ return self.__state == JobState.ALLOCATED @property def is_submitted(self) -> bool: """ Whether this job was submitted or not. """ return self.__state == JobState.SUBMITTED @property def is_finished(self) -> bool: """ Whether this job has finished or not. """ states = (JobState.COMPLETED_SUCCESSFULLY, JobState.COMPLETED_FAILED, JobState.COMPLETED_WALLTIME_REACHED, JobState.COMPLETED_KILLED) return self.__state in states @property def is_rejected(self) -> bool: """ Whether this job was rejected or not. """ return self.__state == JobState.REJECTED @property def start_time(self) -> Optional[float]: """ The job start time. """ return self.__start_time @property def stop_time(self) -> Optional[float]: """ The job stop time. """ return self.__stop_time @property def waiting_time(self) -> Optional[float]: """ The job waiting time. """ if self.start_time is None: return None else: return self.start_time - self.subtime @property def runtime(self) -> Optional[float]: """ The job runtime. The minimum value is 1 second. """ if self.stop_time is None or self.start_time is None: return None else: return max(1, self.stop_time - self.start_time) @property def stretch(self) -> Optional[float]: """ The job stretch. """ stretch = None if self.waiting_time is not None: if self.walltime is not None: stretch = self.waiting_time / self.walltime elif self.runtime is not None: stretch = self.waiting_time / self.runtime return stretch @property def turnaround_time(self) -> Optional[float]: """ The job turnaround time. """ if self.waiting_time is None or self.runtime is None: return None else: return self.waiting_time + self.runtime @property def per_processor_slowdown(self) -> Optional[float]: """ The job per-processor slowdown. """ if self.turnaround_time is None or self.runtime is None: return None else: return max(1., self.turnaround_time / (self.res * self.runtime)) @property def slowdown(self) -> Optional[float]: """ The job slowdown. """ if self.turnaround_time is None or self.runtime is None: return None else: return max(1., self.turnaround_time / self.runtime) def _allocate(self, hosts: Sequence[int], storage_mapping: Dict[str, int] = None) -> None: """ Allocate hosts for the job. This is an internal method to be used by the simulator only. Args: hosts: A sequence of host ids. storage_mapping: The resource ids of the storages to use. Defaults to None. Required only if the job needs a storage resource. Raises: RuntimeError: In case of job is not in the queue (e.g. job is runnable or running). ValueError: In case of the number of allocated resources does not match the number requested or the job needs a storage and the mapping is not defined or valid. """ if not self.is_submitted: raise RuntimeError('The job is not in the queue anymore, ' f'got {self.state}') if len(hosts) != self.res: raise ValueError('Expected `hosts` argument to be a list ' f'of length {self.res}, got {hosts}') if isinstance(self.profile, DataStagingJobProfile): if not storage_mapping: raise ValueError(f'Expected storage mapping to be defined, ' f'got {storage_mapping}') if self.profile.src not in storage_mapping: raise ValueError(f'Expected storage mapping to ' f'include {self.profile.src}') if self.profile.dest not in storage_mapping: raise ValueError(f'Expected storage mapping to ' f'include {self.profile.dest}') elif isinstance(self.profile, ParallelHomogeneousPFSJobProfile): if not storage_mapping: raise ValueError(f'Expected storage mapping to be defined, ' f'got {storage_mapping}') if self.profile.storage not in storage_mapping: raise ValueError(f'Expected storage mapping to ' f'include {self.profile.storage}') elif storage_mapping: raise ValueError(f'Expected storage mapping to be empty, ' f'got {storage_mapping}') self.__allocation = list(hosts) self.__storage_mapping = storage_mapping self.__state = JobState.ALLOCATED def _reject(self) -> None: """ Rejects the job. This is an internal method to be used by the simulator only. Raises: RuntimeError: In case of job is not in the queue (e.g. job is runnable or running). """ if not self.is_submitted: raise RuntimeError('Cannot reject a job that is not in the ' f'queue, got {self.state}') self.__state = JobState.REJECTED def _submit(self, subtime: float) -> None: """ Submits the job. This is an internal method to be used by the simulator only. Args: subtime: The submission time. Raises: RuntimeError: In case of job was already submitted. ValueError: In case of subtime is invalid. """ if self.state != JobState.UNKNOWN: raise RuntimeError('This job was already submitted.' f'got, {self.state}') if subtime < 0: raise ValueError('Expected `subtime` argument to be greather ' f'than zero, got {subtime}') self.__state = JobState.SUBMITTED self.__subtime = float(subtime) def _start(self, current_time: float) -> None: """ Start the job. This is an internal method to be used by the simulator only. Args: current_time: The current simulation time. Raises: RuntimeError: In case of job is not runnable ValueError: In case of current time is less than the submission time. """ if not self.is_runnable: raise RuntimeError('The job cannot start if it is not ' f'runnable, got {self.state}') if current_time < self.subtime: raise ValueError('The `current_time` argument cannot be less ' 'than the job submission time, ' f'got {current_time}') self.__start_time = float(current_time) self.__state = JobState.RUNNING def _terminate(self, current_time: float, final_state: JobState) -> None: """ Terminate the job. This is an internal method to be used by the simulator only. Args: current_time: The current simulation time. final_state: The job final state. Raises: RuntimeError: In case of the job is not running. ValueError: In case of current time is less than the job start time or the final state is not one of the possible ones. """ if not self.is_running or self.start_time is None: raise RuntimeError('The job cannot be finished if it is not running' f'got, {self.state}') valid_states = (JobState.COMPLETED_SUCCESSFULLY, JobState.COMPLETED_FAILED, JobState.COMPLETED_KILLED, JobState.COMPLETED_WALLTIME_REACHED) if not final_state in valid_states: raise ValueError('Expected `final_state` argument to be valid, ' f'got {final_state}') if current_time < self.start_time: raise ValueError('Expected `current_time` argument to be greather ' f'than start_time, got {current_time}') self.__stop_time = float(current_time) self.__state = final_state