:py:mod:`job.walk`
==================

.. py:module:: job.walk

.. autoapi-nested-parse::

   DAG walk and execution framework.


Module Contents
---------------

Classes
~~~~~~~

.. autoapisummary::

   job.walk.Walk


Attributes
~~~~~~~~~~

.. autoapisummary::

   job.walk.logger


.. py:data:: logger


.. py:class:: Walk(actions: e3.collection.dag.DAG)

   An abstract class for scheduling and executing a DAG of actions.

   .. |ReturnValue| replace:: :class:`~e3.anod.status.ReturnValue`

   :ivar actions: DAG of actions to perform.
   :vartype actions: DAG
   :ivar prev_fingerprints: A dict of e3.fingerprint.Fingerprint objects,
       indexed by the corresponding job ID. This dictionary contains the
       previous fingerprint of each job, or None if there was no such
       fingerprint (each job corresponding to a given entry in the DAG of
       actions).
   :vartype prev_fingerprints: dict[str, Fingerprint | None]
   :ivar new_fingerprints: A dict of e3.fingerprint.Fingerprint objects,
       indexed by the corresponding job ID. This dictionary contains the
       fingerprints computed each time a new job is created (each job
       corresponding to a given entry in the DAG of actions).
   :vartype new_fingerprints: dict[str, Fingerprint | None]
   :ivar job_status: A dictionary of job statuses (|ReturnValue|), indexed
       by job unique IDs.
   :vartype job_status: dict[str, |ReturnValue|]
   :ivar scheduler: The scheduler used to schedule and execute all the
       actions.
   :vartype scheduler: e3.job.scheduler.Scheduler

   .. py:method:: set_scheduling_params() -> None

      Set the parameters used when creating the scheduler.

      This method is expected to set the following attributes:

      - ``self.queues``: same as the parameter of the same name in
        ``e3.job.scheduler.Scheduler.__init__``;
      - ``self.tokens``: likewise;
      - ``self.job_timeout``: likewise.

      This method provides a default setup where the scheduler has 1 token
      and 1 queue, and where the job's maximum duration is
      ``DEFAULT_JOB_MAX_DURATION``. Child classes requiring different
      scheduling parameters should override this method.

   .. py:method:: compute_fingerprint(uid: str, data: Any, is_prediction: bool = False) -> e3.fingerprint.Fingerprint | None

      Compute the given action's Fingerprint.

      This method is expected to return a Fingerprint corresponding to the
      state of the system that would result from executing the given action
      successfully. It can also return None if keeping track of the result
      of past actions is not necessary.

      This implementation always returns None. Child classes requiring a
      different behavior may override this method.

      :param uid: A unique Job ID.
      :param data: Data associated with the job.
      :param is_prediction: If True, this is an attempt to compute the
          fingerprint before launching the job. In that case, if the method
          returns None, the job will always be launched. When False, this is
          the computation done after running the job, whose result is the
          final fingerprint saved for future comparison.

   .. py:method:: save_fingerprint(uid: str, fingerprint: e3.fingerprint.Fingerprint | None) -> None

      Save the given fingerprint.

      For systems that require fingerprint persistence, this method is
      expected to save the fingerprint somewhere, typically inside a file.
      Passing None as the fingerprint causes the fingerprint to be deleted
      instead.

      This implementation does nothing. Child classes taking advantage of
      fingerprint support should override this method to save the
      fingerprint at the location of their choice, using the format that
      suits them.

      :param uid: A unique Job ID.
      :param fingerprint: The fingerprint corresponding to the given job,
          or None if the fingerprint should be deleted instead of saved.

   .. py:method:: load_previous_fingerprint(uid: str) -> e3.fingerprint.Fingerprint | None

      Get the fingerprint from the given action's previous execution.

      This method is expected to have the following behavior:

      - if the given action has previously been executed and its
        fingerprint saved (see the ``save_fingerprint`` method above), load
        and return it;
      - otherwise, return None.

      This implementation always returns None, providing a behavior where
      re-executing a given action always results in the corresponding job
      being executed. Child classes requiring a different behavior may
      override this method.

      :param uid: A unique Job ID.

   .. py:method:: should_execute_action(uid: str, previous_fingerprint: e3.fingerprint.Fingerprint | None, new_fingerprint: e3.fingerprint.Fingerprint | None) -> bool

      Return True if the given action should be performed.

      The purpose of this method is to determine, based on the previous
      fingerprint and the new one, whether the user of this class wants the
      action to be launched or not.

      The default implementation uses the following strategy:

      - when fingerprints are not in use, always execute the given action;
      - when fingerprints are in use, execute the given action if the
        fingerprint has changed.

      Child classes may override this method to implement alternative
      strategies.

      :param uid: A unique Job ID.
      :param previous_fingerprint: The fingerprint of the previous execution
          of the action, or None if the action has not been previously
          executed.
      :param new_fingerprint: The fingerprint of the new action to be
          performed, or None if fingerprints are not used.

   .. py:method:: create_skipped_job(uid: str, data: Any, predecessors: frozenset[str], reason: str | None, notify_end: collections.abc.Callable[[str], None], status: e3.anod.status.ReturnValue = ReturnValue.failure) -> e3.job.Job

      Return a skipped job, marked as failed by default.

      This method always returns an EmptyJob. Deriving classes may override
      this method if they need something more specific.

      :param uid: A unique Job ID.
      :param data: Data associated with the job to create.
      :param predecessors: The set of predecessor job IDs.
      :param reason: If not None, the reason for creating a failed job.
      :param notify_end: Same as the ``notify_end`` parameter in
          ``Job.__init__``.
      :param status: The status of the skipped job.

   .. py:method:: create_job(uid: str, data: Any, predecessors: frozenset[str], notify_end: collections.abc.Callable[[str], None]) -> e3.job.ProcessJob
      :abstractmethod:

      Create a ProcessJob.

      :param uid: A unique Job ID.
      :param data: Data associated with the job to create.
      :param predecessors: The set of predecessor job IDs.
      :param notify_end: Same as the ``notify_end`` parameter in
          ``Job.__init__``.

   .. py:method:: request_requeue(job: e3.job.ProcessJob) -> bool
      :abstractmethod:

      Requeue the given job.

      Return True if the job has been requeued, False otherwise.

      :param job: The job to requeue.

   .. py:method:: get_job(uid: str, data: Any, predecessors: frozenset[str], notify_end: collections.abc.Callable[[str], None]) -> e3.job.Job

      Return a Job.

      Same as ``self.create_job``, except that this method first checks
      whether any of the predecessors has failed, in which case a failed
      job (created using the ``create_skipped_job`` method) is returned
      instead.

      :param uid: Unique job identifier.
      :param data: Job data.
      :param predecessors: Set of predecessor job IDs.
      :param notify_end: Notification function.

   .. py:method:: collect(job: e3.job.ProcessJob) -> bool

      Collect all the results from the given job.

      :param job: The job whose results we need to collect.
      :return: True if the job is requeued, False otherwise.
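
Taken together, the fingerprint hooks of ``Walk`` and the predecessor check in ``Walk.get_job`` suggest an incremental execution policy. Under that policy, a job is skipped when a predecessor failed, and otherwise runs only if its fingerprint changed. The following is a minimal, standalone sketch of that policy under stated assumptions. It does not import e3 and is not the e3 implementation. ``Status``, ``SketchWalk``, ``process`` and ``run`` are hypothetical names. Fingerprints are SHA-256 hex strings rather than ``e3.fingerprint.Fingerprint`` objects, and they are kept in a dict rather than in files. Jobs are processed one at a time in topological order. Marking unchanged jobs as "up to date" and folding the predecessors' fingerprints into each job's fingerprint are choices made for this sketch only.

.. code-block:: python

   from __future__ import annotations

   import enum
   import hashlib


   class Status(enum.Enum):
       """Hypothetical stand-in for e3.anod.status.ReturnValue."""

       success = "success"
       failure = "failure"
       up_to_date = "up_to_date"  # fingerprint unchanged, job not re-run


   class SketchWalk:
       """Standalone sketch of the Walk fingerprint hooks (not e3 code)."""

       def __init__(self) -> None:
           self.saved: dict[str, str] = {}  # uid -> persisted fingerprint
           self.job_status: dict[str, Status] = {}  # uid -> latest status
           self.executed: list[str] = []  # uids actually run, in order

       def compute_fingerprint(
           self, uid: str, data: str, predecessors: frozenset[str]
       ) -> str | None:
           # Hash the job data and the predecessors' saved fingerprints, so
           # that a change upstream also changes this job's fingerprint.
           digest = hashlib.sha256(f"{uid}\0{data}".encode())
           for pred in sorted(predecessors):
               digest.update(self.saved.get(pred, "").encode())
           return digest.hexdigest()

       def save_fingerprint(self, uid: str, fingerprint: str | None) -> None:
           if fingerprint is None:
               self.saved.pop(uid, None)  # None means "delete it"
           else:
               self.saved[uid] = fingerprint

       def load_previous_fingerprint(self, uid: str) -> str | None:
           return self.saved.get(uid)

       def should_execute_action(
           self, uid: str, previous: str | None, new: str | None
       ) -> bool:
           # Documented default: always run if fingerprints are not in use,
           # otherwise run only if the fingerprint has changed.
           return new is None or previous != new

       def process(
           self,
           uid: str,
           data: str,
           predecessors: frozenset[str],
           fails: bool = False,
       ) -> Status:
           """Decide whether to run one job, then simulate running it."""
           if any(self.job_status.get(p) is Status.failure for p in predecessors):
               # As in get_job: a failed predecessor yields a failed job.
               status = Status.failure
           else:
               new = self.compute_fingerprint(uid, data, predecessors)
               previous = self.load_previous_fingerprint(uid)
               if self.should_execute_action(uid, previous, new):
                   self.executed.append(uid)
                   status = Status.failure if fails else Status.success
                   # Keep a fingerprint only after a successful run.
                   ok = status is Status.success
                   self.save_fingerprint(uid, new if ok else None)
               else:
                   status = Status.up_to_date
           self.job_status[uid] = status
           return status

       def run(
           self,
           graph: list[tuple[str, str, frozenset[str]]],
           failing: frozenset[str] = frozenset(),
       ) -> dict[str, Status]:
           # graph lists (uid, data, predecessors) in topological order;
           # jobs whose uid is in `failing` simulate a failed execution.
           return {
               uid: self.process(uid, data, preds, uid in failing)
               for uid, data, preds in graph
           }


   if __name__ == "__main__":
       walk = SketchWalk()
       graph = [("a", "v1", frozenset()), ("b", "v1", frozenset({"a"}))]
       print(walk.run(graph))  # first run: both jobs are executed
       print(walk.run(graph))  # second run: both jobs are up to date

Running the same graph twice executes each job only once. Changing the data of ``a`` re-runs both ``a`` and ``b``, since ``b``'s fingerprint includes ``a``'s saved fingerprint. If ``a`` fails, ``b`` is reported as failed without being executed, mirroring what ``Walk.get_job`` documents for failed predecessors.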