RFC 21 says:
RUN
The job is able to run or is running. The job manager sends a request to the exec service to start the job, then logs a start event once the job shells have been started. It then sends a request to the exec service to wait for completion, then logs a finish event once all job shells have exited.
We need to define a first cut of these RPCs for the simulator sprint. The initial cut need not cover "partial exit" which would allow partial release of a job's resources back to the scheduler when its teardown is spread out over time.
The job manager sends a request to the exec service to start the job, then logs a start event once the job shells have been started. It then sends a request to the exec service to wait for completion, then logs a finish event once all job shells have exited.
What would be the drawback of the exec service sending multiple replies to the start request instead of requiring separate wait and start RPCs? This would simplify the implementation of the exec service a bit, since it can generate events (responses) directly as the execution state of a job changes, instead of setting up machinery for waiting on events for specific jobs. Another wrinkle in supporting separate RPCs is that execution state would have to be preserved until a wait request is received (i.e. until the exec state is reaped).
On the other hand, a drawback of multi-response is that multiple wait-ers would not be possible, nor would a wait from a different handle than the initial start request be possible (if support for these features is needed or desirable).
If these are necessary, then we could keep the separate start and wait single-response RPCs.
The initial cut need not cover "partial exit" which would allow partial release of a job's resources back to the scheduler when its teardown is spread out over time.
Actually, it would be nice to have a first cut of how this will work. In #1134 it was stated that
Finally, when the job begins terminating it [the execution service] manages the job state and returns resources to the scheduler.
In RFC 21 it isn't clearly specified whether the exec service returns resources to the scheduler, or whether the job-manager is responsible for this task. If the job-manager is going to issue multiple free events, then the exec service will have to issue multiple responses to the wait request (in the case the wait RPC is used) or have a way to indicate partial completion back to the job-manager for a multi-response start request. In the latter case, the protocol could be:
job-manager               job-exec
start       ------------>
            <------------ started
log start
            <------------ release (R1)
log free
            <------------ release (R2)
log free
            <------------ finish (all resources guaranteed released)
log finish
This would require at least one release response to the job-manager before finish, even in the case where partial release isn't being supported, but would open the door to trivially supporting partial release in the future (in fact, the scheduler already supports it).
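The exchange above can be sketched from the job-manager side as a loop that turns each response into an eventlog entry. This is only an illustration of the proposal as stated at this point in the thread: the function name `log_events` and the `(type, detail)` tuples are hypothetical stand-ins, not flux-core code.

```python
def log_events(responses):
    """Map a job-exec response stream to job-manager eventlog entries.

    `responses` is a list of (type, detail) tuples; both the tuple shape
    and the type names are assumptions made for this sketch.
    """
    eventlog = []
    released = False
    for rtype, detail in responses:
        if rtype == "started":
            eventlog.append("start")
        elif rtype == "release":
            # Each release would become one free event sent on to the scheduler.
            released = True
            eventlog.append("free")
        elif rtype == "finish":
            # The proposal requires at least one release before finish.
            if not released:
                raise ValueError("finish arrived before any release")
            eventlog.append("finish")
            return eventlog
        else:
            raise ValueError(f"unexpected response type: {rtype}")
    raise ValueError("stream ended without finish")


stream = [("started", None), ("release", "R1"), ("release", "R2"), ("finish", None)]
eventlog = log_events(stream)
print(eventlog)
```

A stream without partial release would simply carry a single release covering all of R before finish.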
What would be the drawback of the exec service sending multiple replies to the start request instead of requiring separate wait and start RPCs?
IMHO sounds fine to combine them as you suggest. At least I can't recall a justification for splitting them up, and if it simplifies implementation, let's go that way and see how it turns out.
In RFC 21 it isn't clearly specified if the exec service returns resources to the scheduler, or if the job-manager is responsible for this task.
I was thinking exec would free to job manager and job manager would free to scheduler. If that is acceptable, I think your proposal for separate release and finish responses is good!
One consideration is that right now the sched.free RPC implicitly refers to R, which sched fetches from the KVS. I guess we need to support an optional argument that can refer to specific R fragments (still via the KVS I guess?) I was sort of wondering (probably naively) whether one could get away with passing back just an idset referring to broker ranks, and then sched could "index" the original R to implement partial release?
That could work for now! I think it makes sense that the smallest fragment of R that might be released is a broker rank (i.e. one job shell), so this approach might even work long term.
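As a rough illustration of "indexing" the original R by an idset of broker ranks, the sketch below keeps the allocation as a plain set of ranks and subtracts whatever an idset string names. The parser is a deliberately simplified stand-in (comma-separated ids and `lo-hi` ranges only), not Flux's idset library, and `release_ranks` is a hypothetical helper.

```python
def parse_idset(text):
    """Decode a simplified idset such as "0-3,5" into a set of ints.

    Assumption: only comma-separated ids and lo-hi ranges are handled.
    """
    ids = set()
    for part in text.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            lo, hi = part.split("-")
            ids.update(range(int(lo), int(hi) + 1))
        else:
            ids.add(int(part))
    return ids


def release_ranks(allocated, idset):
    """Return the ranks still held after releasing those named in idset."""
    released = parse_idset(idset)
    unknown = released - allocated
    if unknown:
        raise ValueError(f"ranks not in allocation: {sorted(unknown)}")
    return allocated - released


allocated = {0, 1, 2, 3}          # broker ranks in the job's R (illustrative)
remaining = release_ranks(allocated, "0-1,3")
print(sorted(remaining))
```

With this model the scheduler never needs a new R fragment from the KVS; it only needs the original R plus the ranks named in each free request.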
(thinking out loud now...) It would be trivial, however, for the exec system to write a fragment of R to the KVS and for the scheduler to fetch this path instead of the whole R. The "trail of Rs" in the KVS might also be useful for a reporting tool to calculate the currently used resources. This could be a first foray into how to record R for a job that grows/shrinks (in this case shrinks).
At the very least I suppose the free event in the eventlog would record the list of ranks for restart purposes?
Right, that makes sense.
Here's a stab at a protocol. First try! Comments welcome!
This would use the same "matchtagless" technique used for sched.alloc and sched.free.
job-exec.start
Request: "{s:I s:i}", "id", job->id, "userid", job->userid
Response: "{s:I s:i s:o}", "id", &id, "type", &type, "data", &data
Where type determines format of data object:
{s:i s:s s?:s}, "severity", &xseverity, "type", &xtype, "note", &xnote
Example response streams:
Does that make sense?
Perfect, this is essentially what I had already in my prototype.
However, I would slightly prefer type be a string: "start", "release", "finish". This is much easier to debug in code and on the wire. If you have a good reason to use an integer, though, I could be easily swayed.
For type == release, why don't we say data is an idset of ranks from which resources are being released, and use "all" for now to indicate release of the whole R, e.g.
{s:s}, "ranks", "all"?
Sounds good on both counts (strings and "all").
Clarification: should we go with "{s:s}", "ranks", &idset for the data object in the "release" response (to keep it an object)?
Yes, keeping the data object as you suggest seems good to me.
As discussed face to face: we need a way to report the job "result" _before_ resources have been released to handle the case where the job has completed successfully, but the resources may still be tied up running the epilog. We had said we wanted to attach the "result or exception available" semantic to the transition of job state to CLEANUP.
So let's add a flag to release to indicate that all resources have been released, and then say that finish should precede the first release event. Thus the end-of-stream is now the release with flag set.
Examples would change to:
Successful execution: start, finish, release flag=1.
Failure starting job shells: exception, release flag=1.
Flux failure during execution: start, exception, release flag=1.
Where eventually release could be replaced with:
release flag=0, release flag=0,... release flag=1
FYI -- I think one use case for this would be burst buffer support like SCR, which wants to drain the checkpoint & restart files after the program has successfully completed.
and then say that finish should precede the first release event.
finish may not always precede the first release event, since resources could be released after one or more job shells have exited but before all job shells have exited. However, it will always precede the final release event (with flag set), which is the terminator for the streaming response.
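The ordering rule described here (the flagged release is the terminator, and nothing may follow it) can be checked mechanically. The sketch below is an assumption-laden illustration: it models each response as a dict with a string `type` and, for release, a `data` object holding `ranks` and a boolean `final` key standing in for the flag; `check_stream` is a hypothetical helper, not part of flux-core.

```python
def check_stream(responses):
    """Return True if the stream ends with exactly one final release.

    Because the final release must be the last response, any finish in
    the stream necessarily precedes it.
    """
    if not responses:
        return False
    *body, last = responses
    if last["type"] != "release" or not last["data"]["final"]:
        return False
    # No earlier response may carry the final flag.
    for resp in body:
        if resp["type"] == "release" and resp["data"]["final"]:
            return False
    return True


def release(ranks, final):
    """Build an illustrative release response."""
    return {"type": "release", "data": {"ranks": ranks, "final": final}}


success = [{"type": "start"}, {"type": "finish"}, release("all", True)]
shell_failure = [{"type": "exception"}, release("all", True)]
partial = [{"type": "start"}, release("0-1", False),
           {"type": "finish"}, release("2-3", True)]
unterminated = [{"type": "start"}, {"type": "finish"}]

for name, stream in [("success", success), ("shell_failure", shell_failure),
                     ("partial", partial), ("unterminated", unterminated)]:
    print(name, check_stream(stream))
```

The `partial` stream shows a release arriving before finish, which the rule allows as long as the flagged release comes last.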
FYI -- I think one use case for this would be burst buffer support like SCR, which wants to drain the checkpoint & restart files after the program has successfully completed.
Hm, that is a good use case. Maybe this can be accomplished by a user epilog that runs in the job shell (thus job shell hasn't exited, so no finish or release event)? There may be other ways to do it though (e.g. user selectable "plugin" to the system cleanup/epilog script)
Just to close the loop, I propose the release response data be defined as:
"{s:s s:b}", "ranks", &idset, "final", &final
I talked with Kathryn Mohror at a meeting about the mechanisms needed by SCR as related to this. @kathrynmohror: please comment on the above when you have a chance. (I know you are swamped and it will take a couple of days to get to it.)
https://github.com/flux-framework/flux-core/issues/2040#issuecomment-471719040
Following up on face to face with @grondo - would be nice if the exec service could be dynamically registered with the job manager so that simulator initial program can take over exec service even though the real exec service might have been loaded by rc1.
As with the scheduler interface, we could implement a job-manager.exec-hello RPC:
Request: "{s:s}", "service", service_name
Response: success or failure
Prior to the first hello, any jobs entering RUN state will not attempt to send start requests. Upon receipt of hello, active jobs are scanned for any in RUN state and <service_name>.start requests are sent for these.
If a new hello is received, the request is denied if any jobs have outstanding start requests, thus the system must be drained of running jobs first. (This avoids needing to handle some corner cases with multiple active exec services, and I think we don't have a motivation to allow multiple active exec services). If the hello is allowed, the new service name is pushed on a stack of names in the job-manager and used for subsequent jobs entering RUN.
When unloading, the exec service should first unregister its service, then generate an exception for all jobs it is running, and ensure that each start response stream is terminated with a release final=true response so that all resources are freed. It may then exit. When the job manager receives ENOSYS in response to <service_name>.start, it pops the current service off its stack, clears the "start pending" flag in all active jobs, and (starting from a quiesced system again) tries the service name exposed on the stack, if any.
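A minimal sketch of the bookkeeping this hello proposal implies on the job-manager side is shown below. The class and method names are hypothetical, the count of outstanding start requests stands in for the per-job "start pending" flags, and the service names in the usage lines are made up for illustration.

```python
class ExecServiceStack:
    """Track registered exec services as described in the hello proposal."""

    def __init__(self):
        self.stack = []          # service names, most recent last
        self.outstanding = 0     # start requests with open response streams

    def hello(self, service):
        """Register a service; denied while any start request is outstanding."""
        if self.outstanding > 0:
            return False
        self.stack.append(service)
        return True

    def current(self):
        """Service used for jobs entering RUN, or None before the first hello."""
        return self.stack[-1] if self.stack else None

    def start_sent(self):
        self.outstanding += 1

    def stream_done(self):
        """Called when a start stream ends with a final release."""
        self.outstanding -= 1

    def enosys(self):
        """Handle ENOSYS: pop the current service and clear pending starts."""
        if self.stack:
            self.stack.pop()
        self.outstanding = 0
        return self.current()


services = ExecServiceStack()
services.hello("job-exec")            # real exec service loaded first
services.start_sent()                 # a job is running
denied = services.hello("sim-exec")   # refused: system is not drained
services.stream_done()
accepted = services.hello("sim-exec")
fallback = services.enosys()          # sim-exec unloaded, fall back
print(denied, accepted, fallback)
```

Note that this sketch pops once per ENOSYS response, so several ENOSYS responses in flight would pop more than one name.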
Excellent! :+1:
There's actually another race here: if multiple start requests are in the pipeline when a service unloads, multiple ENOSYS responses may come back and it will be hard to tell which service they came from (causing the stack to pop again).
Rather than work around that, I'm wondering if maybe we should just track the currently registered exec service and forget about the previous one. In fact, we could maybe have the job-manager send a shutdown request to the previous one to force it to cancel jobs/return resources before taking up with the new one?
Although...possibly sending a "generation number" in the matchtag field of these requests could help us match ENOSYS responses to particular service instances. That might be good for sched too, where I think there's a possible race when the scheduler is quickly unloaded and reloaded
Rather than work around that, I'm wondering if maybe we should just track the currently registered exec service and forget about the previous one.
That seems fine to me. However, isn't the original topic string of the request copied to the ENOSYS response? i.e. wouldn't flux_msg_get_topic (msg, &topic) allow you to differentiate the service that got the ENOSYS?
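Assuming the ENOSYS response does carry the original topic string, the check suggested here amounts to splitting the topic into service and method and comparing the service with the one currently registered. The helper name and the example service names below are hypothetical.

```python
def enosys_from_current(topic, current_service):
    """Return True if an ENOSYS topic names the current service's start method."""
    service, _, method = topic.rpartition(".")
    return method == "start" and service == current_service


current = "sim-exec"
print(enosys_from_current("sim-exec.start", current))
print(enosys_from_current("job-exec.start", current))
```

A topic comparison like this cannot tell apart two instances of the same service name that were unloaded and reloaded quickly, which is the case the generation number idea is aimed at.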
Just to close the loop, I propose the release response data be defined as:
"{s:s s:b}", "ranks", &idset, "final", &final
According to RFC 16:
Once all job shells have exited and all outstanding writes to the guest namespace have stopped, the exec system links the guest namespace into the primary KVS namespace before notifying the job manager that the job is finished.
This means there may be a period after the exec system releases the final set of resources (release response with final=true) where there are still writes to the job KVS directory in flight.
Of course, the exec system could just delay the release final=1 message until after the guest namespace has quiesced and been moved to the main namespace, but this might be a bit tricky (though maybe not, haven't tried it yet) and it seems a shame to delay release of resources for this reason.
However, I'd also hate to add another message to the protocol...
For now I'll try to implement the first idea above, but I wanted to bring up the issue anyway.
I can't say I have followed all the discussion here but I'll do my best to comment. From what Dong said, you are discussing the case where something like SCR would need to hold on to allocation resources after the main application finishes successfully or terminates abnormally. I agree that is a needed capability. For SCR we will want to drain data from SSDs or memory (or whatever storage) to the parallel file system, or we may also restart the job in the same allocation. What I am not following is: does the 'finish' state always mean the job terminates successfully? Or can it also mean abnormal termination?
What I am not following is does the 'finish' state always mean the job terminates successfully? Or can it also mean abnormal termination?
Here finish means that all processes in the job have exited and some kind of exit status is available for collection. At this low level, Flux doesn't have a separate state for "abnormal termination" so a tool would have to further examine the exit code or other job state to determine the final status. At some point an aggregated exit status will probably be available in the eventlog as context for the finish event.
I would hope the discussion in this issue is at a lower level than developers of tools like SCR will have to care about, so no sense in following this directly. However, perhaps we should open an issue on the SCR use case in general to make sure our early versions of the new execution system can handle it easily.
I kind of wonder if SCR support will be a function of the "job shell" (the user process that actually runs user compute tasks). Resources aren't released until the job shell exits, so an SCR-enabled job shell could just keep the job shell from exiting until it has finished draining data.
I'll try to open an issue and mention you so you can comment if we get off track.
Thanks Mark! Yes, we usually have a wrapper script called from the batch script that manages the work needed before and after the application runs. So, you're right that the job shell should still be alive if SCR is alive.
In another project (Unify - a distributed file system across burst buffers, implemented in user space), we do plan on accessing compute resources in the epilog. In this model, would the resources still be accessible in the epilog?
In this model, would the resources still be accessible in the epilog?
Yes, I should have mentioned that in our model, a release event isn't issued until an epilog/cleanup script completes on the resources (e.g. much like Slurm's completing state).
Of course, the exec system could just delay the release final=1 message until after the guest namespace has quiesced and been moved to the main namespace, but this might be a bit tricky (though maybe not, haven't tried it yet)
I'm not sure if we have all the messaging just right here, so maybe we should brainstorm a bit. There _does_ need to be some sort of handshake from exec -> job manager when exec commit is done, so that the job manager knows it's OK to archive the job directory (and notify state watchers)...
I guess the question is whether we should add a message after the release final=1 message so that resources can be freed as early as possible. One question: do we envision that exec will be committing R fragments that will need to be read by sched? If so then release would have a similar requirement. But it sounded like maybe the idset would work as a long term solution?
do we envision that exec will be committing R fragments that will need to be read by sched? If so then release would have a similar requirement. But it sounded like maybe the idset would work as a long term solution?
I think only being able to release a fragment at the resolution of a broker rank might be limiting in the far future. Examples:
Of course there are probably ways around all these "issues" if we find that supporting fragments of R isn't the right approach.
There does need to be some sort of handshake from exec -> job manager when exec commit is done, so that the job manager knows it's OK to archive the job directory (and notify state watchers)...
For now I was working on the assumption that the exec system will delay the release final=1 until after the guest kvs namespace has completed its move. We could always say that a release message with empty ranks, e.g.
{"type":"release", "data":{"ranks":"","final":true}}
is allowed if it turns out that all resources sometimes need to be released before the final message. I can't imagine the move of the namespace will ever take long enough to notice though.
That sounds like a good way to proceed.
Closed by #2077
Protocol between job-manager and job-exec is summarized at the top of:
https://github.com/flux-framework/flux-core/blob/master/src/modules/job-manager/start.c