mirror of
https://github.com/git/git.git
synced 2026-01-11 13:23:12 +09:00
run-command: add stdin callback for parallelization
If a user of the run_processes_parallel() API wants to pipe a large amount of information to the stdin of each parallel command, that data could exceed the pipe buffer of the process's stdin and can be too big to store in-memory via strbuf & friends or to slurp to a file. Generally this is solved by repeatedly writing to child_process.in between calls to start_command() and finish_command(). For a specific pre-existing example of this, see transport.c:run_pre_push_hook(). This adds a generic callback API to run_processes_parallel() to do exactly that in a unified manner, similar to the existing callback APIs, which can then be used by hooks.h to convert the remaining hooks to the new, simpler parallel interface. Signed-off-by: Emily Shaffer <emilyshaffer@google.com> Signed-off-by: Ævar Arnfjörð Bjarmason <avarab@gmail.com> Signed-off-by: Adrian Ratiu <adrian.ratiu@collabora.com> Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
parent
56cef1e504
commit
23a720e96b
@ -1490,6 +1490,16 @@ static int child_is_working(const struct parallel_child *pp_child)
|
|||||||
return pp_child->state == GIT_CP_WORKING;
|
return pp_child->state == GIT_CP_WORKING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int child_is_ready_for_cleanup(const struct parallel_child *pp_child)
|
||||||
|
{
|
||||||
|
return child_is_working(pp_child) && !pp_child->process.in;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int child_is_receiving_input(const struct parallel_child *pp_child)
|
||||||
|
{
|
||||||
|
return child_is_working(pp_child) && pp_child->process.in > 0;
|
||||||
|
}
|
||||||
|
|
||||||
struct parallel_processes {
|
struct parallel_processes {
|
||||||
size_t nr_processes;
|
size_t nr_processes;
|
||||||
|
|
||||||
@ -1659,6 +1669,44 @@ static int pp_start_one(struct parallel_processes *pp,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void pp_buffer_stdin(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts)
|
||||||
|
{
|
||||||
|
/* Buffer stdin for each pipe. */
|
||||||
|
for (size_t i = 0; i < opts->processes; i++) {
|
||||||
|
struct child_process *proc = &pp->children[i].process;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
if (!child_is_receiving_input(&pp->children[i]))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* child input is provided via path_to_stdin when the feed_pipe cb is
|
||||||
|
* missing, so we just signal an EOF.
|
||||||
|
*/
|
||||||
|
if (!opts->feed_pipe) {
|
||||||
|
close(proc->in);
|
||||||
|
proc->in = 0;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Feed the pipe:
|
||||||
|
* ret < 0 means error
|
||||||
|
* ret == 0 means there is more data to be fed
|
||||||
|
* ret > 0 means feeding finished
|
||||||
|
*/
|
||||||
|
ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
|
||||||
|
if (ret < 0)
|
||||||
|
die_errno("feed_pipe");
|
||||||
|
|
||||||
|
if (ret) {
|
||||||
|
close(proc->in);
|
||||||
|
proc->in = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void pp_buffer_stderr(struct parallel_processes *pp,
|
static void pp_buffer_stderr(struct parallel_processes *pp,
|
||||||
const struct run_process_parallel_opts *opts,
|
const struct run_process_parallel_opts *opts,
|
||||||
int output_timeout)
|
int output_timeout)
|
||||||
@ -1729,6 +1777,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
|||||||
pp->children[i].state = GIT_CP_FREE;
|
pp->children[i].state = GIT_CP_FREE;
|
||||||
if (pp->pfd)
|
if (pp->pfd)
|
||||||
pp->pfd[i].fd = -1;
|
pp->pfd[i].fd = -1;
|
||||||
|
pp->children[i].process.in = 0;
|
||||||
child_process_init(&pp->children[i].process);
|
child_process_init(&pp->children[i].process);
|
||||||
|
|
||||||
if (opts->ungroup) {
|
if (opts->ungroup) {
|
||||||
@ -1763,6 +1812,27 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void pp_handle_child_IO(struct parallel_processes *pp,
|
||||||
|
const struct run_process_parallel_opts *opts,
|
||||||
|
int output_timeout)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* First push input, if any (it might no-op), to child tasks to avoid them blocking
|
||||||
|
* after input. This also prevents deadlocks when ungrouping below, if a child blocks
|
||||||
|
* while the parent also waits for them to finish.
|
||||||
|
*/
|
||||||
|
pp_buffer_stdin(pp, opts);
|
||||||
|
|
||||||
|
if (opts->ungroup) {
|
||||||
|
for (size_t i = 0; i < opts->processes; i++)
|
||||||
|
if (child_is_ready_for_cleanup(&pp->children[i]))
|
||||||
|
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
|
||||||
|
} else {
|
||||||
|
pp_buffer_stderr(pp, opts, output_timeout);
|
||||||
|
pp_output(pp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||||
{
|
{
|
||||||
int i, code;
|
int i, code;
|
||||||
@ -1782,6 +1852,13 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
|||||||
"max:%"PRIuMAX,
|
"max:%"PRIuMAX,
|
||||||
(uintmax_t)opts->processes);
|
(uintmax_t)opts->processes);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Child tasks might receive input via stdin, terminating early (or not), so
|
||||||
|
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
|
||||||
|
* actually writes the data to children stdin fds.
|
||||||
|
*/
|
||||||
|
sigchain_push(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
pp_init(&pp, opts, &pp_sig);
|
pp_init(&pp, opts, &pp_sig);
|
||||||
while (1) {
|
while (1) {
|
||||||
for (i = 0;
|
for (i = 0;
|
||||||
@ -1799,13 +1876,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
|||||||
}
|
}
|
||||||
if (!pp.nr_processes)
|
if (!pp.nr_processes)
|
||||||
break;
|
break;
|
||||||
if (opts->ungroup) {
|
pp_handle_child_IO(&pp, opts, output_timeout);
|
||||||
for (size_t i = 0; i < opts->processes; i++)
|
|
||||||
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
|
|
||||||
} else {
|
|
||||||
pp_buffer_stderr(&pp, opts, output_timeout);
|
|
||||||
pp_output(&pp);
|
|
||||||
}
|
|
||||||
code = pp_collect_finished(&pp, opts);
|
code = pp_collect_finished(&pp, opts);
|
||||||
if (code) {
|
if (code) {
|
||||||
pp.shutdown = 1;
|
pp.shutdown = 1;
|
||||||
@ -1816,6 +1887,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
|||||||
|
|
||||||
pp_cleanup(&pp, opts);
|
pp_cleanup(&pp, opts);
|
||||||
|
|
||||||
|
sigchain_pop(SIGPIPE);
|
||||||
|
|
||||||
if (do_trace2)
|
if (do_trace2)
|
||||||
trace2_region_leave(tr2_category, tr2_label, NULL);
|
trace2_region_leave(tr2_category, tr2_label, NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -420,6 +420,21 @@ typedef int (*start_failure_fn)(struct strbuf *out,
|
|||||||
void *pp_cb,
|
void *pp_cb,
|
||||||
void *pp_task_cb);
|
void *pp_task_cb);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This callback is repeatedly called on every child process who requests
|
||||||
|
* start_command() to create a pipe by setting child_process.in < 0.
|
||||||
|
*
|
||||||
|
* pp_cb is the callback cookie as passed into run_processes_parallel, and
|
||||||
|
* pp_task_cb is the callback cookie as passed into get_next_task_fn.
|
||||||
|
*
|
||||||
|
* Returns < 0 for error
|
||||||
|
* Returns == 0 when there is more data to be fed (will be called again)
|
||||||
|
* Returns > 0 when finished (child closed fd or no more data to be fed)
|
||||||
|
*/
|
||||||
|
typedef int (*feed_pipe_fn)(int child_in,
|
||||||
|
void *pp_cb,
|
||||||
|
void *pp_task_cb);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This callback is called on every child process that finished processing.
|
* This callback is called on every child process that finished processing.
|
||||||
*
|
*
|
||||||
@ -473,6 +488,12 @@ struct run_process_parallel_opts
|
|||||||
*/
|
*/
|
||||||
start_failure_fn start_failure;
|
start_failure_fn start_failure;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any
|
||||||
|
* special handling.
|
||||||
|
*/
|
||||||
|
feed_pipe_fn feed_pipe;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* task_finished: See task_finished_fn() above. This can be
|
* task_finished: See task_finished_fn() above. This can be
|
||||||
* NULL to omit any special handling.
|
* NULL to omit any special handling.
|
||||||
|
|||||||
@ -23,19 +23,26 @@ static int number_callbacks;
|
|||||||
static int parallel_next(struct child_process *cp,
|
static int parallel_next(struct child_process *cp,
|
||||||
struct strbuf *err,
|
struct strbuf *err,
|
||||||
void *cb,
|
void *cb,
|
||||||
void **task_cb UNUSED)
|
void **task_cb)
|
||||||
{
|
{
|
||||||
struct child_process *d = cb;
|
struct child_process *d = cb;
|
||||||
if (number_callbacks >= 4)
|
if (number_callbacks >= 4)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
strvec_pushv(&cp->args, d->args.v);
|
strvec_pushv(&cp->args, d->args.v);
|
||||||
|
cp->in = d->in;
|
||||||
|
cp->no_stdin = d->no_stdin;
|
||||||
if (err)
|
if (err)
|
||||||
strbuf_addstr(err, "preloaded output of a child\n");
|
strbuf_addstr(err, "preloaded output of a child\n");
|
||||||
else
|
else
|
||||||
fprintf(stderr, "preloaded output of a child\n");
|
fprintf(stderr, "preloaded output of a child\n");
|
||||||
|
|
||||||
number_callbacks++;
|
number_callbacks++;
|
||||||
|
|
||||||
|
/* test_stdin callback will use this to count remaining lines */
|
||||||
|
*task_cb = xmalloc(sizeof(int));
|
||||||
|
*(int*)(*task_cb) = 2;
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,15 +61,48 @@ static int no_job(struct child_process *cp UNUSED,
|
|||||||
static int task_finished(int result UNUSED,
|
static int task_finished(int result UNUSED,
|
||||||
struct strbuf *err,
|
struct strbuf *err,
|
||||||
void *pp_cb UNUSED,
|
void *pp_cb UNUSED,
|
||||||
void *pp_task_cb UNUSED)
|
void *pp_task_cb)
|
||||||
{
|
{
|
||||||
if (err)
|
if (err)
|
||||||
strbuf_addstr(err, "asking for a quick stop\n");
|
strbuf_addstr(err, "asking for a quick stop\n");
|
||||||
else
|
else
|
||||||
fprintf(stderr, "asking for a quick stop\n");
|
fprintf(stderr, "asking for a quick stop\n");
|
||||||
|
|
||||||
|
FREE_AND_NULL(pp_task_cb);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int task_finished_quiet(int result UNUSED,
|
||||||
|
struct strbuf *err UNUSED,
|
||||||
|
void *pp_cb UNUSED,
|
||||||
|
void *pp_task_cb)
|
||||||
|
{
|
||||||
|
FREE_AND_NULL(pp_task_cb);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
|
||||||
|
{
|
||||||
|
int *lines_remaining = task_cb;
|
||||||
|
|
||||||
|
if (*lines_remaining) {
|
||||||
|
struct strbuf buf = STRBUF_INIT;
|
||||||
|
strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining));
|
||||||
|
if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) {
|
||||||
|
if (errno == EPIPE) {
|
||||||
|
/* child closed stdin, nothing more to do */
|
||||||
|
strbuf_release(&buf);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
die_errno("write");
|
||||||
|
}
|
||||||
|
strbuf_release(&buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
return !(*lines_remaining);
|
||||||
|
}
|
||||||
|
|
||||||
struct testsuite {
|
struct testsuite {
|
||||||
struct string_list tests, failed;
|
struct string_list tests, failed;
|
||||||
int next;
|
int next;
|
||||||
@ -157,6 +197,7 @@ static int testsuite(int argc, const char **argv)
|
|||||||
struct run_process_parallel_opts opts = {
|
struct run_process_parallel_opts opts = {
|
||||||
.get_next_task = next_test,
|
.get_next_task = next_test,
|
||||||
.start_failure = test_failed,
|
.start_failure = test_failed,
|
||||||
|
.feed_pipe = test_stdin_pipe_feed,
|
||||||
.task_finished = test_finished,
|
.task_finished = test_finished,
|
||||||
.data = &suite,
|
.data = &suite,
|
||||||
};
|
};
|
||||||
@ -460,12 +501,19 @@ int cmd__run_command(int argc, const char **argv)
|
|||||||
|
|
||||||
if (!strcmp(argv[1], "run-command-parallel")) {
|
if (!strcmp(argv[1], "run-command-parallel")) {
|
||||||
opts.get_next_task = parallel_next;
|
opts.get_next_task = parallel_next;
|
||||||
|
opts.task_finished = task_finished_quiet;
|
||||||
} else if (!strcmp(argv[1], "run-command-abort")) {
|
} else if (!strcmp(argv[1], "run-command-abort")) {
|
||||||
opts.get_next_task = parallel_next;
|
opts.get_next_task = parallel_next;
|
||||||
opts.task_finished = task_finished;
|
opts.task_finished = task_finished;
|
||||||
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
|
} else if (!strcmp(argv[1], "run-command-no-jobs")) {
|
||||||
opts.get_next_task = no_job;
|
opts.get_next_task = no_job;
|
||||||
opts.task_finished = task_finished;
|
opts.task_finished = task_finished;
|
||||||
|
} else if (!strcmp(argv[1], "run-command-stdin")) {
|
||||||
|
proc.in = -1;
|
||||||
|
proc.no_stdin = 0;
|
||||||
|
opts.get_next_task = parallel_next;
|
||||||
|
opts.task_finished = task_finished_quiet;
|
||||||
|
opts.feed_pipe = test_stdin_pipe_feed;
|
||||||
} else {
|
} else {
|
||||||
ret = 1;
|
ret = 1;
|
||||||
fprintf(stderr, "check usage\n");
|
fprintf(stderr, "check usage\n");
|
||||||
|
|||||||
@ -164,6 +164,37 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
|
|||||||
test_line_count = 4 err
|
test_line_count = 4 err
|
||||||
'
|
'
|
||||||
|
|
||||||
|
test_expect_success 'run_command listens to stdin' '
|
||||||
|
cat >expect <<-\EOF &&
|
||||||
|
preloaded output of a child
|
||||||
|
listening for stdin:
|
||||||
|
sample stdin 1
|
||||||
|
sample stdin 0
|
||||||
|
preloaded output of a child
|
||||||
|
listening for stdin:
|
||||||
|
sample stdin 1
|
||||||
|
sample stdin 0
|
||||||
|
preloaded output of a child
|
||||||
|
listening for stdin:
|
||||||
|
sample stdin 1
|
||||||
|
sample stdin 0
|
||||||
|
preloaded output of a child
|
||||||
|
listening for stdin:
|
||||||
|
sample stdin 1
|
||||||
|
sample stdin 0
|
||||||
|
EOF
|
||||||
|
|
||||||
|
write_script stdin-script <<-\EOF &&
|
||||||
|
echo "listening for stdin:"
|
||||||
|
while read line
|
||||||
|
do
|
||||||
|
echo "$line"
|
||||||
|
done
|
||||||
|
EOF
|
||||||
|
test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
|
||||||
|
test_cmp expect actual
|
||||||
|
'
|
||||||
|
|
||||||
cat >expect <<-EOF
|
cat >expect <<-EOF
|
||||||
preloaded output of a child
|
preloaded output of a child
|
||||||
asking for a quick stop
|
asking for a quick stop
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user