Flux-core: libsubprocess redesign

Created on 12 Feb 2018 · 52 Comments · Source: flux-framework/flux-core

I was going to make this a wiki page, but it seems likely there will be some discussion here, so I'm starting it out as an issue. Below I've pasted in the current iteration of a new flux_subprocess header file. I'll keep that up-to-date in this first post as we iterate and hone the design.


As part of redesign of the flux-core execution system, the existing libsubprocess API should undergo a redesign with several goals:

  • Better integration with newer libflux reactor design
  • Symbol renames to allow export of the subprocess interface via the libflux public API
  • Abstract interface for management of remotely executed processes (subprocesses invoked via the cmd.exec interface), such that "local" (child of current process) and "remote" processes are both managed via the subprocess API
  • Replace the zio code completely with buffered IO flux reactor watchers

For the zio replacement, one idea would be to copy the interface/implementation of libevent bufferevents (See Issue #1052). Bufferevents are implemented on top of evbuffers which have a lot of nice features. I imagine if we implemented something similar in flux-core it would have quite a few use cases.

For the libsubprocess replacement, some work has been done on a new header file (see below).
The main points of the new design include:

  • Separate type for building a remote command to execute, the flux_cmd_t. This allows a user to build a single command once and create multiple identical processes, presumably across different ranks (e.g. the pdsh use case)
  • The concept of a subprocess "channel" of communication. This is currently just a stand in for the socketpair use case from the current libsubprocess code. Arbitrary named channels can be created which are implemented as something like a socketpair between the caller and the subprocess. The caller side of the socketpair is obtained via flux_subprocess_channelfd (p, name), and can be managed with reactor fd watchers.
  • A new pair of subprocess creation commands: flux_rexec() and flux_exec() are used to create remote or local subprocesses respectively (this interface may still need work). They are currently kept separate because rexec requires a flux handle, whereas exec can operate without a handle using a reactor alone.
  • Currently, there are subprocess_write and subprocess_read functions to read/write data to stdout/err/stdin of the subprocess, and corresponding subprocess callbacks on_stdout and on_stderr which would be invoked when there is data to read on these streams. Another alternative would be to make stdout/err into named channels, and users of the API could just treat the resulting fds as fd_watchers.
#ifndef _FLUX_CORE_SUBPROCESS_H
#define _FLUX_CORE_SUBPROCESS_H

#include <flux/core.h>

/*
 *  flux_cmd_t: An object that defines a command to be run, either
 *   remotely or as a child of the current process. Includes cmdline
 *   arguments, environment, and working directory. A flux_cmd_t is
 *   used to create Flux subprocesses.
 */
typedef struct flux_command flux_cmd_t;

/*
 *  flux_subprocess_t: A subprocess is an instantiation of a command
 *   as a remote or local process. A subprocess has a state (e.g.
 *   initialized, starting, running, exited, completed), a PID, and
 *   a rank if running remotely.
 */
typedef struct flux_subprocess flux_subprocess_t;

/*
 *  Typedefs for subprocess hooks and callbacks:
 */
typedef void (*flux_subprocess_f) (flux_subprocess_t *p);

/*
 *  Functions for event-driven subprocess handling:
 */
typedef struct {
    flux_subprocess_f on_completion;   /* Process exited and all I/O complete */
    flux_subprocess_f on_state_change; /* Any process state change            */
    flux_subprocess_f on_stdout;       /* Read of stdout is ready             */
    flux_subprocess_f on_stderr;       /* Read of stderr is ready             */
} flux_subprocess_ops_t;

/*
 *  General support:
 */

/*  Start a subprocess server on the handle `h`. Registers message
 *   handlers, etc for remote execution. "prefix" is the topic prefix
 *   used to listen for this service, e.g. `broker` would listen
 *   for `broker.exec`.
 */
int flux_subprocess_server_start (flux_t *h, const char *prefix);

/*
 *  Commands:
 */

/*
 *  Create a cmd object, from which subprocesses can be created
 */
flux_cmd_t * flux_cmd_create (int argc, char *argv[], char **env);

/*
 *  Create a copy of a cmd object.
 */
flux_cmd_t * flux_cmd_copy (const flux_cmd_t *cmd);

/*
 *  Destroy and free command object `cmd`
 */
void flux_cmd_destroy (flux_cmd_t *cmd);

/*
 *  Append formatted string to argv of `cmd`.
 */
int flux_cmd_argv_append (flux_cmd_t *cmd, const char *fmt, ...);

/*
 *  Return the current argument count for `cmd`.
 */
int flux_cmd_argc (flux_cmd_t *cmd);

/*
 *  Set environment vector for cmd `cmd` to `env`. Unsets all previous
 *   environment associated with `cmd`.
 */
int flux_cmd_set_environ (flux_cmd_t *cmd, char **env);

/*
 *  Set a single environment variable (name) to formatted string `fmt`.
 *   If `overwrite` is non-zero then overwrite any existing setting for `name`.
 */
int flux_cmd_setenvf (flux_cmd_t *cmd, int overwrite,
              const char *name, const char *fmt, ...);

/*
 *  Unset environment variable `name` in the command object `cmd`.
 */
void flux_cmd_unsetenv (flux_cmd_t *cmd, const char *name);

/*
 *  Return current value for environment variable `name` as set in
 *   command object `cmd`. If environment variable is not set then NULL
 *   is returned.
 */
const char *flux_cmd_getenv (flux_cmd_t *cmd, const char *name);

/*
 *  Set/get the working directory for the command `cmd`.
 */
int flux_cmd_setcwd (flux_cmd_t *cmd, const char *cwd);
const char *flux_cmd_getcwd (flux_cmd_t *cmd);


/*
 *  Request a channel for communication between process and caller.
 *   The caller side file descriptor of the channel can be obtained
 *   from the flux_subprocess_t handle created from this cmd with the
 *   flux_subprocess_channelfd () call (see below) using the name argument
 *   used here.
 *
 *  The `name` argument is also used as the name of an environment variable
 *   in the subprocess environment that is set to the file descriptor number
 *   of the process side of the socketpair. E.g. name = "FLUX_PMI" would
 *   result in the environment variable "FLUX_PMI_FD=N" set in the process
 *   environment.
 */
int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);

/*
 *  Set generic string options for command object `cmd`. As with environment
 *   variables, this function adds the option `var` with value `val` to
 *   the options array for this command. This can be used to enable optional
 *   behavior for executed processes (e.g. setpgrp(2))
 *
 *   XXX: Is this needed?
 */
int flux_cmd_setopt (flux_cmd_t *cmd, const char *var, const char *val);
const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var);



/*
 *  Subprocesses:
 */

/*
 *  Asynchronously create a new subprocess described by command object
 *   `cmd` on Flux rank `rank`. Callbacks in `ops` structure that are
 *   non-NULL will be called to process state changes, I/O, and
 *   completion.
 *
 *  If `rank` is -1, then the subprocess is directly forked and executed
 *   from the current process.
 *
 *  This function may return NULL (with errno set) on invalid argument(s)
 *   (EINVAL), or failure of underlying Flux messaging calls. Otherwise,
 *   a valid subprocess object is returned, though there is no guarantee
 *   the subprocess has started running anywhere by the time the call returns.
 *
 */
flux_subprocess_t * flux_rexec (flux_t *h, int rank, int flags,
                                const flux_cmd_t *cmd,
                                flux_subprocess_ops_t *ops);

flux_subprocess_t * flux_exec (flux_reactor_t *r, int flags,
                               const flux_cmd_t *cmd,
                               flux_subprocess_ops_t *ops);

/*
 *  Write data to stdin buffer of subprocess `p`.
 *
 *  Returns the total amount of data successfully buffered.
 */
int flux_subprocess_write (flux_subprocess_t *p, char *buf, size_t len);

/*
 *  Close stdin buffer of subprocess `p` and schedule EOF to be sent
 */
int flux_subprocess_close (flux_subprocess_t *p);

/*
 *  Read up to `len` bytes of unread data from stream `stream`. Returns
 *   number of bytes read or -1 with errno set (EAGAIN if no data to read).
 *
 *   A return value of 0 indicates that the subprocess has closed this stream.
 */
int flux_subprocess_read (flux_subprocess_t *p, int stream,
                          char *buf, size_t len);


/*
 *  Read specified number of `lines` of data from `stream` of subprocess
 *   `p` into the destination buffer `buf`. If `lines` is -1 the maximum
 *   number of lines that `buf` can hold is read from the process.
 *
 *  Returns number of bytes read on success, 0 if the number of lines is
 *   not available, or -1 with errno set on error.
 */
int flux_subprocess_read_line (flux_subprocess_t *p, int stream,
                               char *buf, size_t len, int lines);

/*
 *  Create RPC to send signal `signo` to subprocess `p`.
 *  This call returns a flux_future_t. Use flux_future_then(3) to register
 *   a continuation callback when the kill operation is complete, or
 *   flux_future_wait_for(3) to block until the kill operation is complete.
 */
flux_future_t * flux_subprocess_kill (flux_subprocess_t *p, int signo);

/*
 *  Add/remove a reference to subprocess object `p`. The subprocess object
 *   is destroyed once the last reference is removed.
 */
void flux_subprocess_ref (flux_subprocess_t *p);
void flux_subprocess_unref (flux_subprocess_t *p);
#define flux_subprocess_destroy(x) flux_subprocess_unref(x)


/*  Return the command object associated with subprocess `p`.
 */
flux_cmd_t *flux_subprocess_get_cmd (flux_subprocess_t *p);

/*  Return caller side of subprocess channel `name`, if one exists.
 */
int flux_subprocess_channelfd (flux_subprocess_t *p, const char *name);

/*
 *  Set arbitrary context `ctx` with name `name` on subprocess object `p`.
 */
void flux_subprocess_set_context (flux_subprocess_t *p,
                  const char *name, void *ctx);

/*
 *  Return pointer to any context associated with `p` under `name`. If
 *   no such context exists, then NULL is returned.
 */
void *flux_subprocess_get_context (flux_subprocess_t *p, const char *name);

const char * flux_subprocess_state_string (flux_subprocess_t *p);

bool flux_subprocess_exited (flux_subprocess_t *p);

int flux_subprocess_exit_status (flux_subprocess_t *p);

int flux_subprocess_exit_code (flux_subprocess_t *p);

int flux_subprocess_signaled (flux_subprocess_t *p);

pid_t flux_subprocess_pid (flux_subprocess_t *p);

flux_reactor_t * flux_subprocess_get_reactor (flux_subprocess_t *p);


#endif /* !_FLUX_CORE_SUBPROCESS_H */
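
The channel convention described for flux_cmd_add_channel () can be illustrated with plain POSIX calls. The following is only a sketch, not the libsubprocess implementation: `channel_echo` is a hypothetical helper, and the child stays in-process instead of calling exec so that the example is self-contained. It creates a socketpair, publishes the child's end as "<name>_FD" in the child environment, and has the child echo back whatever it reads from that fd.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Hypothetical helper (not part of the proposed API): create a channel
 * called `name`, fork a child that locates its end of the socketpair via
 * the "<name>_FD" environment variable, and have the child echo back what
 * it receives. Returns the number of bytes read into `reply`, or -1. */
ssize_t channel_echo (const char *name, const char *msg,
                      char *reply, size_t len)
{
    int sv[2];
    int status;
    pid_t pid;
    ssize_t n;

    assert (name && msg && reply);
    if (socketpair (AF_UNIX, SOCK_STREAM, 0, sv) < 0)
        return -1;
    if ((pid = fork ()) < 0) {
        close (sv[0]);
        close (sv[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: sv[1] is the "process side" of the channel */
        char var[128], val[16], buf[256];
        const char *s;
        ssize_t k;
        int fd;

        close (sv[0]);
        snprintf (var, sizeof (var), "%s_FD", name);
        snprintf (val, sizeof (val), "%d", sv[1]);
        if (setenv (var, val, 1) < 0)
            _exit (1);
        /* A real subprocess would exec here. Look the fd up the same
         * way an exec'd program would: through the environment. */
        if (!(s = getenv (var)))
            _exit (1);
        fd = atoi (s);
        if ((k = read (fd, buf, sizeof (buf))) < 0
            || write (fd, buf, (size_t) k) != k)
            _exit (1);
        _exit (0);
    }
    /* Parent: sv[0] is the "caller side" of the channel */
    close (sv[1]);
    if (write (sv[0], msg, strlen (msg)) < 0)
        n = -1;
    else
        n = read (sv[0], reply, len);
    close (sv[0]);
    if (waitpid (pid, &status, 0) < 0)
        return -1;
    return n;
}
```

Calling `channel_echo ("FLUX_PMI", "hello", buf, sizeof (buf))` would set FLUX_PMI_FD in the child and return the echoed bytes. In the proposed API the caller side fd would instead come from flux_subprocess_channelfd (p, name).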

All 52 comments

Updated subprocess.h to the latest version I have in my working copy.

BTW, I do have a lot of the flux_cmd_t implemented with tests, but all that code is pretty trivial really.

Some other items for discussion I thought of while going through this again:

  • This API introduces the idea of a generic "channel" for subprocesses which are presented as a client side read/write file descriptor. I'm not sure if it would be a cleaner design to just have the stdio streams for a subprocess be built in versions of these channels (i.e. named "stdin" "stdout" "stderr"). The drawback is extra copying through the pipe (However, on Linux at least we do have splice(2)), and loss of magic internal line buffering in the client side.

  • The subprocess accessors flux_subprocess_exit*, signaled, pid, etc. feel messy, a way to get the rank of a running process is missing, etc. There must be a better way -- perhaps a get_item interface would be more extensible, or something entirely different.

  • Subprocess state should be more explicit, with a (small) flux_subprocess_state_t enumeration and flux_subprocess_state_string (flux_subprocess_state_t s) instead of what's there now. (States are minimal, perhaps starting, failed, running, exited is all we need.)

  • Need to add a way to interpret exit status, or do we just document to use the macros in sys/wait.h?
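
For the last item, here is a minimal sketch of what "just use the macros in sys/wait.h" could look like if wrapped in small helpers. The helper names (`status_exit_code`, `status_term_signal`, `child_wait_status`) are hypothetical and only serve the illustration; the WIFEXITED/WEXITSTATUS/WIFSIGNALED/WTERMSIG macros are standard POSIX.

```c
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Exit code of a normally exited process, or -1 if the process did not
 * exit normally (e.g. it was killed by a signal). */
int status_exit_code (int status)
{
    return WIFEXITED (status) ? WEXITSTATUS (status) : -1;
}

/* Number of the signal that terminated the process, or 0 if the process
 * was not terminated by a signal. */
int status_term_signal (int status)
{
    return WIFSIGNALED (status) ? WTERMSIG (status) : 0;
}

/* Demo only: fork a child that either kills itself with `signo` (when
 * nonzero) or exits with `code`, and return the raw waitpid(2) status. */
int child_wait_status (int code, int signo)
{
    int status = 0;
    pid_t pid = fork ();

    assert (pid >= 0);
    if (pid == 0) {
        if (signo)
            kill (getpid (), signo);
        _exit (code);
    }
    if (waitpid (pid, &status, 0) < 0)
        return -1;
    return status;
}
```

The point of the sketch is that the raw status from flux_subprocess_exit_status () would need this kind of decoding either way, whether by the caller or by accessors such as flux_subprocess_exit_code () and flux_subprocess_signaled ().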

I've pushed a branch with my old work on the libsubprocess redesign onto my subprocess branch here. There really isn't too much there -- there's an implementation of the new flux_cmd_t abstraction, and a partial implementation of local subprocess execution with some minor tests -- none of the remote subprocess handling is there yet.

(So really what is there isn't too helpful I fear)

The intent will be to expose subprocess.h as part of libflux API, so we should make another pass through the API to ensure it has a nice interface.

Began implementing the "local" part of this new libsubprocess and hit some gotchas.

1) flux_subprocess_read and flux_subprocess_read_line don't internally map to the way reads are done in flux_buffer_t, so it incurs an extra copy of data from one buffer to the next.

Should we adjust flux_subprocess_read? On the one hand, the API of flux_subprocess_read seems more natural and common. Perhaps flux_buffer_read() should be adjusted to be more normal/common?

2) when an I/O callback for stdout/stderr is not specified, what should behavior be? In the old subprocess, stdout/stderr is simply output to stdout/stderr. But in this API that feels wrong. For the time being, I just close STDOUT_FILENO and/or STDERR_FILENO in the child process and assume the stdout/stderr goes nowhere.

3) I noticed there is a race in the reactor in terms of the order/quickness of recognizing stdout & child exit in a simple /bin/echo foobar test. The race leads to a question if the user should see the STDOUT_FILENO pipe closed or not. I think we want it to be consistent and the user always gets the closed notification or never gets it. So which one? I'm leaning to never sending it to the user if the child has already exited.

Thanks @chu11!

Should we adjust flux_subprocess_read? On the one hand, the API of flux_subprocess_read seems more natural and common. Perhaps flux_buffer_read() should be adjusted to be more normal/common?

I'm not sure -- and I have to admit I don't think I put too much thought on the design of the new interface for reading from/writing to subprocesses. So any change that makes sense here is fine with me! We definitely want to avoid extra copies.

Another idea (as you noted before), would be to drop subprocess_read/write and instead allow stdout, stderr, and stdin fds to be fetched, and allow buffered IO directly using the reactor based IO calls you wrote. The existing flux_subprocess_channelfd call could be used, e.g. flux_subprocess_channelfd (sp, "stdout"). This approach would definitely add copies, however, this could be mitigated on linux with the vmsplice(2) and friends system call. Anyway just throwing the idea out there for consideration.

when an I/O callback for stdout/stderr is not specified, what should behavior be? In the old subprocess, stdout/stderr is simply output to stdout/stderr. But in this API that feels wrong. For the time being, I just close STDOUT_FILENO and/or STDERR_FILENO in the child process and assume the stdout/stderr goes nowhere.

Hm, yeah good observation. Maybe we could just say that there are default handlers for on_stdout and on_stderr and that they copy data to the current process stdout and stderr streams? Or we could require those callbacks be registered and provide some built-in functions in the interface that copy to current process's stdio streams?

I noticed there is a race in the reactor in terms of the order/quickness of recognizing stdout & child exit in a simple /bin/echo foobar test. The race leads to a question if the user should see the STDOUT_FILENO pipe closed or not. I think we want it to be consistent and the user always gets the closed notification or never gets it. So which one? I'm leaning to never sending it to the user if the child has already exited.

This is actually a common case, and IIRC is the reason for the "completion" callback. For a short-lived process that generates a lot of output, the output will always be a lot slower to process than the exit status. So, in order to simplify the interface, a process isn't "complete" until it has exited and stdout/stderr streams have had their EOFs read.

I see now this is a bit more complicated to implement with this interface (where output has to be read() from the process), than the previous interface (where output was pushed to the user). However, the same idea holds, the on_completion callback should be called when we've completed reading all the data from the process (we've handled a terminal state, and all output). This allows the interface to handle the raciness you're talking about (if I understood correctly) rather than the user having to work around it.
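
The completion rule described here (exited, plus EOF seen on both stdout and stderr) can be sketched as a small tracker. All names below (`completion_tracker_t`, `tracker_exited`, `tracker_eof`, and so on) are hypothetical and not part of the proposed API; the sketch only shows that the completion callback fires exactly once, regardless of the order in which the exit and the EOFs are observed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <unistd.h>

typedef struct completion_tracker completion_tracker_t;
typedef void (*completion_f) (completion_tracker_t *t);

struct completion_tracker {
    bool exited;       /* child exit status has been collected */
    bool stdout_eof;   /* EOF has been read on stdout */
    bool stderr_eof;   /* EOF has been read on stderr */
    bool notified;     /* on_completion has already been called */
    completion_f on_completion;
    void *arg;         /* user data for the callback */
};

/* Invoke the completion callback once all three conditions hold. */
static void maybe_complete (completion_tracker_t *t)
{
    if (t->notified || !t->exited || !t->stdout_eof || !t->stderr_eof)
        return;
    t->notified = true;
    if (t->on_completion)
        t->on_completion (t);
}

void tracker_init (completion_tracker_t *t, completion_f cb, void *arg)
{
    assert (t != NULL);
    t->exited = t->stdout_eof = t->stderr_eof = t->notified = false;
    t->on_completion = cb;
    t->arg = arg;
}

/* Record that the child has exited (e.g. from a child watcher). */
void tracker_exited (completion_tracker_t *t)
{
    t->exited = true;
    maybe_complete (t);
}

/* Record EOF on STDOUT_FILENO or STDERR_FILENO (e.g. from a read of 0). */
void tracker_eof (completion_tracker_t *t, int fd)
{
    if (fd == STDOUT_FILENO)
        t->stdout_eof = true;
    else if (fd == STDERR_FILENO)
        t->stderr_eof = true;
    maybe_complete (t);
}

/* Example callback: count invocations through t->arg. */
void count_completion (completion_tracker_t *t)
{
    (*(int *) t->arg)++;
}
```

With this shape, a fast-exiting /bin/echo whose exit is noticed before its output is drained does not complete until both EOFs arrive, so the user never has to reason about the ordering.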

So any change that makes sense here is fine with me! We definitely want to avoid extra copies.

ok, sounds good!

Or we could require those callbacks be registered and provide some built-in functions in the interface that copy to current process's stdio streams?

I like this idea. In hindsight, having an option where the user can discard stdout/stderr seems like a non-good default :P

So, in order to simplify the interface, a process isn't "complete" until it has exited and stdout/stderr streams have had their EOFs read.

Sounds good.

Per some discussion above, I added a flux_subprocess_state() function that currently supports the states of INIT, STARTED, RUNNING, FAILED, EXITED, and COMPLETION.

But now that I'm coding up unit tests, I realize there's some things in the API above that are sort of wonky.

1) The function flux_subprocess_exited () is no longer needed. Easy enough. Just check that the state is EXITED or COMPLETION.

2) Is there a need for both of the callbacks?

flux_subprocess_f on_completion;   /* Process exited and all I/O complete */
flux_subprocess_f on_state_change; /* Any process state change            */

Shouldn't state_change be enough? Or is state change for a subset of states?

3) I assumed the FAILED state is only for when an exec() error occurs. flux_exec() returns to the user with an error if exec() fails, so is there a need for the FAILED state? Or the STARTED state for that matter? In the old libsubprocess, the fork() and exec() functions were split up, which made more sense in that old API.

Is there a need for both of the callbacks?
shouldn't state_change be enough? Or is state change for a subset of states?

I hadn't been thinking of completed as a "state" of a subprocess. The state of a process is starting, running, exited, or failed. The complete state is when the subprocess has reached a terminal state and all IO has been processed.

The two are separate more for ergonomics than anything else. Handling the "complete" state is more common than handling other internal states, and having a specific handler for it allows for a nice simple completion handler callback, rather than requiring every user of the interface to add an if (state == COMPLETE) to their handler (or, in most cases, an if/then/else cascade).

The COMPLETION state should also probably be called COMPLETE or COMPLETED to match the tense of the other states.

The function flux_subprocess_exited () is no longer needed.

I have to think about that one. It might be convenient to have a nice clear check for whether the process exited, but I'm fine leaving it out for now -- it is simple to add convenience functions later if we find ourselves adding the same compound conditionals again and again.

flux_exec() returns to the user with an error if exec() fails, so is there a need for the FAILED state?

Good question, that might no longer be needed. I'd have to think about how it would work for a remote execution though, the flux_rexec should return immediately and get the error asynchronously, but I'm not saying I know the most clear way to do that...

I hadn't been thinking of completed as a "state" of a subprocess. The state of a process is starting, running, exited, or failed. The complete state is when the subprocess has reached a terminal state and all IO has been processed.

Ahh, I see. You viewed it as the state of the process itself, I was more thinking of the state of the uhh "data structure" flux_subprocess_t. What you were thinking makes more sense now.

Good question, that might no longer be needed.

Yeah, it made me ponder. If we don't need FAILED, then we probably don't need STARTED either. Then we probably don't need on_state_change, the callback just becomes on_exit. I'll leave for now, b/c perhaps something will become more obvious when handling rexec.

pushed a preliminary libsubprocess with local support in PR #1564 just for initial review.

Initially I implemented things with the assumption that stdin/out/err would use the subprocess_read/write() calls, but channel fds would only use fds. More or less just following what the initial API design had.

But as I wrote up code & unit tests, this is a bit inconsistent and perhaps not optimal. So it got me thinking:

A) do we only want to support one mechanism or both for reading/writing?

B) personally, I prefer the subprocess_read/write() mechanism. I like the idea of abstracting the underlying fds and users simply get a read/write set of functions. Looking through the minimal code out there that uses subprocess_socketpair() from the old libsubprocess, I think this would be ok. But ... could there be a use case going forward that requires fds?

C) if we go to an all "fd" model, I think I'd have to do some buffer reactor work. There is currently no read & write buffer reactor (i.e. data comes in one fd, gets buffered, then gets sent out the other fd). And with channels, I'd probably have to create a composite that allows data to go both ways. (It's worth noting that my initial channel fd support is just like the old libsubprocess support, it's just a socketpair. No buffering in the middle.)

Good questions. I had also initially thought that users of subprocess "channels" would need an fd on their end, but now that you mention it I can't think of why that would be.. Making stdout/err handled the same as channels (either way you do it) would greatly simplify the interface I agree.

I would be fine with changing how channels are handled. If a user needs an fd, that could always be implemented via a pipe backed by buffered reader/writer (at the expense of extra copies), or someone could enhance the interface to present an fd instead of using subprocess_read/write.

Ok, let's stick with just the subprocess_read/write() model. I'll have to think about how to alter the API a tad as well as the internals. For example, should

const char *flux_subprocess_read (flux_subprocess_t *p, int stream, int len, int *lenp);

be changed to

const char *flux_subprocess_read (flux_subprocess_t *p, const char *name, int len, int *lenp);

If we consolidate it all into the same model, I'm sure the internal code can be cleaned up quite a bit too.

A thought I had over the weekend, should buffer size be a configuration option? With stdin/stdout a reasonable default is fine, but with channel fds, who knows what a user may want to do.

Sure, maybe a setopt/getopt type option to set buffer for a named "channel"?

I refactored today to have libsubprocess use subprocess_read/write() for both channels and stdin/stdout. With a little more refactoring, I think I'll have stdin/stdout simply be special case channels in which they are write/read only. It'll clean up the code a lot.

One thing I dislike a tiny bit is the API now uses a const char *stream argument in the read and write calls. For example:

const char *flux_subprocess_read (flux_subprocess_t *p,
                                  const char *stream,
                                  int len, int *lenp);

So you'd pass in "stdout" or "mychannel" as the stream name.

It's a tad annoying to pass around a char * everywhere. But it ended up not being as bad as I thought, using NULL for "stdin" and "stdout" as defaults. I contemplated a flux_subprocess_stream_t kind of abstraction being passed around (it would avoid some strcmp() and zhash_lookup() calls internally), but I think that's overkill for now.
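
To make the shape of that read call concrete, here is a rough sketch of a name-keyed read that returns a pointer into an internal buffer instead of copying into a caller buffer. Everything here is hypothetical (`demo_subprocess`, `demo_feed`, `demo_read`, the fixed-size buffer, and treating len < 0 as "everything available"); the thread does not specify the actual semantics of flux_subprocess_read(), so this is just one plausible reading of the prototype, with NULL defaulting to "stdout".

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DEMO_BUFSIZE 64

struct demo_stream {
    const char *name;          /* "stdout", "stderr", or a channel name */
    char buf[DEMO_BUFSIZE];    /* internal buffer owned by the subprocess */
    int start;                 /* offset of first unread byte */
    int end;                   /* offset one past the last buffered byte */
};

struct demo_subprocess {
    struct demo_stream streams[3];
    int nstreams;
};

/* Look up a stream by name; NULL selects the default stream "stdout". */
static struct demo_stream *demo_lookup (struct demo_subprocess *p,
                                        const char *name)
{
    int i;

    if (!name)
        name = "stdout";
    for (i = 0; i < p->nstreams; i++)
        if (strcmp (p->streams[i].name, name) == 0)
            return &p->streams[i];
    return NULL;
}

/* Demo only: simulate data arriving from the subprocess on `name`. */
int demo_feed (struct demo_subprocess *p, const char *name, const char *data)
{
    struct demo_stream *s = demo_lookup (p, name);
    int n = (int) strlen (data);

    if (!s || n > DEMO_BUFSIZE - s->end)
        return -1;
    memcpy (s->buf + s->end, data, (size_t) n);
    s->end += n;
    return n;
}

/* Return a pointer to up to `len` unread bytes of stream `name` (all
 * available bytes if len < 0) and store the byte count in *lenp. The
 * data is NOT copied and NOT NUL-terminated. Returns NULL if no such
 * stream exists. */
const char *demo_read (struct demo_subprocess *p, const char *name,
                       int len, int *lenp)
{
    struct demo_stream *s = demo_lookup (p, name);
    const char *ptr;
    int avail;

    assert (lenp != NULL);
    if (!s)
        return NULL;
    avail = s->end - s->start;
    if (len < 0 || len > avail)
        len = avail;
    ptr = s->buf + s->start;
    s->start += len;
    *lenp = len;
    return ptr;
}
```

The linear scan with strcmp() stands in for the hash lookup mentioned above; a stream handle type would remove the lookup entirely, at the cost of a larger API.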

A few thoughts:

1) Do functions like flux_subprocess_pid() have a purpose once we support remote execution? Seems unnecessary? Hopefully pids can be abstracted away via the interface.

2) Should we require callers to always set env and cwd within a command? Otherwise what should things default to? empty environment or parent environment (latter makes less sense on remote execution). How about current working directory?

Good thoughts!

Local pid will be necessary for users like the job shell, which may need to move pid between cgroups, call ptrace etc. However, maybe a remote subprocess should return EINVAL?

Currently env, cwd default to the env and cwd of the parent. Since libsubprocess is an "instance owner" library, maybe that is ok? I'm not sure, but it is convenient not to set a huge environment if you don't need to...

@grondo I'm curious as to the need for the envz_strip() call below. Is there a specific reason to remove environment variables without values? It seems just as harmless to leave them there.

char **subprocess_env_expand (struct subprocess *p)
{
    envz_strip (&p->envz, &p->envz_len);
    return (expand_argz (p->envz, p->envz_len));
}

I don't remember any specific reason for adding the call to envz_strip. However, I don't think envz_strip removes variables without values (e.g. FOO=), but instead removes null entries (entries without an '='). The documentation seems to have conflicting information about it. I'm not even sure how null entries would get into a subprocess envz array, so I think I'd be ok with removing it.

It might be illustrative to see what happens if expand_argz() is called on an envz array with a null entry.

Yeah, did a little experiment and it doesn't seem that envz_strip() does what the documentation states. Some of the function prototypes in the Linux manpage weren't even correct, ugh ...
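
An experiment of this kind might look like the sketch below. It is not the experiment that was actually run in this thread; `strip_demo` is a hypothetical helper, and the behavior it probes is glibc-specific (argz.h and envz.h are GNU extensions). It builds an envz vector holding a valued entry ("BAZ=1"), an entry with an empty value ("FOO="), and a null entry with no '=' ("BAR"), then reports what envz_strip() leaves behind.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <argz.h>
#include <envz.h>
#include <stdlib.h>

/* Build a three-entry envz vector, strip it, and report the entry counts
 * before and after, plus whether "FOO=" (empty value) and "BAR" (null
 * entry, no '=') survived. Returns 0 on success, -1 on allocation error. */
int strip_demo (size_t *before, size_t *after, int *foo_kept, int *bar_kept)
{
    char *envz = NULL;
    size_t len = 0;

    assert (before && after && foo_kept && bar_kept);
    if (argz_add (&envz, &len, "BAZ=1") != 0
        || argz_add (&envz, &len, "FOO=") != 0
        || argz_add (&envz, &len, "BAR") != 0) {
        free (envz);
        return -1;
    }
    *before = argz_count (envz, len);
    envz_strip (&envz, &len);
    *after = argz_count (envz, len);
    *foo_kept = envz_entry (envz, len, "FOO") != NULL;
    *bar_kept = envz_entry (envz, len, "BAR") != NULL;
    free (envz);
    return 0;
}
```

As far as the glibc implementation goes, the expectation is that only the entry without an '=' is dropped, which matches the reading that envz_strip() removes null entries and keeps variables with empty values.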

I think the GNU project does that on purpose to make you use the texinfo pages!

Just realized a race issue in the API

    if (!(p = flux_exec (x->r, 0, cmd, &ops)))
        goto error;
    if (!(copy = flux_msg_copy (msg, true)))
        goto error;
    if (flux_subprocess_set_context (p, "msg", (void *) copy) < 0)
        goto error;

The old api had a fork & exec call, so we could guarantee contexts were set before a callback from a subprocess was executed. But now, since we've abstracted fork & exec into one call, the contexts being set in a subprocess could race with a callback that needs it.

Obviously, we could move contexts into flux_cmd_t, but that seems wrong, as the reusability of the flux_cmd_t is an important part of the design.

Could have some type of "delay exec until I say I'm ready" kind of flag and then a flux_go_forth_and_exec() function when I'm done. But that seems ugly.

I sort of wonder if there should be a flux_contexts_t kind of data structure where we can stick stuff and pass that into a flux_exec() call. But that seems excessive.

Yes, it does seem like we might have to allow creation of a flux_subprocess_t placeholder, then use that to exec, but that isn't quite as nice a design IMO.

I can't remember, but if callbacks can only be run after you enter the reactor, that would resolve many potential race conditions. (One of the nice things about reactor driven code)

I completely forgot that the reactor won't be run until later, so the callbacks shouldn't be generated until later. So there shouldn't be a race issue.

doh, I spoke too soon. many callbacks won't be called until the reactor is run or re-entered (such as stdout/stderr), but the "state change" callback can be called. Hmmm.

Maybe that should be delayed and called under prep or check only?

One of the issues with that approach is b/c things are abstracted into just a flux_exec() call now, effectively we will have a state change of INIT -> START -> RUNNING (or INIT -> START -> FAIL) before flux_exec() returns. So if an exec failure occurs, flux_exec() can return the failure and an errno, but the user would never get the callback if we delay it into a reactor.

We could say state change callbacks are only guaranteed when entering the reactor, i.e. with remote execution. With local you may not get the state change? But that seems wrong and inconsistent.

As much as I hate to do it, I'm thinking a flux_subprocess_create(flux_cmd_t *cmd, flux_opts_t *ops) call followed by a flux_exec(flux_subprocess_t *p) call is probably the right approach.

One of the issues with that approach is b/c things are abstracted into just a flux_exec() call now, effectively we will have a state change of INIT -> START -> RUNNING (or INIT -> START -> FAIL) before flux_exec() returns.

I'm not sure it makes sense to return a flux_subprocess_t for a local flux_exec() that failed.

We could say state change callbacks are only guaranteed when entering the reactor, i.e. with remote execution. With local you may not get the state change? But that seems wrong and inconsistent.

We could just say that the state change of "FAILED" is only valid for remote processes. Failed local processes are never associated with a flux_subprocess_t object.

I think it is ok to note that the callbacks are reactor callbacks, and like other "watchers" you can't get the callback if you don't enter the reactor. It doesn't seem that useful to use the "running" callback anyway for locally executed processes.

I could just be naive though since I'm not actually working on the code.

We could just say that the state change of "FAILED" is only valid for remote processes. Failed local processes are never associated with a flux_subprocess_t object.

I think that's a pretty good idea.

An idea I just had, we could make it that flux_exec() does not return an exec error. It only does so within a state change callback? But API use wise, that's quite inconvenient for local processes.

I can't remember, did we decide to go with a flux_exec and flux_rexec calls, or just a flux_exec() with rank == -1 to fork local processes?

For the time being it is flux_exec() and flux_rexec().

Hmmm, if we put state changes into a reactor it's possible that multiple states may have passed by the time the first on_state_change callback is called. For example, we may have moved from INIT -> STARTED -> RUNNING, so the first state change a user would see is RUNNING.

Do we want them to see STARTED & RUNNING? Would just take some logic to know what was given to the user thus far. However, with the flux_subprocess_state() function, what state should it return? The notified state? Or the real current state?

Given our API where fork & exec are abstracted behind the scenes, I'm thinking the state STARTED perhaps needs to be dropped. The states should be INIT (which user will never see), RUNNING, FAILED, & EXITED only.
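To make the reduced state set concrete, here is a minimal sketch of what the enum and its legal transitions might look like. The `sketch_` names, the enum values, and the transition table are assumptions for illustration only, not the actual flux_subprocess header.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical names and values; the real header may differ. */
typedef enum {
    SKETCH_STATE_INIT,      /* internal only, never reported to the user */
    SKETCH_STATE_RUNNING,   /* fork and exec both succeeded */
    SKETCH_STATE_FAILED,    /* exec() failed */
    SKETCH_STATE_EXITED     /* process ran and has exited */
} sketch_state_t;

/* Human-readable name for a state, e.g. for log messages. */
const char *sketch_state_string (sketch_state_t state)
{
    switch (state) {
        case SKETCH_STATE_INIT:    return "INIT";
        case SKETCH_STATE_RUNNING: return "RUNNING";
        case SKETCH_STATE_FAILED:  return "FAILED";
        case SKETCH_STATE_EXITED:  return "EXITED";
    }
    return "UNKNOWN";
}

/* Returns 1 if 'from' -> 'to' is allowed once STARTED is dropped:
 * INIT -> RUNNING, INIT -> FAILED, RUNNING -> EXITED. */
int sketch_transition_ok (sketch_state_t from, sketch_state_t to)
{
    switch (from) {
        case SKETCH_STATE_INIT:
            return to == SKETCH_STATE_RUNNING || to == SKETCH_STATE_FAILED;
        case SKETCH_STATE_RUNNING:
            return to == SKETCH_STATE_EXITED;
        default:
            return 0;   /* FAILED and EXITED are terminal */
    }
}
```

With STARTED gone, the user-visible states are only RUNNING, FAILED, and EXITED, and the last two are terminal.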

That sounds fine to me

What about with missed states? For example a subprocess could run & exit before a user enters the reactor.

I'm thinking we modify the on_state_change callback to also pass the state transitioned to, i.e. flux_state_change_callback (flux_subprocess_t *p, flux_subprocess_state_t state), so the caller is always notified which specific transition has occurred.

B/c of raciness, there is always a chance that the state passed to the on_state_change callback differs from the state returned by a flux_subprocess_state() call, which always returns the current state.

How would the state change without reentering the reactor? No child watcher could fire to update the state if we're in the on_state_change handler, could it? There is only one thread of execution... I'm probably missing something simple.

Oh I think I understand now. If you have two callbacks queued up for example.

I think your introduction of a state argument is a good one.

We could also take a look at the bigger picture -- maybe a generic state change callback isn't the right approach?

yeah, that's what I mean. Like in the background a subprocess could go from START -> RUNNING -> EXIT before the first on_state_change callback call. So what should the user see in that callback?

yeah, they should get each callback (probably), in which case the callback should be passed the state for which the callback is being invoked. For now, your state parameter to the callback seems like a good approach.

just starting to implement stdin/channel writes for remote execution, wondering on a few design thoughts:

1) I've ignored dealing with a "subprocess manager" up to this point, but think I can no longer avoid it.

I began pondering the int flux_subprocess_server_start (flux_t *h, const char *prefix); prototype written above. I assume the initial idea was to hide most of original cmb.exec into the subprocess library? That way both the client & server protocol is abstracted in one library (vs. a library + a broker module), which is a good idea. Just want to make sure we're on the same page (up to this point I created a new broker module to implement the remote exec side of the protocol).

2) It appears the old cmb.exec returns errors via the normal flux_respond(h, msg, errno, NULL). flux-exec simply captures all errors and reports that an error occurred, but doesn't have the pid or any other information beyond that.

This seems less than ideal. If (for example) there is a protocol error in the "stdin" portion of code, there is no way of knowing which pid it occurred on. So I was wondering how this information could be returned to the caller.

One idea I had, was to overload the FLUX_SUBPROCESS_FAILED state. That instead of this state only encompassing that exec() errors occurred, it could encompass any error occurring on the remote side. Or if we didn't want to overload that state, it could be split into multiple states (EXEC_ERROR, WRITE_ERROR, etc.).
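One way to picture the "overloaded FAILED state" option: keep a single failed state, and attach a small record saying where and why the failure happened. The sketch below is purely illustrative; the struct, the `FAIL_EXEC`/`FAIL_WRITE` kinds, and the message format are assumptions, not a proposed wire protocol.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical failure kinds, mirroring the EXEC_ERROR / WRITE_ERROR idea. */
typedef enum { FAIL_EXEC, FAIL_WRITE } sketch_fail_kind_t;

/* Hypothetical detail record that would accompany a FAILED state. */
struct sketch_failure {
    int rank;                  /* where the failure occurred */
    pid_t pid;                 /* which remote process it concerns */
    sketch_fail_kind_t kind;   /* which part of the protocol failed */
    int errnum;                /* errno from the remote side */
};

/* Format a failure for the caller. Returns 0, or -1 if buf is too small. */
int sketch_failure_format (const struct sketch_failure *f,
                           char *buf, size_t len)
{
    assert (f != NULL && buf != NULL);
    const char *kind = (f->kind == FAIL_EXEC) ? "exec" : "write";
    int n = snprintf (buf, len, "rank %d pid %ld: %s error: %s",
                      f->rank, (long) f->pid, kind, strerror (f->errnum));
    return (n < 0 || (size_t) n >= len) ? -1 : 0;
}
```

Compared with splitting FAILED into several states, a single state plus a detail record keeps the state machine small while still telling the caller which pid an error belongs to.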

> I assume the initial idea was to hide most of original cmb.exec into the subprocess library? That way both the client & server protocol is abstracted in one library (vs. a library + a broker module), which is a good idea. Just want to make sure we're on the same page (up to this point I created a new broker module to implement the remote exec side of the protocol).

Yes, the "cmb.exec"-like code would move into libsubprocess.

> it appears the old cmb.exec returns errors via the normal flux_respond(h, msg, errno, NULL). flux-exec simply captures all errors and outputs that an error occurs, but doesn't have pid or any other information beyond that.

Yes, as you've discovered, the exec protocol didn't have any up-front design. This time we should design it (perhaps an RFC? Though that may be overkill). Now there are nice features like multiple rpc responses built into libflux API that were not around before, so likely we can do a much better job.

While writing some more tests today I realized a subtlety. In the old libsubprocess and cmb.exec, an exit code from an exec() failure could be propagated back to the user (both locally and from a remote subprocess).

But with the singular function flux_exec() now, we can't do that:

if (!(p = flux_exec (...))) {
   /// handle error here
}

Without the p data structure, we have no info on the exit status.

There's many ways to work around this fact, but wondering how important this need is?

Just to be clear, we have the errno (i.e. EPERM, etc.) from an exec() error and will always have that. We just won't have the exit code. Likewise with remote processes. We can send back the errno, but not the exit code.
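For readers wondering how a local exec() errno can reach the parent at all, one common POSIX technique is a close-on-exec pipe: the child writes errno to the pipe only if exec() fails. The sketch below shows that technique in isolation. It is not claimed to be how libsubprocess does it, and `sketch_exec_errno` is a hypothetical helper name.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper, not part of libsubprocess: fork and exec 'path'.
 * Returns 0 if exec() succeeded, otherwise the errno explaining why the
 * fork or exec failed. The child is reaped before returning. */
int sketch_exec_errno (const char *path, char *const argv[])
{
    int pfd[2];
    int child_errno = 0;
    int saved;
    ssize_t n;
    pid_t pid;

    assert (path != NULL && argv != NULL);
    if (pipe (pfd) < 0)
        return errno;
    /* Close-on-exec: a successful exec() closes the write end, so the
     * parent reads EOF. A failed exec() leaves it open for the report. */
    if (fcntl (pfd[1], F_SETFD, FD_CLOEXEC) < 0 || (pid = fork ()) < 0) {
        saved = errno;
        close (pfd[0]);
        close (pfd[1]);
        return saved;
    }
    if (pid == 0) {
        close (pfd[0]);
        execv (path, argv);
        saved = errno;                  /* only reached if exec() failed */
        if (write (pfd[1], &saved, sizeof (saved)) < 0)
            _exit (126);
        _exit (127);
    }
    close (pfd[1]);
    do {
        n = read (pfd[0], &child_errno, sizeof (child_errno));
    } while (n < 0 && errno == EINTR);
    close (pfd[0]);
    while (waitpid (pid, NULL, 0) < 0 && errno == EINTR)
        ;
    /* EOF (or a short read) is treated as "exec succeeded". */
    return n == (ssize_t) sizeof (child_errno) ? child_errno : 0;
}
```

This gives the caller ENOENT, EACCES, and so on without needing a process object to hold an exit status, which matches the trade-off described above.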

> There's many ways to work around this fact, but wondering how important this need is?

Most likely, it is not too important. The separation of the creation of a subprocess structure and its execution in the original libsubprocess seemed cumbersome after a while, without much benefit. As long as the caller can differentiate permission denied, no such file or directory, etc., I think your solution is better. (We'd want good documentation of common errors and their corresponding errno, along with tests, but other than that I can't think of any problem with the current approach.)

I realized something subtly dumb last night. If flux_subprocess_server_start() takes a prefix, then that means the client side (i.e. flux_rexec()) should also take a prefix, otherwise all of the rpc calls are making assumptions on the rpc topic strings.

But passing a prefix to flux_rexec() seems ugly.

Perhaps server start should just assume cmb? We can support a prefix later if we really need to?

Or was there some use case down the line that I don't see?

Good observation. I think the intention was to allow users of the API to start their own local servers directly, so it would be nice to keep the "prefix" argument if it isn't too much trouble. At first, subprocess servers running under a different service topic string can be targeted by use of the underlying documented "exec" protocol directly (e.g. perhaps a job shell will make use of this at some point?). If there is some reason this becomes common practice, perhaps a different version of flux_rexec() could be added to the API which includes a topic string argument. (We might want to organize the internal code like this to begin with, since it would help to be able to unit test the subprocess server anyway)

Does that make any sense?

ok, that makes sense. We can keep the code as is, and perhaps we'll add a flux_rexec_target() or something like that in the future.
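To illustrate why the prefix matters on both sides, here is a small sketch of building an rpc topic string from a prefix and a method name. The helper name `sketch_topic` is an assumption, and "cmb" plus "exec" is used only because "cmb.exec" is the topic mentioned in this thread.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: build "<prefix>.<method>" into buf.
 * Returns 0 on success, -1 if buf is too small. */
int sketch_topic (char *buf, size_t len,
                  const char *prefix, const char *method)
{
    assert (buf != NULL && prefix != NULL && method != NULL);
    int n = snprintf (buf, len, "%s.%s", prefix, method);
    return (n < 0 || (size_t) n >= len) ? -1 : 0;
}
```

A server started with one prefix and a client that hardcodes another would simply never match topics, which is why a future flux_rexec_target()-style call would need the same prefix the server was started with.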

99% complete with #1564, which is passing now. One nit design question.

Originally we had flux_exec() for local forks, and flux_rexec() for remote processes. If the user specified a rank of -1 to flux_rexec(), it would call flux_exec(). This is convenient b/c perhaps the user would like to pass a flux_t *h so things get logged to the flux log.

Then I hit a gotcha, FLUX_NODEID_ANY == -1. And I also added support for FLUX_NODEID_UPSTREAM (-2 I think).

So now, in order to run locally and pass in a flux_t *h, you have to call flux_rexec() with a rank < -2. This is stupid. Wondering what is better:

  • add an optional flux_t *h to flux_exec(), which is only used for logging? Ok, but sort of ugly.

  • add a new flux_handle_exec() (maybe I need a better function name), that is identical to flux_exec() but takes a flux_t *h. But I hate to add another function.

  • add a flux_set_args() (or something like that) that sets a h for logging?

  • or a better idea?

going back and forth, I think a flux_handle_exec() would be best.

I think flux_handle_exec() is fine, but just as an alternate idea to throw out there, we could instead add a flux_t * argument to flux_exec() and have a flux_local_exec() that uses a reactor instead of a handle. My opinion would depend on which type of call would be more common...

I had considered that too, but usage was about even (discounting tests). But now that you mention it, I think "local" as a function name sounds better/makes more sense than "handle".
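The sentinel collision can be sketched as follows. The constant values are the ones quoted in this thread (-1 and "-2 I think"), so treat them as assumptions, and the `sketch_` names are hypothetical. The point is that once local execution has its own entry point, the remote call has no need to reserve a negative rank for "run locally".

```c
#include <assert.h>

/* Values as described in the thread; assumptions, not verified constants. */
#define SKETCH_NODEID_ANY       (-1)
#define SKETCH_NODEID_UPSTREAM  (-2)

typedef enum {
    SKETCH_TARGET_ANY,
    SKETCH_TARGET_UPSTREAM,
    SKETCH_TARGET_RANK,
    SKETCH_TARGET_INVALID
} sketch_target_t;

/* How a remote-exec call might interpret its rank argument when local
 * execution is handled by a separate function. */
sketch_target_t sketch_rexec_target (int rank)
{
    if (rank == SKETCH_NODEID_ANY)
        return SKETCH_TARGET_ANY;
    if (rank == SKETCH_NODEID_UPSTREAM)
        return SKETCH_TARGET_UPSTREAM;
    if (rank >= 0)
        return SKETCH_TARGET_RANK;
    /* No negative rank is reserved to mean "run locally". */
    return SKETCH_TARGET_INVALID;
}
```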

#1564 merged in, but a few cleanups left to do.

  • [ ] collapse on_channel_out, on_stdout, and on_stderr into one function (presumably on_output).
  • [ ] Default stdout/stderr to go to parent stdout/stderr, resulting in flux_subprocess_output() no longer needing to be a public function.
  • [x] flux_subprocess_destroy() should go back to its original, a #define pointing to flux_subprocess_unref(). Any code that requires a destroy function taking a void * should create a wrapper destroy function.
  • [x] cron module should be cleaned up given subprocess conversion. Should also cache current working directory on module load.
  • [x] channel environment variables should not be made by appending _FD to them. Users should simply define the channel name to be the environment variable they want it to be.
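The destroy/unref item in the list above can be sketched like this: destroy is just a macro alias for unref, and code that needs a `void (*)(void *)` destructor supplies its own thin wrapper. The `sketch_` struct and functions are hypothetical stand-ins for the real flux_subprocess_t API.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical reference-counted object standing in for flux_subprocess_t. */
struct sketch_subprocess {
    int refcount;
};

int sketch_live_objects = 0;    /* bookkeeping for the example only */

struct sketch_subprocess *sketch_subprocess_create (void)
{
    struct sketch_subprocess *p = calloc (1, sizeof (*p));
    if (p) {
        p->refcount = 1;
        sketch_live_objects++;
    }
    return p;
}

void sketch_subprocess_ref (struct sketch_subprocess *p)
{
    assert (p != NULL);
    p->refcount++;
}

/* Drop one reference; free the object when the last one goes away. */
void sketch_subprocess_unref (struct sketch_subprocess *p)
{
    if (p && --p->refcount == 0) {
        free (p);
        sketch_live_objects--;
    }
}

/* destroy is only an alias for unref, as in the cleanup item above. */
#define sketch_subprocess_destroy(p) sketch_subprocess_unref (p)

/* Callers that need a void (*)(void *) destructor write their own wrapper. */
void sketch_subprocess_destroy_wrapper (void *arg)
{
    sketch_subprocess_unref (arg);
}
```

Keeping destroy as a macro means there is a single code path for releasing the object, and the wrapper lives with the code that needs the generic signature rather than in the public API.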

I realized that bullets 1 & 2 in the above cleanup list sort of conflict with each other.

If I collapse on_channel_out, on_stdout, and on_stderr into one on_output callback, what if the user does not specify the on_output callback? Should all output (including channel) be dumped to parent stdout/stderr? This doesn't quite seem right.

So I think only one of them can really be done. I'm leaning towards bullet 1. Collapsing those callbacks, and leaving the flux_subprocess_output() function for the common dump to stdio case. Although I'll probably rename the function to something else.
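For reference, a collapsed callback could look roughly like the sketch below, with the stream name passed as an argument. The `sketch_` names and the "stdout"/"stderr" stream names are assumptions. The return value of `sketch_notify_output` marks the open question: when no callback is set, there is no obviously right default for channel data.

```c
#include <assert.h>
#include <string.h>

struct sketch_proc;

/* Hypothetical single output callback replacing on_stdout, on_stderr,
 * and on_channel_out; 'stream' names the source of the data. */
typedef void (*sketch_output_f) (struct sketch_proc *p, const char *stream);

struct sketch_proc {
    sketch_output_f on_output;
    int stdio_events;       /* counters for the example callback */
    int channel_events;
};

/* Example callback: tell stdio streams apart from named channels. */
void sketch_output_cb (struct sketch_proc *p, const char *stream)
{
    if (!strcmp (stream, "stdout") || !strcmp (stream, "stderr"))
        p->stdio_events++;
    else
        p->channel_events++;
}

/* Returns 1 if a callback handled the output, 0 if none was set. What to
 * do in the 0 case (especially for channels) is the unresolved part. */
int sketch_notify_output (struct sketch_proc *p, const char *stream)
{
    assert (p != NULL && stream != NULL);
    if (!p->on_output)
        return 0;
    p->on_output (p, stream);
    return 1;
}
```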

After going back and forth, I'm no longer convinced that there is an obvious best design between bullet 1, bullet 2, or the current code. Each has some mix of pros and cons. I've decided to keep the current code, as I consider requiring the user to set a callback to get output (vs. having some default output) to be the least bad choice.

However, I want to rename flux_subprocess_output(), since I think having a convenience function prefixed with flux_subprocess isn't a good idea.

Since only one final cleanup is necessary here, I'm going to consider this issue now completed.
