One of the ideas that has been bouncing around is to have the job manager module ensure that dependencies between jobs are satisfied, rather than placing that burden on the scheduler. By dependencies between jobs, I mean that a dependent job should not run until some event occurs for another job. For example, I may want Job B to run only after Job A has completed with a non-zero exit code. Side note: I think resource dependencies (like a file system or license being available) will still be left to the scheduler to handle.
In the case of a single, centralized job manager module, this should be "easy" to implement. Any job that is submitted with dependencies will be queued at the job manager, and the job manager will abstain from submitting an allocate request to the scheduler for the job while the job's dependencies are unsatisfied. Since the single job manager is aware of all job completions and exit codes, it can keep track of which of a job's dependencies are still unsatisfied. Once all of its dependencies have been satisfied, the job manager can send an allocate request to the scheduler.
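A minimal sketch of that centralized case, assuming an in-memory job manager; the class and method names (`CentralJobManager`, `submit`, `job_completed`) are hypothetical and not part of any Flux API, and exit codes are left out for brevity:

```python
from dataclasses import dataclass, field


@dataclass
class PendingJob:
    jobid: str
    # Job IDs that must complete before this job may be allocated.
    unsatisfied: set = field(default_factory=set)


class CentralJobManager:
    """Hypothetical single-rank job manager that gates allocate requests."""

    def __init__(self):
        self.completed = set()     # jobs known to have completed (never pruned in this sketch)
        self.pending = {}          # jobid -> PendingJob held back from the scheduler
        self.alloc_requests = []   # stand-in for allocate requests sent to the scheduler

    def submit(self, jobid, depends_on=()):
        remaining = set(depends_on) - self.completed
        if remaining:
            # Queue at the job manager; no allocate request yet.
            self.pending[jobid] = PendingJob(jobid, remaining)
        else:
            self.alloc_requests.append(jobid)

    def job_completed(self, jobid):
        self.completed.add(jobid)
        for job in list(self.pending.values()):
            job.unsatisfied.discard(jobid)
            if not job.unsatisfied:
                # All dependencies satisfied: release the job to the scheduler.
                del self.pending[job.jobid]
                self.alloc_requests.append(job.jobid)
```

Submitting `B` with `depends_on=["A"]` keeps `B` in `pending` until `job_completed("A")` is called, at which point it is appended to `alloc_requests`.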
Once the job manager module is distributed across ranks, the problem becomes more complicated. Assuming each job manager rank is responsible for its own subset of jobs, each job manager will only be aware of job completions for the jobs under its management (i.e., an internal job). If an internal job has a dependency on an external job (i.e., outside of the job manager's domain), then the job manager will need some mechanism for "subscribing" to that external job's job events.
If an ensemble with a complex web of dependencies is submitted, it may be easiest/most efficient to ensure that all of those jobs are managed by the same job manager rank (or to just wrap those jobs up into their own sub-instance).
Maybe the KVS event log is a good way to orchestrate dependencies within an ensemble of jobs?
E.g. all jobs in the ensemble agree on the location of the event log. A job that "produces" a symbolic dependency adds the symbol to the eventlog (or rather the job manager managing that job appends the symbol at the appropriate time). A job that "consumes" a symbolic dependency can wait on it appearing in the event log before attempting to allocate resources (again, it's really the job manager that does the waiting).
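A rough sketch of the produce/consume idea, assuming an in-memory stand-in for the shared event log; `EventLog`, `append`, and `wait_for` are hypothetical names, and a real implementation would watch a KVS key rather than a Python list:

```python
class EventLog:
    """In-memory stand-in for a shared, append-only event log."""

    def __init__(self):
        self.entries = []
        self.waiters = {}   # symbol -> callbacks to run when the symbol appears

    def append(self, symbol):
        # Called by the job manager of the "producer" job at the appropriate time.
        self.entries.append(symbol)
        for callback in self.waiters.pop(symbol, []):
            callback()

    def wait_for(self, symbol, callback):
        # Called by the job manager of the "consumer" job before allocating.
        if symbol in self.entries:
            callback()
        else:
            self.waiters.setdefault(symbol, []).append(callback)


log = EventLog()
allocated = []
log.wait_for("sim-data-ready", lambda: allocated.append("consumer-job"))
log.append("sim-data-ready")   # the consumer is released here
```

Because the log is append-only, a consumer that arrives after the symbol was produced is released immediately.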
> Maybe the KVS event log is a good way to orchestrate dependencies within an ensemble of jobs?
I like that proposal. That also naturally keeps different symbolic namespaces from colliding.
Will there be some implicit symbolic dependencies generated by jobs though, e.g. based on fluid or other identifier, so that dependent jobs could be submitted after the fact or not as an ensemble? Could the job manager just use the default event log for those dependencies? (sorry if that's a dumb question, having trouble wrapping my head around it right now)
> By dependencies between jobs, I mean that a dependent job should not run until some event occurs for another job.
@SteVwonder: thank you for taking the lead on this.
Since we will probably want to support a DAG, we may want to say "a dependent job should not run until some events occur for a set of jobs". Also there may be a case where the dependent job should just be cancelled if some events occur for another job.
In general, maybe we can define our job dependency system as a mechanism to act on the dependent job when some events occur for a set of jobs. The action can be an allocate request or a cancel. But depending on other use cases, there may be other actions in the future (extension point).
BTW, one thing that comes up over and over again is how to specify the job dependencies without having to have the jobid itself. I think that's where the idea of "symbol" can work great. (And it seems using event log sounds like a great idea.)
Generally, I think there is a tremendous opportunity to get this right if we talk to workflow people about what they require early enough. Once we have a strawman, maybe we should call for a meeting with Pilot2 MLSI, UQP, and ATS users to pick their brain for this.
For UQP, I think retry logic etc can significantly benefit from this.
Another interesting idea (which we don't have to pursue at this point) is to have a notion of expanded event sources. A regular job event is one event source; an arbitrary symbol would be another; markers showing up in stdout/stderr would be a third. For example, UQP can define a set of markers that, when they appear in stdout/stderr, require a retry action. This logic can be pushed into the RM. A user can install a stdout filter (e.g., a regular expression), and when the zio sees this marker, it generates a symbol into the event log...
Those can be a part of our future co-design work with UQP, I think.
Great ideas!
> Could the job manager just use the default event log for those dependencies?
I think for the general definition that @dongahn provided:
> maybe we can define our job dependency system as a mechanism to act on the dependent job when some events occur for a set of jobs.
that using the default event log would cover 99% of use cases. At some point in the future, we might want to expand that out to include other event logs, like the sched and exec event logs. Having entirely separate event logs specifically for dependencies (as @garlick proposed) would be useful too for large dependency sets. Take, for example, a job that cannot start until at least one of 10K jobs completes. It would be a nightmare for the job manager in charge of that job to listen to the event logs of 10K jobs simultaneously. Creating a separate event log just for that dependency seems like the way to go in that case.
> Another interesting idea (which we don't have to pursue at this point) is to have a notion of expanded event sources.
I am also tempted to suggest that we support some very simple logic on events. For example, event.name == "complete" && event.context.exit_code > 0
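As a sketch of how such simple logic might be evaluated without a full expression language, assuming events are nested dicts with `name` and `context` keys and that clauses are only ever ANDed together (the `matches` helper and clause format are hypothetical):

```python
import operator

# Comparison operators allowed in a clause.
OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
}


def lookup(event, dotted_path):
    """Resolve a path such as 'event.context.exit_code' against a nested dict."""
    value = event
    for key in dotted_path.split(".")[1:]:   # skip the leading 'event'
        value = value[key]
    return value


def matches(event, clauses):
    """Return True if every (path, op, literal) clause holds for the event."""
    try:
        return all(OPS[op](lookup(event, path), literal)
                   for path, op, literal in clauses)
    except (KeyError, TypeError):
        # A missing field or incomparable value means the event does not match.
        return False


# event.name == "complete" && event.context.exit_code > 0
FAILED = [("event.name", "==", "complete"),
          ("event.context.exit_code", ">", 0)]
```

Restricting conditions to a fixed list of comparisons keeps the logic deliberately far from a general-purpose language.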
> Will there be some implicit symbolic dependencies generated by jobs though, e.g. based on fluid or other identifier, so that dependent jobs could be submitted after the fact or not as an ensemble?

> BTW, one thing that comes up over and over again is how to specify the job dependencies without having to have the jobid itself. I think that's where the idea of "symbol" can work great.
Maybe I'm not thinking generally enough, but at least in my mental model, any symbol in a jobspec that refers to another job will eventually need to be concretized into the other job's FLUID. In that case, maybe a FLUID is a valid, concrete "symbol" that refers to an already existing job (and saves the system the work of concretizing the abstract symbol).
I'm hearing two use cases here
1) DAG of jobs with symbolic dependencies (e.g. hardwired in jobspec)
2) after-the-fact dependencies, e.g. first job is already running without having arranged for a symbolic dependency, you want to submit job that depends on it
Is that right?
Would it suffice to allow a job to be submitted with an optional "dependency event source" and "dependency event sink" that each point to an eventlog? For case 1) both would point to the eventlog for the DAG, and for case 2) the source could point to another job's eventlog?
I was thinking that for case 1) there wouldn't be a need to resolve a symbol to a specific job. The symbol would just appear as an event in the shared log.
Danger: possibly half-baked idea:
On this per-DAG eventlog - maybe it would be best if the DAG were somehow represented by a "meta job" that allocates no resources but "contains" the DAG's eventlog and completes when the entire DAG completes?
> maybe it would be best if the DAG were somehow represented by a "meta job" that allocates no resources but "contains" the DAG's eventlog and completes when the entire DAG completes?
Alternatively, it may be more in line with our philosophy to contain a workflow/DAG within an instance and thus have one symbolic event namespace per instance?
We could allow non-symbolic dependencies in the system instance and maybe instance owner orchestrated dependencies in sub-instances?
I hope I'm not getting us too off track!
> maybe it would be best if the DAG were somehow represented by a "meta job" that allocates no resources but "contains" the DAG's eventlog and completes when the entire DAG completes?
I like the idea. The "meta job" would be the DAG manager and would handle the logic of the complex workload dependencies? A similar approach might be useful for so-called "array jobs" (no interdependence, but you might want to wait on an array job as a whole).
However, it does seem like we had identified these as good cases for sub-instances -- though that would require grow/shrink and possibly communication between schedulers at different levels...
> Alternatively, it may be more in line with our philosophy to contain a workflow/DAG within an instance and thus have one symbolic event namespace per instance?
That was my initial reaction (sorry, posted before I saw your follow-up comment). However, in order to support job dependency workflows in a sub-instance, we'd have to implement more difficult features like grow/shrink, whereas "meta job" is a bit easier to wrap my head around. However, I think you are right that if we could push ensemble jobs off into sub-instances, that much better fits the design of flux.
> whereas "meta job" is a bit easier to wrap my head around. However, I think you are right that if we could push ensemble jobs off into sub-instances, that much better fits the design of flux.
True. But when I created an instance myself with a set of jobs submitted in my initial program, I had a somewhat difficult time waiting for all submitted jobs to complete.
Somehow it felt like we need somewhat better group control. It seems a lightweight metajob has many use cases before latching onto a full subinstance.
I do believe we can benefit from talking to workflow manager guys to look at their dependency patterns and communication needs among jobs.
> when I created an instance myself with a set of jobs submitted in my initial program, I had a somewhat difficult time waiting for all submitted jobs to complete.
Let's be sure we make that work, regardless! (Seems fundamental)
Summary of our conversation over coffee:
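• The in, out, inout, runtime syntax suggested by @trws in flux-framework/rfc#51 (comment) seems like a good starting point. We should be able to specify an arbitrary DAG this way. One thought was to look at other dependency syntaxes in addition to OpenMP.
• Due to the linear progression of job states, once a job has moved from 'depend' to 'sched', it cannot go back to 'depend'. So the dependency module/service should only handle "one-time" or "one-way" dependencies. This is fine for "run this job after this other job", but dependencies like "run this job only if lustre1 is up" should be handled some other way (like by adding it to the resource section of the jobspec).
  □ Additionally, users should only be able to add a new "in" dependency to a job if the job is still in the depend state. On the other hand, users should always be able to add new "out" dependencies to jobs, regardless of their state.
• We need to decide what to do for dependency lifetimes. Example: the Pilot2 workflow has pre-processing jobs, which each have a unique out dependency, and GPU MD jobs, which each have a corresponding in dependency. Since the two types of jobs are loosely coupled, the throughput of the pre-processing jobs could be higher than the GPU jobs. It would be nice if the unsatisfied out dependencies stuck around long enough that the Pilot2 team could submit more jobs later on (a few days later, for example) and have the dependency resolution pick up where it left off. Problem: we now potentially have a data structure that grows unbounded.
  □ Do we want an out dependency to stick around forever?
  □ Do we want to add lifetimes to each dependency (with some default like 1 week)? And do we provide a CLI for users to prematurely "expire" out dependencies?
  □ Should dependencies have counts associated with them, and when the count is reached, the dependency is "garbage collected"?
• For dependency symbols/types, we like the idea of using a URI. That way you can unambiguously specify a dependency on an abstract symbol, a FLUID, a file, an event, or a message.
  □ file:///p/lustre1/$USER/workflow1/simdata.hdf5
  □ fluid:hungry-hippos-white-elephant?state=running
  □ symbol:foo
• We will need some way to reject jobs that have a dependency when no dependency service is loaded.
  □ One suggestion was to always have at least a dummy dependency service that can look for a "depends" tag in the jobspec and then immediately reject the job.
• The dependency service/module will (like the scheduler) need some way to register its own, custom validation script with the job-manager, based on its abilities (e.g., supports symbols and files but not messages or events).

@grondo, @garlick, did I miss anything? :laughing: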
Quoting Stephen Herbein (2019-02-21 16:05:07)
Summary of our conversation over coffee:
• The in, out, inout, runtime syntax suggested by @trws in flux-framework/rfc
#51 (comment) seems like a good starting point. We should be able to
specify an arbitrary DAG this way. One thought was to look at other
dependency syntaxes in addition to OpenMP.
Would a few links to tasking systems be useful for this? I'm not in a position
to give a set right now, but if it would, let me know and I'll pull one together.
• Due to the linear progression of job states, once a job has moved from
'depend' to 'sched', it cannot go back to 'depend'. So the dependency
module/service should only handle "one-time" or "one-way" dependencies.
This is fine for "run this job after this other job", but dependencies like
"run this job only if lustre1 is up" should be handled some other way (like
by adding it to the resource section of the jobspec).
□ Additionally, users should only be able to add a new "in" dependency to
a job if the job is still in the depend state. On the other hand, users
should always be able to add new "out" dependencies to jobs, regardless
of their state.
At least for me, I'd strongly prefer that we not allow the addition of new
dependencies to a job after it has been submitted. Any number of things
depending on it could be added, to be clear, but no new dependency properties of
the job itself. If we allow that, it makes reasoning about the system
substantially harder unless we introduce a state between job creation and job
submit. Given that pre-submit time period, modification could take place there,
but once it's in the system allowing users to think they have something correct
that adds a dependency after a job is added is asking for angry bug reports.
• We need to decide what to do for dependency lifetimes. Example: the Pilot2
workflow has pre-processing jobs, which each have a unique out dependency
and GPU MD jobs, which each have a corresponding in dependency. Since the
two types of jobs are loosely coupled, the throughput of the pre-processing
jobs could be higher than the GPU jobs. It would be nice if the unsatisfied
out dependencies stuck around long enough that the Pilot2 team could submit
more jobs later on (a few days later, for example) and have the dependency
resolution pick up where it left off. Problem: we now potentially have a
data structure that grows unbounded.
This one, thankfully, has a good if non-obvious solution.
□ Do we want an out dependency to stick around forever?
□ Do we want to add lifetimes to each dependency (with some default like 1 week)? And do we provide a CLI for users to prematurely "expire" out dependencies?
□ Should dependencies have counts associated with them, and when the count is reached, the dependency is "garbage collected"?
There's basically no need. A dependency exists as long as a job exists that has
an output edge on that dependency. When there are no output edges on it left,
it can be reaped immediately. This works because when a job appears, if it has
an in dependency on something that does not exist, it can be scheduled
immediately because all instances of that dependency must have been completed.
It allows a dependency to logically last "forever" but keeps the memory use
bounded by a factor of the number of jobs in the submitted, dependent, or running state.
Of course, that also means that a user could submit something they think depends
on something that actually doesn't because that thing was never run. We had a
discussion at some point about allowing a dependency type to work based on
presence or lack of a file or something too, but I'm not sure if I necessarily
like depending on file-system state. Worth some thought perhaps.
• For dependency symbols/types, we like the idea of using a URI. That way you
can unambiguously specify a dependency on an abstract symbol, a FLUID, a
file, an event, or a message.
□ file:///p/lustre1/$USER/workflow1/simdata.hdf5
□ fluid:hungry-hippos-white-elephant?state=running
□ symbol:foo
• We will need some way to reject jobs that have a dependency when no
dependency service is loaded
□ One suggestion was to always have at least a dummy dependency service
that can look for a "depends" tag in the jobspec and then immediately
reject the job.
• The dependency service/module will (like the scheduler) need some way to
register its own, custom validation script with the job-manager, based on
its abilities (e.g., supports symbols and files but not messages or
events).@grondo, @garlick, did I miss anything? 😆
> At least for me, I'd strongly prefer that we not allow the addition of new dependencies to a job after it has been submitted. Any number of things depending on it could be added, to be clear, but no new dependency properties of the job itself. If we allow that, it makes reasoning about the system substantially harder unless we introduce a state between job creation and job submit. Given that pre-submit time period, modification could take place there, but once it's in the system allowing users to think they have something correct that adds a dependency after a job is added is asking for angry bug reports.
Interesting idea; I like it! I agree that would certainly make implementing the dependency system and providing meaningful warning/error messages to users much easier.
One implication of this change for users is that those who have to delete and resubmit a job because the initial dependencies were incorrect will lose their "queue time" priority bonus. Personally, I think we should just remove queue time as a factor in the priority equation as it seems to be a prime target for exploitation by "resourceful" users.
> There's basically no need. A dependency exists as long as a job exists that has
> an output edge on that dependency. When there are no output edges on it left,
> it can be reaped immediately. This works because when a job appears, if it has
> an in dependency on something that does not exist, it can be scheduled
> immediately because all instances of that dependency must have been completed.
> It allows a dependency to logically last "forever" but keeps the memory use
> bounded by a factor of the number of jobs in the submitted, dependent, or running state.
So to confirm: the key invariant is that job _m_ with out:foo has to be submitted before job _n_ with in:foo. That way if _m_ is still in the queue, _n_ will block. If they are submitted in reverse, _n_ would immediately run (or rather exit the DEPEND state), having assumed that foo must have already been retired.
And if multiple jobs have out:foo, then all have to provide it before a job with in:foo can run.
This is appealing from an implementation point of view, because the amount of state that a dependency manager needs to track is minimized, and inactive jobs become irrelevant to dependency resolution.
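The order-dependent behavior described above can be sketched as follows. This assumes a job "provides" its out symbols simply by becoming inactive and ignores error handling entirely; `DependencyTable` and its methods are hypothetical names.

```python
class DependencyTable:
    """Tracks out edges only for active jobs; symbols are reaped when unused."""

    def __init__(self):
        self.producers = {}   # symbol -> set of active jobs with out:symbol
        self.blocked = {}     # jobid -> producer jobs it is still waiting on
        self.released = []    # jobs that have left the DEPEND state, in order

    def submit(self, jobid, ins=(), outs=()):
        # Wait only on producers already in the system; an unknown symbol
        # is assumed to have been retired, so it never blocks.
        waiting = set()
        for symbol in ins:
            waiting |= self.producers.get(symbol, set())
        for symbol in outs:
            self.producers.setdefault(symbol, set()).add(jobid)
        if waiting:
            self.blocked[jobid] = waiting
        else:
            self.released.append(jobid)

    def job_inactive(self, jobid):
        # Remove the job's out edges and reap symbols with no producers left.
        for symbol in [s for s, jobs in self.producers.items() if jobid in jobs]:
            self.producers[symbol].discard(jobid)
            if not self.producers[symbol]:
                del self.producers[symbol]
        # Release any job whose producers have all become inactive.
        for waiter in list(self.blocked):
            self.blocked[waiter].discard(jobid)
            if not self.blocked[waiter]:
                del self.blocked[waiter]
                self.released.append(waiter)
```

The table only ever holds entries for jobs that are still active, which is what keeps the state bounded; it also means a job with in:foo submitted before any job with out:foo is released immediately.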
However, I'm not sure I understand how errors would be handled. If an error occurs such as a job with out: foo doesn't provide it for some reason (it's tied to a state that is not reached, or it isn't "manually" provided by some application logic before the job becomes INACTIVE), would a dependency manager then need to hold on to foo until the end of time, and propagate errors to any job submitted with in:foo? What if, after foo is failed, another job comes along with out:foo and successfully provides it? Does that finally allow foo to be retired?
On Mar 31, 2019, at 1:42 PM, Jim Garlick <notifications@github.com> wrote:
> So to confirm: the key invariant is that job m with out:foo has to be submitted before job n with in:foo. That way if m is still in the queue, n will block. If they are submitted in reverse, n would immediately run (or rather exit the DEPEND state), having assumed that foo must have already been retired.
Correct, it’s entirely order dependent. The assumption is that if a sequence of tasks or jobs would produce a correct result when run immediately and in sequence, then this preserves the same ordering while allowing independent work to overlap, without requiring too much overhead from the programmer or the runtime.
> And if multiple jobs have out:foo, then all have to provide it before a job with in:foo can run.
Yup, that’s right.
> This is appealing from an implementation point of view, because the amount of state that a dependency manager needs to track is minimized, and inactive jobs become irrelevant to dependency resolution.
>
> However, I'm not sure I understand how errors would be handled. If an error occurs such as a job with out: foo doesn't provide it for some reason (it's tied to a state that is not reached, or it isn't "manually" provided by some application logic before the job becomes INACTIVE), would a dependency manager then need to hold on to foo until the end of time, and propagate errors to any job submitted with in:foo? What if, after foo is failed, another job comes along with out:foo and successfully provides it? Does that retire the error?
That is a very interesting question. OpenMP doesn’t deal with it at all, but we should if we want error propagation. I’m tempted to handle that by pivoting to a slightly different abstraction of the same kind of model. Treating the symbols as explicit queues, rather than the abstract thing that OpenMP uses, could let us record failure modes when something like that happens, track them in slower but cheaper storage (KVS maybe), and then let users interact with them to clear or handle them. I’ll give this some further thought.
In a way, error propagation through dependent jobs is just an instance of a larger problem of coupling input and output between dependent jobs, which might involve staging of files, etc. Is that a problem for the "dependency manager" or is some other service or abstraction handling it?
I like the idea of a symbolic dependency represented as a queue, especially when thinking about it in broader terms like the above. In the discussion above, at one point we were talking about a "per-DAG eventlog". Maybe that's another way to do it, with dependent jobs somehow associated to a named DAG?
Maybe an "anonymous DAG" (one that doesn't declare a group or whatever) could function as you described earlier, with symbolic dependencies disappearing as they are fulfilled, and no possibility of coupling errors - with the nice advantage of not creating state. Possibly we could deny the creation of named DAGs to guest users, encouraging workflows of any complexity to be run in sub-instances.
Spoke with @frankd412 about the current strawman (https://github.com/flux-framework/rfc/pull/208). He liked the DAG-style dependency declaration created with the in, out, and inout tags.
He also liked the idea of conditional dependencies based on job exceptions (or lack thereof). We just need to be careful here about how much complexity we introduce. If users need Turing completeness (or anything close to it) to decide if their job finished successfully, they should create a post-processing script that either runs at the end of their job or afterwards as a separate job.
One thing that became evident from the discussion is that while having a file:// URI dependency scheme would be cool, users will most likely read way too far into the capabilities of such a URI, resulting in confusion. Flux's capabilities in such a situation would be constrained by the overhead of constantly polling the filesystem. At best, it could check that the file actually exists after a job completes (and throw an exception if not). Users would most likely expect it to block a job until a file appears, even if the file isn't created by another job.
From the post-coffee time chat with @trws and @garlick:
Supporting conditional dependencies (i.e., depending on the success/failure of a particular job) with FLUIDs as the dependency type is relatively easy (i.e., just look up the exit code and if any exceptions occurred for that job in the KVS or job-info module). For any other dependency type, conditional dependencies must be handled carefully.
In our current strawman, there is no need to keep state about previous jobs and their abstract/conditional dependencies (if someone sets an in dependency that has no matching out dependency, the job just runs). Without this historical metadata, a race condition exists between the submission of a DAG and the actual execution of parts of it. As a pathological example, let's say I want to submit 1 million jobs, all with an out dependency named A, and then submit a single post-processing job that has an in dependency named A, but I only want that post-processing job to run if all of the jobs with an out dependency named A complete successfully. Before the post-processing job can be submitted, some of the other jobs may be scheduled, run, and complete. Since those jobs have already left the system, when the post-processing job is submitted, its in dependency won't match with them. If any of them fails, the post-processing job will still run.
To solve this, we need some way to constrain how long we keep this metadata around. Our discussion mainly centered around creating a "namespace" for a stream of jobs that are all a part of the same DAG. The namespace would have a defined start and end marker, creating natural boundaries for tracking all of this extra metadata. You could then create conditional dependencies on any symbols within that namespace without issue.
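One way to picture such a namespace, purely as a sketch: outcomes of out dependencies are remembered between the start marker (construction) and the end marker (`close`), so a conditional in dependency submitted late still sees earlier failures. `DagNamespace` and its methods are hypothetical and not part of the strawman.

```python
class DagNamespace:
    """Hypothetical bounded namespace that remembers symbol outcomes."""

    def __init__(self, name):
        self.name = name
        self.open = True       # start marker: begin tracking metadata
        self.pending = {}      # symbol -> producers that have not finished
        self.failed = set()    # symbols with at least one failed producer

    def add_producer(self, symbol):
        if not self.open:
            raise RuntimeError(f"namespace {self.name!r} is closed")
        self.pending[symbol] = self.pending.get(symbol, 0) + 1

    def producer_finished(self, symbol, success):
        self.pending[symbol] -= 1
        if not success:
            self.failed.add(symbol)

    def status(self, symbol):
        """Return 'failed', 'blocked', or 'satisfied' for an in dependency."""
        if symbol in self.failed:
            return "failed"
        if self.pending.get(symbol, 0) > 0:
            return "blocked"
        return "satisfied"

    def close(self):
        # End marker: the history is no longer needed and can be discarded.
        self.open = False
        self.pending.clear()
        self.failed.clear()
```

In the pathological example, a failure recorded before the post-processing job is submitted still yields `status("A") == "failed"`, and the retained metadata is bounded by the lifetime of the namespace.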
A cool (and a little mind-bending) suggestion for how to accomplish this was to create a "neutered" instance that you submit your DAG to, which upon exiting would submit the DAG to its enclosing instance. Alternatively, that "neutered" instance could grow (once we have grow/shrink) and execute the DAG.
Note: this is for the most general, complete design. On the shorter TOSS4 timeline, we plan on doing something much simpler (i.e., don't support conditional dependencies on abstract symbols, just FLUIDs).
Closing this issue after the merging of RFC26. Opening a new issue to discuss the implementation of the actual dependency module.