job.walk

Module Contents

Classes

Walk

An abstract class scheduling and executing a DAG of actions.

class job.walk.Walk(actions: e3.collection.dag.DAG)

An abstract class scheduling and executing a DAG of actions.

Variables:
  • actions (DAG) – DAG of actions to perform.

  • prev_fingerprints (dict[str, Fingerprint | None]) – A dict of e3.fingerprint.Fingerprint objects, indexed by the corresponding job ID (each job corresponding to an entry in the DAG of actions). This dictionary contains the previous fingerprint of a given job, or None if there was no such fingerprint.

  • new_fingerprints (dict[str, Fingerprint | None]) – A dict of e3.fingerprint.Fingerprint objects, indexed by the corresponding job ID (each job corresponding to an entry in the DAG of actions). This dictionary contains the fingerprints computed each time a new job is created, or None when no fingerprint was computed.

  • job_status (dict[str, ReturnValue]) – A dict of job statuses (ReturnValue), indexed by unique job ID.

  • scheduler (e3.job.scheduler.Scheduler) – The scheduler used to schedule and execute all the actions.

set_scheduling_params() → None

Set the parameters used when creating the scheduler.

This method is expected to set the following attributes:
  • self.queues: Same as the parameter of the same name in e3.job.scheduler.Scheduler.__init__.

  • self.tokens: Likewise.

  • self.job_timeout: Likewise.

This method provides a default setup where the scheduler has 1 token and 1 queue, and where the job’s maximum duration is DEFAULT_JOB_MAX_DURATION. Child classes requiring different scheduling parameters should override this method.
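As a minimal sketch of such an override, the snippet below uses a small stand-in base class (WalkStub, hypothetical) instead of e3.job.walk.Walk so that it runs without e3 installed. The value given to DEFAULT_JOB_MAX_DURATION, the seconds unit for job_timeout, and the meaning of queues=None are assumptions. The method is called directly here for illustration.

```python
# Stand-in for e3.job.walk.Walk, reduced to the scheduling hook so the
# sketch runs without e3 installed. ASSUMPTION: this value is invented.
DEFAULT_JOB_MAX_DURATION = 3600


class WalkStub:
    def set_scheduling_params(self) -> None:
        # Default setup described above: one queue, one token.
        # ASSUMPTION: queues=None lets the scheduler use its single default queue.
        self.queues = None
        self.tokens = 1
        self.job_timeout = DEFAULT_JOB_MAX_DURATION


class ParallelWalk(WalkStub):
    """Hypothetical child class allowing up to 4 concurrent jobs."""

    def set_scheduling_params(self) -> None:
        self.queues = None      # keep the single default queue
        self.tokens = 4         # up to 4 jobs may run at the same time
        self.job_timeout = 600  # per-job limit (ASSUMPTION: in seconds)


walk = ParallelWalk()
walk.set_scheduling_params()
print(walk.tokens, walk.job_timeout)  # 4 600
```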

compute_fingerprint(uid: str, data: Any, is_prediction: bool = False) → e3.fingerprint.Fingerprint | None

Compute the given action’s Fingerprint.

This method is expected to return a Fingerprint corresponding to the state of the system if the given action were executed and succeeded. It can also return None if keeping track of the results of past actions is not necessary.

This implementation always returns None. Child classes requiring a different behavior may override this method.

Parameters:
  • uid – A unique Job ID.

  • data – Data associated to the job.

  • is_prediction – If True, this is an attempt to compute the fingerprint before launching the job; in that case, if the method returns None, the job is always launched. If False, this is the computation done after running the job, which produces the final fingerprint saved for future comparison.
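A hypothetical compute_fingerprint could hash the job ID together with its data. In this sketch a SHA-256 hex string stands in for e3.fingerprint.Fingerprint, the job data is assumed to be JSON-serializable, and the rule of returning None for a prediction without data is invented for illustration.

```python
import hashlib
import json
from typing import Any, Optional


def compute_fingerprint(uid: str, data: Any, is_prediction: bool = False) -> Optional[str]:
    """Hypothetical fingerprint: a SHA-256 digest of the job ID and its data.

    A hex string stands in for e3.fingerprint.Fingerprint; `data` is assumed
    to be JSON-serializable.
    """
    if is_prediction and data is None:
        # Hypothetical rule: nothing to predict from, so return None,
        # which means the job is always launched.
        return None
    payload = json.dumps({"uid": uid, "data": data}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


before = compute_fingerprint("build", {"version": 1}, is_prediction=True)
after = compute_fingerprint("build", {"version": 1})
print(before == after)  # True: same inputs, same fingerprint
```

Because the digest depends only on the job ID and its data, any change to the data yields a different fingerprint, which is what a change-detection strategy relies on.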

save_fingerprint(uid: str, fingerprint: e3.fingerprint.Fingerprint | None) → None

Save the given fingerprint.

For systems that require fingerprint persistence, this method is expected to save the fingerprint somewhere – typically inside a file. Passing None as the fingerprint causes the fingerprint to be deleted instead.

This implementation does nothing. Child classes taking advantage of fingerprint support should override this method to save the fingerprint at a location of their choice, using the format that suits them.

Parameters:
  • uid – A unique Job ID.

  • fingerprint – The fingerprint corresponding to the given job, or None if the fingerprint should be deleted instead of saved.
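One possible persistence scheme, sketched under the assumption that fingerprints can be stored as JSON: the hypothetical FileFingerprintStore keeps one file per job ID and deletes that file when None is passed, mirroring the contract above. Plain dicts stand in for e3.fingerprint.Fingerprint objects, and job IDs are assumed to be valid file names.

```python
import json
import os
import tempfile
from typing import Optional


class FileFingerprintStore:
    """Hypothetical store keeping one JSON file per job ID under `root`.

    Plain dicts stand in for e3.fingerprint.Fingerprint objects, and job
    IDs are assumed to be valid file names.
    """

    def __init__(self, root: str) -> None:
        self.root = root

    def _path(self, uid: str) -> str:
        return os.path.join(self.root, uid + ".json")

    def save_fingerprint(self, uid: str, fingerprint: Optional[dict]) -> None:
        path = self._path(uid)
        if fingerprint is None:
            # Documented contract: None deletes any stored fingerprint.
            if os.path.exists(path):
                os.remove(path)
            return
        with open(path, "w") as f:
            json.dump(fingerprint, f)


store = FileFingerprintStore(tempfile.mkdtemp())
store.save_fingerprint("job1", {"src": "abc"})  # writes job1.json
```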

load_previous_fingerprint(uid: str) → e3.fingerprint.Fingerprint | None

Get the fingerprint from the given action’s previous execution.

This method is expected to have the following behavior:
  • If the given action has previously been executed and its fingerprint saved (see the save_fingerprint method above), load and return that fingerprint;

  • Otherwise, return None.

This implementation always returns None, providing a behavior where re-executing a given action always results in the corresponding job being executed. Child classes requiring a different behavior may override this method.

Parameters:
  • uid – A unique Job ID.
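A matching loader for a hypothetical one-JSON-file-per-job layout is sketched below as a free function. The root directory argument is an assumption of this sketch and does not appear in the documented signature; plain dicts again stand in for Fingerprint objects.

```python
import json
import os
import tempfile
from typing import Optional


def load_previous_fingerprint(root: str, uid: str) -> Optional[dict]:
    """Hypothetical loader for a one-JSON-file-per-job layout under `root`."""
    path = os.path.join(root, uid + ".json")
    if not os.path.exists(path):
        # Never executed, or its fingerprint was deleted: nothing to compare.
        return None
    with open(path) as f:
        return json.load(f)


root = tempfile.mkdtemp()
with open(os.path.join(root, "job1.json"), "w") as f:
    json.dump({"src": "abc"}, f)

print(load_previous_fingerprint(root, "job1"))  # {'src': 'abc'}
print(load_previous_fingerprint(root, "job2"))  # None
```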

should_execute_action(uid: str, previous_fingerprint: e3.fingerprint.Fingerprint | None, new_fingerprint: e3.fingerprint.Fingerprint | None) → bool

Return True if the given action should be performed.

The purpose of this method is to determine, based on the previous fingerprint and the new one, whether the user of this class wants the action to be launched.

The default implementation implements the following strategy:
  • when fingerprints are not in use, always execute the given action;

  • when fingerprints are in use, execute the given action if the fingerprint has changed.

However, child classes may want to override this method to implement alternative strategies.

Parameters:
  • uid – A unique Job ID.

  • previous_fingerprint – The fingerprint of the previous execution of the action. None if the action has not been previously executed.

  • new_fingerprint – The fingerprint of the new action to be performed. None if fingerprints are not used.
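The default strategy described above can be sketched as follows, with fingerprints modeled as strings. This illustrates the documented behavior and is not the library's actual code.

```python
from typing import Optional


def should_execute_action(
    uid: str, previous_fingerprint: Optional[str], new_fingerprint: Optional[str]
) -> bool:
    """Sketch of the default strategy, with fingerprints modeled as strings."""
    if new_fingerprint is None:
        # Fingerprints are not in use: always execute the action.
        return True
    # Fingerprints are in use: execute only if the fingerprint changed.
    # A missing previous fingerprint (first run) counts as a change.
    return previous_fingerprint != new_fingerprint


print(should_execute_action("build", None, None))    # True
print(should_execute_action("build", "abc", "abc"))  # False
print(should_execute_action("build", "abc", "def"))  # True
```

An alternative strategy, for example always re-running some actions regardless of their fingerprints, would be expressed by overriding this method in a child class.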

create_skipped_job(uid: str, data: Any, predecessors: frozenset[str], reason: str | None, notify_end: collections.abc.Callable[[str], None], status: e3.anod.status.ReturnValue = ReturnValue.failure) → e3.job.Job

Return a failed job.

This method always returns an EmptyJob. Child classes may override this method if they need something more specific.

Parameters:
  • uid – A unique Job ID.

  • data – Data associated to the job to create.

  • predecessors – The set of predecessor job IDs.

  • reason – If not None, the reason for creating a failed job.

  • notify_end – Same as the notify_end parameter in Job.__init__.

abstract create_job(uid: str, data: Any, predecessors: frozenset[str], notify_end: collections.abc.Callable[[str], None]) → e3.job.ProcessJob

Create a ProcessJob.

Parameters:
  • uid – A unique Job ID.

  • data – Data associated to the job to create.

  • predecessors – The set of predecessor job IDs.

  • notify_end – Same as the notify_end parameter in Job.__init__.
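For illustration, a create_job implementation might map each DAG entry to a command. The sketch below uses a hypothetical CommandJob dataclass in place of an e3.job.ProcessJob subclass and assumes, purely for the example, that each entry's data is a Python snippet run through the current interpreter.

```python
import subprocess
import sys
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class CommandJob:
    """Hypothetical stand-in for an e3.job.ProcessJob subclass."""

    uid: str
    data: Any
    predecessors: frozenset
    notify_end: Callable[[str], None]
    cmdline: List[str] = field(default_factory=list)
    returncode: Optional[int] = None

    def run(self) -> None:
        self.returncode = subprocess.run(self.cmdline).returncode
        # Tell whoever scheduled the job that it has finished.
        self.notify_end(self.uid)


def create_job(
    uid: str,
    data: Any,
    predecessors: frozenset,
    notify_end: Callable[[str], None],
) -> CommandJob:
    # Hypothetical convention: each DAG entry's data is a Python snippet to run.
    return CommandJob(uid, data, predecessors, notify_end,
                      cmdline=[sys.executable, "-c", data])


finished: List[str] = []
job = create_job("hello", "print('hello')", frozenset(), finished.append)
job.run()
print(job.returncode, finished)  # 0 ['hello']
```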

abstract request_requeue(job: e3.job.ProcessJob) → bool

Requeue the given job.

Return True if the job has been requeued, False otherwise.

Parameters:
  • job – The job to requeue.

get_job(uid: str, data: Any, predecessors: frozenset[str], notify_end: collections.abc.Callable[[str], None]) → e3.job.Job

Return a Job.

Same as self.create_job, except that this method first checks whether any of the predecessors has failed, in which case a failed job (created using the create_skipped_job method) is returned instead.
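The predecessor check can be sketched as a free function that receives job_status and the two factory methods as arguments. The status strings stand in for ReturnValue members, the toy factories return tuples instead of Job objects, and treating only FAILURE as a failed predecessor is an assumption of this sketch.

```python
from typing import Any, Callable, Dict

# Hypothetical status values standing in for e3.anod.status.ReturnValue.
SUCCESS, FAILURE = "success", "failure"


def get_job(
    uid: str,
    data: Any,
    predecessors: frozenset,
    notify_end: Callable[[str], None],
    job_status: Dict[str, str],
    create_job: Callable[..., Any],
    create_skipped_job: Callable[..., Any],
) -> Any:
    """Sketch of the documented check; the factories are passed in explicitly."""
    failed = sorted(p for p in predecessors if job_status.get(p) == FAILURE)
    if failed:
        reason = "failed predecessor(s): " + ", ".join(failed)
        return create_skipped_job(uid, data, predecessors, reason, notify_end)
    return create_job(uid, data, predecessors, notify_end)


# Toy factories returning tuples instead of real Job objects.
def make_job(uid, data, predecessors, notify_end):
    return ("run", uid)


def make_skipped_job(uid, data, predecessors, reason, notify_end):
    return ("skip", uid, reason)


status = {"fetch": SUCCESS, "configure": FAILURE}
print(get_job("build", None, frozenset({"fetch"}), print, status,
              make_job, make_skipped_job))
# ('run', 'build')
print(get_job("test", None, frozenset({"fetch", "configure"}), print, status,
              make_job, make_skipped_job))
# ('skip', 'test', 'failed predecessor(s): configure')
```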

collect(job: e3.job.ProcessJob) → bool

Collect all the results from the given job.

Parameters:
  • job – The job whose results we need to collect.

Returns:
  True if the job is requeued, False otherwise.
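A hypothetical child class could combine request_requeue and collect to retry failed jobs a bounded number of times. In this sketch FinishedJob stands in for a completed ProcessJob, statuses are plain strings, and the retry policy is invented; it does not reproduce e3's own collect.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FinishedJob:
    """Hypothetical stand-in for a completed e3.job.ProcessJob."""

    uid: str
    succeeded: bool


class RetryingWalk:
    """Hypothetical sketch: requeue each failed job at most `max_retries` times."""

    def __init__(self, max_retries: int = 1) -> None:
        self.max_retries = max_retries
        self.retries: Dict[str, int] = {}
        self.job_status: Dict[str, str] = {}

    def request_requeue(self, job: FinishedJob) -> bool:
        used = self.retries.get(job.uid, 0)
        if used >= self.max_retries:
            return False  # retry budget exhausted: do not requeue
        self.retries[job.uid] = used + 1
        return True

    def collect(self, job: FinishedJob) -> bool:
        if not job.succeeded and self.request_requeue(job):
            return True  # requeued: the final status is not known yet
        self.job_status[job.uid] = "success" if job.succeeded else "failure"
        return False


walk = RetryingWalk(max_retries=1)
print(walk.collect(FinishedJob("build", succeeded=False)))  # True (requeued)
print(walk.collect(FinishedJob("build", succeeded=False)))  # False (no retries left)
print(walk.job_status)  # {'build': 'failure'}
```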