Based on what I recently learned about how slow synchronous submission is (#2970), I've been trying to use the Python bindings' submit_async mechanism. Unfortunately, I've been having some difficulty because the future/callback behavior is confusing me.
My setup differs from bulksubmit.py in two ways:
wait_any), the jobid needs to be matched to the submission that produced that job ID. That is, a call to submit_async needs to be matched to a job ID before that job finishes.
Taking a cue from bulksubmit.py, I added a callback to the submit_async future that calls job.submit_get_id(...). Unfortunately, it looks like the callback is only called from reactor_run. I don't really know what reactor_run does, though, and I'm not sure it fits in very well with assumption 1. I've certainly had some trouble working with it (as you might expect when you don't know what it does).
The behavior that would be easiest for me to work with is for the calls to submit_async to be fulfilled in the background (this part already seems to happen, based on my observations) and, whenever one is fulfilled, for the future to be marked as "ready", or the callback to be invoked, or really any other trigger that lets my code fetch the job ID efficiently. I can easily get the behavior I want with blocking submissions, but that seems to be quite slow by comparison.
Does my use-case make sense? Is there an easy and efficient way I can get the kind of behavior I want?
> Unfortunately, it looks like the callback is only called from reactor_run. I don't really know what reactor_run does, though, and I'm not sure it fits in very well with assumption 1. I've certainly had some trouble working with it (as you might expect when you don't know what it does).
reactor_run is the Python cffi wrapper for the underlying C call, flux_reactor_run(). Unfortunately, the Flux reactor is required in order to use asynchronous callbacks in this way. The reactor processes messages and events until the event you are interested in arrives (here, that a job submission is complete and a jobid is ready), and then it calls your then callback.
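To make that concrete, here is a toy model in plain Python. It is not the Flux API: ToyReactor and ToyFuture are hypothetical names invented for illustration. It only mimics the behavior described above, where a response can arrive in the background but the then callback does not fire until the reactor loop runs.

```python
from collections import deque


class ToyReactor:
    """Hypothetical stand-in for an event loop; not the Flux reactor."""

    def __init__(self):
        self.ready = deque()  # futures whose response has arrived

    def run(self):
        """Dispatch the callback of every fulfilled future, then return."""
        dispatched = 0
        while self.ready:
            future = self.ready.popleft()
            future.callback(future, future.arg)
            dispatched += 1
        return dispatched


class ToyFuture:
    """Hypothetical future: fulfilled in the background, dispatched by run()."""

    def __init__(self, reactor):
        self.reactor = reactor
        self.callback = None
        self.arg = None
        self.result = None

    def then(self, callback, arg=None):
        self.callback, self.arg = callback, arg
        return self

    def fulfill(self, result):
        # The response has arrived, but no user code runs yet.
        self.result = result
        self.reactor.ready.append(self)


reactor = ToyReactor()
job_ids = {}


def on_submit(future, name):
    job_ids[name] = future.result


future = ToyFuture(reactor).then(on_submit, "job-a")
future.fulfill(1234)             # response arrives "in the background"
seen_before_run = dict(job_ids)  # still empty: the callback has not fired
reactor.run()                    # the loop dispatches the callback
print(seen_before_run, job_ids)  # {} {'job-a': 1234}
```

In this model, as in the thread, nothing calls on_submit until something runs the loop, which is why the job ID only shows up after reactor_run.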
I think there is some work ongoing to make some of the Python native async code work with our reactor, but for now I think you'd either have to make your main code use the Flux reactor loop, or run the reactor loop in another thread. Unfortunately, I'm not that knowledgeable about Python. There may be a better solution but I just don't know it.
BTW, your use case makes sense and I bet it will be a common one, so thanks for your patience while all the interfaces aren't quite developed yet!
Thanks @grondo. It looks like, though, if I submit jobs on one flux.Flux() instance and run the reactor loop on another instance (since it is in another thread), the two don't align. I.e., if I submit jobs with callbacks on instance A, calling reactor_run on instance B doesn't seem to register the callbacks from A. It seemed to "work" if I used the reactor from A, i.e. B.reactor_run(A.get_reactor()), but sometimes it would segfault. (I figured something would probably go wrong, I just wanted to see if it would work.)
I think, though, that I can make the thread with A also do the reactor run.
Edit: sorry, shouldn't have closed this, since it sounds like you might eventually make the behavior I wanted easier to obtain.
Yeah, the Futures and their underlying RPCs are definitely associated with a single flux.Flux() handle and its associated reactor. More to the point, an RPC only gets a response on the same handle from which it was initiated. So you can't send an RPC or request (like job_submit) from A and expect to receive the response on B. Think of them like sockets and it might make more sense.
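The socket analogy can be sketched with a small plain-Python model. Again, this is not the Flux API: ToyHandle and ToyBroker are hypothetical names, and the only point is that a response is delivered to the inbox of the handle that sent the request, so running the loop on a different handle dispatches nothing.

```python
class ToyBroker:
    """Hypothetical service that answers each request with a new job ID."""

    def __init__(self):
        self.next_id = 100

    def handle_request(self, sender, tag):
        # The response goes back to the sender only, like a socket reply.
        sender.inbox.append((tag, self.next_id))
        self.next_id += 1


class ToyHandle:
    """Hypothetical connection; each one has its own inbox and callbacks."""

    def __init__(self, name):
        self.name = name
        self.inbox = []      # responses delivered to this handle
        self.callbacks = {}  # request tag -> callback

    def send_request(self, broker, tag, callback):
        self.callbacks[tag] = callback
        broker.handle_request(self, tag)

    def reactor_run(self):
        """Dispatch only the responses sitting in this handle's inbox."""
        count = 0
        while self.inbox:
            tag, payload = self.inbox.pop(0)
            self.callbacks.pop(tag)(payload)
            count += 1
        return count


broker = ToyBroker()
a, b = ToyHandle("A"), ToyHandle("B")
results = []

a.send_request(broker, "submit-1", results.append)
dispatched_on_b = b.reactor_run()  # B never received anything
dispatched_on_a = a.reactor_run()  # A gets the response it asked for
print(dispatched_on_b, dispatched_on_a, results)  # 0 1 [100]
```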
There is a way to add multiple handles to a single reactor so you can receive events from all of them, though. We can figure out how to do that from Python if it helps your use case.
Thanks for the offer and the explanation. I actually just got it working reasonably simply and efficiently (I think). My first approach was to add another thread that calls reactor_run, but I couldn't figure out how to do it, since the submit_async thread and the reactor_run thread can't share handles and therefore RPCs, like you said. But having it all done in one thread worked out pretty well for me: I just have calls to reactor_run(..., REACTOR_NOWAIT) after every batch of jobs submitted.
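A minimal sketch of that single-threaded pattern, for anyone finding this later. It uses only the calls named in this thread (submit_async, submit_get_id, then, get_reactor, reactor_run), with the reactor_run(reactor, flags) argument order taken from the usage above; other versions of the bindings may differ. The function name submit_in_batches and its parameters are my own, and the handle, the job module, and the flag are passed in as arguments purely so the sketch can be exercised without a running Flux instance.

```python
def submit_in_batches(handle, job, jobspecs, batch_size, nowait_flag):
    """Submit jobspecs in batches, polling the reactor after each batch.

    Assumed arguments (not checked here):
      handle      -- a flux.Flux() instance
      job         -- the flux.job module
      nowait_flag -- flux.constants.FLUX_REACTOR_NOWAIT
    Returns a dict mapping each jobspec's index to its job ID for every
    submission whose callback has fired so far; it may be incomplete.
    """
    job_ids = {}

    def on_submit(future, index):
        # Runs inside reactor_run once the submission response is in.
        job_ids[index] = job.submit_get_id(future)

    for start in range(0, len(jobspecs), batch_size):
        batch = jobspecs[start:start + batch_size]
        for offset, jobspec in enumerate(batch):
            job.submit_async(handle, jobspec).then(on_submit, start + offset)
        # Same handle for submitting and for running the reactor;
        # NOWAIT dispatches whatever is ready and returns immediately.
        handle.reactor_run(handle.get_reactor(), nowait_flag)

    return job_ids
```

Inside a Flux instance this would presumably be called as submit_in_batches(flux.Flux(), flux.job, jobspecs, 50, flux.constants.FLUX_REACTOR_NOWAIT). Because NOWAIT never blocks, some job IDs can still be missing when the function returns, so the caller has to keep polling the reactor until the dict is complete.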
> I just have calls to reactor_run(..., REACTOR_NOWAIT) after every batch of jobs submitted.
Great! That is one suggestion @SteVwonder had when we talked on the phone today. If that is working then perfect!
We had also discussed that if you want to run the reactor for a bit longer than one iteration per call, you could register an idle watcher, and have the callback for that watcher stop the reactor. This way you'd pop out of the reactor instead of blocking.
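The idle-watcher idea can be illustrated with a toy loop in plain Python. This is a hypothetical model, not the Flux reactor or its watcher API: the assumption is simply that an idle watcher runs only when no other events are pending, so a watcher that stops the loop lets it drain everything that is ready and then return instead of blocking.

```python
from collections import deque


class ToyReactor:
    """Hypothetical event loop with idle watchers; not the Flux reactor."""

    def __init__(self):
        self.events = deque()    # pending event callbacks
        self.idle_watchers = []  # called only when no events are pending
        self.running = False

    def stop(self):
        self.running = False

    def run(self):
        """Handle events until something calls stop(); return the count."""
        self.running = True
        handled = 0
        while self.running:
            if self.events:
                self.events.popleft()()
                handled += 1
            elif self.idle_watchers:
                for watcher in list(self.idle_watchers):
                    watcher(self)
            else:
                # A real reactor would block here waiting for new events.
                raise RuntimeError("would block: no events, no idle watcher")
        return handled


reactor = ToyReactor()
log = []
for n in range(3):
    reactor.events.append(lambda n=n: log.append(n))

# The idle watcher's only job is to stop the loop once it has gone quiet.
reactor.idle_watchers.append(lambda r: r.stop())

handled = reactor.run()
print(handled, log)  # 3 [0, 1, 2]
```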
In C we have these flags for flux_reactor_run(). Not sure if they work in Python land:
FLUX_REACTOR_NOWAIT = 1, /* run loop once without blocking */
FLUX_REACTOR_ONCE = 2, /* same as above but block until at least */
but maybe helpful if so?
Thanks @grondo, @garlick, and @SteVwonder (who I emailed earlier about this issue). With flux.constants.FLUX_REACTOR_NOWAIT I can get a pretty good compromise between submitting jobs in batches and not waiting _too_ long to get a job ID back (and not worrying about getting stuck in flux_reactor_run indefinitely).
Thanks @jameshcorbett for opening this issue and posting your solution. This is a great use case to reference while we are developing our Workflow/Ensemble classes within Flux, which should hide a lot of these details. Cross-referencing the issues for future reference: https://github.com/flux-framework/flux-core/issues/2653