Flux-core: hierarchical flux parent->child communication

Created on 16 Aug 2017 · 29 Comments · Source: flux-framework/flux-core

I’m stuck on a technical problem with Flux, and I think you can point me in a better direction much faster than I can blindly stumble my way to a better solution. My current problem has to do with having parent flux instances communicate with their child instances (optimally via RPCs but events and KVS access would also suffice).

My problem (at a 50,000 ft level) is that we want to show the benefits of dynamically routing jobs through a hierarchy of flux instances that are distributed across many nodes. This requires the ability for parents to send jobs to their children and children to notify their parents when they are out of jobs.

To accomplish this, I threw together an admittedly hacky communication scheme for parent<-->children communication. Children communicate with their parents by calling flux_open on the parent_uri, which works just as intended, no problems there. I struggled to find a clean/easy way to get parents to communicate with their children. Since the parent has its scheduler running on a different node from the child scheduler, it cannot just run flux_open on the local_uri of the child’s flux instance that is running the scheduler. I settled on having the parents call flux_open with an ssh connector uri. This has a major drawback: there is a limit on the number of ssh connectors any parent can have, which limits the branching factor of the flux instance hierarchy. Is there a better way to have parent flux instances communicate with their children? I feel as though I’m missing something very obvious and have overcomplicated the situation.

All 29 comments

If you use RPC, can the parent store a message sent from each of the children and reuse the routing info which should be a part of the message header?

Yeah, when you already have the overlay network set up, it's a shame to have to open additional ssh connections - and you may find that presuming that ssh keys exist where you want them is problematic as well.

Could you have the child establish the connection, but then "fetch" work from the parent? When there's no work, an outstanding request would be queued in the parent somewhere. The child registers a continuation callback for the request, so it doesn't have to block. When the parent has work, it generates a response to the child's enqueued request. When the child is ready for work it sends another request (or it could always send a request after each response). Events could be used when the parent needs to direct the child out of band to do something.

That's basically what @dongahn is suggesting I guess. The request from the child contains the routing info needed to get a response back to it.

Something like "credit based flow control" could be implemented that way. E.g. client sends N requests that are queued in the parent, and as responses are received, it ensures that it always has N outstanding requests. N could be tunable.
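The "N outstanding requests" idea can be sketched with a toy model in plain C. This is only an illustration of the bookkeeping, not Flux code: `struct parent`, `struct child`, and every function name below are hypothetical, and integer ids stand in for real request messages.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_PENDING 64

/* Toy parent: a FIFO of request ids that are parked until work arrives. */
struct parent {
    int pending[MAX_PENDING];
    size_t head;   /* index of the oldest parked request */
    size_t count;  /* number of parked requests */
};

/* Toy child: tracks how many of its requests are still unanswered. */
struct child {
    int credits;       /* N: target number of outstanding requests */
    int outstanding;   /* requests sent but not yet answered */
    int next_id;       /* id assigned to the next request */
    int jobs_received; /* responses consumed so far */
};

/* Parent side: park a request. Returns 0 on success, -1 if the queue is full. */
static int parent_enqueue (struct parent *p, int request_id)
{
    if (p->count == MAX_PENDING)
        return -1;
    p->pending[(p->head + p->count) % MAX_PENDING] = request_id;
    p->count++;
    return 0;
}

/* Parent side: work arrived, so answer the oldest parked request.
 * Returns the id that was answered, or -1 if nothing is parked. */
static int parent_dispatch (struct parent *p)
{
    int id;

    if (p->count == 0)
        return -1;
    id = p->pending[p->head];
    p->head = (p->head + 1) % MAX_PENDING;
    p->count--;
    return id;
}

/* Child side: send requests until N are outstanding. Returns how many were sent. */
static int child_top_up (struct child *c, struct parent *p)
{
    int sent = 0;

    while (c->outstanding < c->credits) {
        if (parent_enqueue (p, c->next_id) < 0)
            break;
        c->next_id++;
        c->outstanding++;
        sent++;
    }
    return sent;
}

/* Child side: a response arrived; consume it and restore the credit window. */
static void child_on_response (struct child *c, struct parent *p)
{
    assert (c->outstanding > 0);
    c->outstanding--;
    c->jobs_received++;
    child_top_up (c, p);
}
```

In this model the child calls `child_top_up()` once at startup and `child_on_response()` from its continuation, so the parent always has up to N parked requests per child to answer as work shows up. Making `credits` a tunable matches the "N could be tunable" remark above.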

@dongahn & @garlick: sounds like a good solution. As far as actually implementing that, presumably the easiest way is to have the child use request_encode and the parent respond by calling flux_respond.

Instead of request_encode() I would use flux_rpc() or flux_rpc_pack(). That is a bit higher level.

I just thumbs upped my own post, Sheesh, I should stop clicking every time I see a pretty icon :-1:

Instead of request_encode() I would use flux_rpc() or flux_rpc_pack().

Yeah, this way you can just copy any number of examples from existing services in flux-core.

BTW, just curious, in what context will the flux_rpc() in the child occur? Will this be implemented as an initial program/script in the child?

@grondo, it will be from a module. If I remember correctly, initial programs cannot send RPCs, correct? Or is it that they can't receive a reply?

They can't (currently) register a service. They can still act as a client as proposed above. I should find the old issue where we discussed this before, but the initial program is an interesting place to implement code to enslave an instance to its parent...

#698

In the simplified scheme @garlick has proposed, using a script or program run as the initial program of the child job would seem to have many benefits. The initial program or any child of the initial program can certainly use RPCs, as this is how all the commands work.

I think it would be a really simple approach to have a child job start a "work receiving" script as its initial program, which would fetch work from the parent repeatedly until the parent told it to stop, at which point it would exit causing normal termination of the instance.

The script could also notify the parent via a different RPC as jobs complete. I don't think it needs to implement a service, and thus could be done simply with the current code.

#698

Sorry missed @garlick's comment and forgot about this older issue. I'm not being too helpful I fear.
However, if you have any further questions about getting this set up and working, please don't hesitate to ask.

Sorry for having what seems to be the same conversation a second time. These are all great points, and I think I can see how to reduce my current module down into an initial program, greatly simplifying the workflow (as well as removing the unnecessary ssh connectors).

Great! No problem rehashing - things have changed quite a bit since last summer!

I just sat down with @dongahn to debug an interesting problem I ran into while implementing this. Right now the root flux instance registers a watcher on "pymod.new_job" (via a pymod module), which waits for jobs from the UQ Pipeline (UQP). It also registers a watcher on "pymod.need_job", which waits for RPCs from child instances that are ready for jobs to run. If the children request more work than the parent instance can provide (because it's waiting on work from the UQP), the parent pymod instance enqueues the child's RPC message and returns control to the reactor. Then, when the "pymod.new_job" callback is called, it takes the new unit of work and adds it to the response to the queued RPC. Unfortunately, this results in the response going to the UQP, not to the child instance.

The conclusion that @dongahn and I reached is that after the module leaves the "pymod.need_job" callback, the enqueued message is destroyed by flux. Then, when the "pymod.new_job" callback attempts to respond to the destroyed, queued message, it instead responds to the message that the callback is running for. Does this seem like a valid conclusion? If so, any thoughts on how to work around this problem?

I cannot guarantee that the rate of work production by the UQP will always exceed the rate of requests by the children, so I presumably need some way to queue up requests for work. Is there a flux_msg_copy that I am not seeing? Or maybe I've unknowingly fallen into an anti-pattern. Worst case, I guess I could manually copy the route from the child's message, create a new message, and put the route in there.

Right, if you want to save the message received in a message handler you have to copy it, since it is destroyed when your handler returns. I assume you're talking about a message handler when you say a watcher, i.e. one of these:

typedef void (*flux_msg_handler_f)(flux_t *h, flux_msg_handler_t *w,
                                   const flux_msg_t *msg, void *arg);

There is a message copy function (message.h):

/* Duplicate msg, omitting payload if 'payload' is false.
 */
flux_msg_t *flux_msg_copy (const flux_msg_t *msg, bool payload);

Could you perhaps also just generate the response immediately, and queue that up for later use rather than queuing a copy of the request? Then you can just fill in the payload of the response later using flux_msg_set_payload() before sending it.
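The ownership rule described above (the request only lives for the duration of the handler, so anything kept for later must be a copy) can be illustrated with a toy model in plain C. Nothing here is the Flux API: `struct toy_msg`, `toy_msg_copy()`, `need_job_cb()`, `new_job_cb()`, and `dispatch()` are hypothetical stand-ins, with `toy_msg_copy()` playing the role that `flux_msg_copy()` plays in the thread and a route string standing in for the real message.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a request; only the return route matters here. */
struct toy_msg {
    char *route;
};

static struct toy_msg *toy_msg_create (const char *route)
{
    size_t len = strlen (route) + 1;
    struct toy_msg *m = malloc (sizeof (*m));

    if (!m)
        return NULL;
    if (!(m->route = malloc (len))) {
        free (m);
        return NULL;
    }
    memcpy (m->route, route, len);
    return m;
}

static void toy_msg_destroy (struct toy_msg *m)
{
    if (m) {
        free (m->route);
        free (m);
    }
}

/* Deep copy: the copy stays valid after the original is destroyed. */
static struct toy_msg *toy_msg_copy (const struct toy_msg *m)
{
    return toy_msg_create (m->route);
}

#define MAX_DEFERRED 16

struct deferred {
    struct toy_msg *reqs[MAX_DEFERRED];
    size_t count;
};

/* "need_job" handler: no work is available, so keep a private copy.
 * The caller destroys msg as soon as this function returns. */
static int need_job_cb (struct deferred *d, const struct toy_msg *msg)
{
    struct toy_msg *cpy;

    if (d->count == MAX_DEFERRED)
        return -1;
    if (!(cpy = toy_msg_copy (msg)))
        return -1;
    d->reqs[d->count++] = cpy;
    return 0;
}

/* Toy dispatcher: owns the message only while the handler runs. */
static int dispatch (struct deferred *d, const char *route)
{
    struct toy_msg *msg = toy_msg_create (route);
    int rc;

    if (!msg)
        return -1;
    rc = need_job_cb (d, msg);
    toy_msg_destroy (msg); /* the original is gone once the handler returns */
    return rc;
}

/* "new_job" handler: pop the oldest deferred request so it can be answered.
 * The caller owns the returned copy and must destroy it. */
static struct toy_msg *new_job_cb (struct deferred *d)
{
    struct toy_msg *m;

    if (d->count == 0)
        return NULL;
    m = d->reqs[0];
    memmove (&d->reqs[0], &d->reqs[1], (d->count - 1) * sizeof (d->reqs[0]));
    d->count--;
    return m;
}
```

Storing the pointer passed to `need_job_cb()` directly, instead of the copy, would leave `reqs[]` pointing at freed memory, which is the same class of bug as responding to a destroyed request.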

I have hit another hurdle and with the tight deadline, I could use some help debugging if anyone has the time. My problem, as far as I can tell, is that the child is never receiving the RPC response from the parent. The parent receives the RPC from the child, no problem, but the response to the child seems to disappear into the ether. Any thoughts on what I may be doing wrong or how I can debug this further?

My module code can be found here: https://gist.github.com/SteVwonder/25df12a3948a6a6b5be338dca39a2374.
The output from the module at the root level and child level can be found here:
https://gist.github.com/SteVwonder/dcb1fca9b64c0736d8d239c6a2f1a674

When I attach to the child instance, the only output from the parent_handle trace is:

→ flux wreck attach 1
--------------------------------------
>[000] ||
>[022] sched_proxy_c.need_job
>[012] 8E010109FFFFFFFF00000001

When I run the root instance with FLUX_HANDLE_TRACE=1, I do see that the root is sending the response (based on the payload of the RPC), but I am not sure where it ends up. Log:

→ grep -C3 "need_job" 0-stdouterr
k[012] 8E0108000000000000000002
--------------------------------------
>[032] |239AA|
>[022] sched_proxy_c.need_job
>[012] 8E010109FFFFFFFF00000001
--------------------------------------
>[032] |E5DA7|
--
k[012] 8E0108000000000000000001
--------------------------------------
>[065] |239AA!5D7ED!0|
>[022] sched_proxy_c.need_job
>[012] 8E010109FFFFFFFF00000001
>[065] |E5DA7!5D7ED!0|
--------------------------------------
--
>[012] 8E01010BFFFFFFFF00000003
--------------------------------------
<[065] |239AA!5D7ED!0|
<[022] sched_proxy_c.need_job
<[082] {"cmdline": ["hostname"], "ntasks": 1, "nnodes": 1, "cwd": "/tmp", "walltime": 10}
<[012] 8E01020F0000000000000001
--------------------------------------
--
--------------------------------------
k[012] 8E0108000000000000000001
<[032] |239AA|
<[022] sched_proxy_c.need_job
<[082] {"cmdline": ["hostname"], "ntasks": 1, "nnodes": 1, "cwd": "/tmp", "walltime": 10}
<[012] 8E01020F0000000000000001
--------------------------------------

I would suggest that you try some simple tests to reduce the number of unknowns for your debugging.

First question: if the root responds to the messages from the children immediately (that is, synchronously), do the children get the response? If that doesn't work, it seems there is an issue in cross-instance communication from the root to the children. Hopefully that is not the case.

If that simple case does work, I would suspect that there is still a bug in enqueuing a copy of the message for a later response.


@dongahn, the output posted above is for a test where the module has its job_queue "primed" with a manually constructed job spec. This means it can and does respond immediately to requests for work (thus avoiding the copying and buffering of RPC messages).

Stephen, perhaps this should be obvious from your description, but what is the process to reproduce the problem using your module? (In case I have time to help.)

I see. So even the immediate response from the parent to a child isn't happening properly...

@grondo, sorry. It is definitely not obvious how to reproduce from my description. I'm using forks of core/sched that are based off the following versions of flux-core/sched (they are a bit old, and for that I pre-apologize):

  • core: 1dc799d0e606f34bb45d3791c226f977a474f4a1
  • sched: 11373cd543d4aaa86b9ba796d9edea0fc08e3138

From there, you need to start a root flux instance and load the sched and sched_proxy_c modules. Then from that root instance, launch a nested, child instance and again load the sched and sched_proxy_c modules. You can see the exact lines that I load the sched_proxy_c with in this gist: https://gist.github.com/SteVwonder/dcb1fca9b64c0736d8d239c6a2f1a674. The only arguments are the root=1 or leaf=1 arguments.

This should be all that is required. At this point, if everything works perfectly, the root sched_proxy_c should have sent the handmade job spec to the child sched_proxy_c, and the child should have posted another request for work. Unfortunately, for me, the job spec gets sent but never makes it to the child.

BTW, have we ever tested a simple rpc child->parent->child?

Also just curious. Does changing the routing policy to FLUX_MSGFLAG_UPSTREAM make any difference?

I'm still not sure I fully understand the flow of control here, but what happens if you replace the call to flux_rpc_then with a blocking call to flux_rpc_get?

I'm afraid the problem here is that the handle with the connection to the parent has no way to wake up when receiving a message because the module itself is only waiting in the reactor registered with the main broker reactor handle. That is, you have two reactors here, one in each handle and I'm not sure by default you can process messages from both handles with the one reactor you're sleeping on.

Sorry if that is confusing, and I'm not 100% convinced I'm correct, but the test suggested should help narrow it down.

Ah, that sounds right (albeit from a pretty limited review on my part).

Try the rpc_get as @grondo suggests.

I need to poke at some code to see how to do this with a continuation. It's possible we've not exposed all the hooks necessary for this to work, but it shouldn't be too hard to fix if not. I'll get on it ASAP since I know you are on a deadline.

Sorry I didn't catch that earlier.

Ok. A synchronous rpc should be fine for my use case. I'll give it a shot after dinner.

It looks like both approaches work: doing the synchronous rpc_get, or calling reactor_run with the parent handle while still using rpc + rpc_then. Thanks @dongahn, @grondo and @garlick for the help!

Glad it worked, in particular with the tight deadline! I think we have plenty of time to revisit inter-instance communication later. Thanks @grondo and @garlick, as well!

On getting it to work with a continuation, pretty sure all you need to do is call flux_set_reactor(parent_h, flux_get_reactor (module_h)) before making any RPCs on parent_h.

Internally, the first RPC call will create a dispatcher and register a "handle watcher" on the reactor associated with the handle used in the RPC. You just have to make sure that the handle used with the RPC is already associated with the reactor you want to use.
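To make the ordering requirement concrete, here is a toy model in plain C of the behavior described above. It is not Flux internals and uses no Flux calls: `struct toy_handle`, `struct toy_reactor`, and all functions are hypothetical, and the model assumes only what the thread states, namely that the first RPC registers a watcher on whichever reactor the handle is associated with at that moment.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WATCHED 8

struct toy_handle;

/* Toy reactor: dispatches only for handles that registered a watcher on it. */
struct toy_reactor {
    struct toy_handle *watched[MAX_WATCHED];
    size_t count;
};

/* Toy handle: starts out associated with its own private reactor. */
struct toy_handle {
    struct toy_reactor *reactor; /* reactor the handle is associated with */
    int watched;                 /* nonzero once a watcher is registered */
    int queued;                  /* responses waiting to be dispatched */
    int delivered;               /* responses handed to a continuation */
};

/* Re-associate a handle; in this model it must happen before the first RPC. */
static void toy_set_reactor (struct toy_handle *h, struct toy_reactor *r)
{
    assert (!h->watched);
    h->reactor = r;
}

/* The first RPC registers a watcher on the handle's current reactor. */
static int toy_rpc (struct toy_handle *h)
{
    struct toy_reactor *r = h->reactor;

    if (!h->watched) {
        if (r->count == MAX_WATCHED)
            return -1;
        r->watched[r->count++] = h;
        h->watched = 1;
    }
    return 0;
}

/* A response lands on the handle but is not yet dispatched. */
static void toy_response_arrives (struct toy_handle *h)
{
    h->queued++;
}

/* One reactor pass: deliver queued responses for watched handles only.
 * Returns the number of responses delivered. */
static int toy_reactor_run_once (struct toy_reactor *r)
{
    int n = 0;

    for (size_t i = 0; i < r->count; i++) {
        struct toy_handle *h = r->watched[i];
        n += h->queued;
        h->delivered += h->queued;
        h->queued = 0;
    }
    return n;
}
```

In the model, a handle left on its private reactor never sees its response while only the module's reactor runs, which mirrors the missing response reported earlier. Running the private reactor delivers it (the reactor_run workaround), and so does calling `toy_set_reactor()` before the first `toy_rpc()` (the flux_set_reactor suggestion).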

This needs a test case and documentation. Will open an issue.
