Merge branch 'ar/run-command-hook'

Use hook API to replace ad-hoc invocation of hook scripts with the
run_command() API.

* ar/run-command-hook:
  receive-pack: convert receive hooks to hook API
  receive-pack: convert update hooks to new API
  hooks: allow callers to capture output
  run-command: allow capturing of collated output
  hook: allow overriding the ungroup option
  reference-transaction: use hook API instead of run-command
  transport: convert pre-push to hook API
  hook: convert 'post-rewrite' hook in sequencer.c to hook API
  hook: provide stdin via callback
  run-command: add stdin callback for parallelization
  run-command: add first helper for pp child states
This commit is contained in:
Junio C Hamano 2026-01-06 16:33:53 +09:00
commit f406b89552
12 changed files with 596 additions and 305 deletions

View File

@ -43,6 +43,12 @@ static int run(int argc, const char **argv, const char *prefix,
if (!argc) if (!argc)
goto usage; goto usage;
/*
* All current "hook run" use-cases require ungrouped child output.
* If this changes, a hook run argument can be added to toggle it.
*/
opt.ungroup = 1;
/* /*
* Having a -- for "run" when providing <hook-args> is * Having a -- for "run" when providing <hook-args> is
* mandatory. * mandatory.

View File

@ -749,7 +749,7 @@ static int check_cert_push_options(const struct string_list *push_options)
return retval; return retval;
} }
static void prepare_push_cert_sha1(struct child_process *proc) static void prepare_push_cert_sha1(struct run_hooks_opt *opt)
{ {
static int already_done; static int already_done;
@ -775,23 +775,23 @@ static void prepare_push_cert_sha1(struct child_process *proc)
nonce_status = check_nonce(sigcheck.payload); nonce_status = check_nonce(sigcheck.payload);
} }
if (!is_null_oid(&push_cert_oid)) { if (!is_null_oid(&push_cert_oid)) {
strvec_pushf(&proc->env, "GIT_PUSH_CERT=%s", strvec_pushf(&opt->env, "GIT_PUSH_CERT=%s",
oid_to_hex(&push_cert_oid)); oid_to_hex(&push_cert_oid));
strvec_pushf(&proc->env, "GIT_PUSH_CERT_SIGNER=%s", strvec_pushf(&opt->env, "GIT_PUSH_CERT_SIGNER=%s",
sigcheck.signer ? sigcheck.signer : ""); sigcheck.signer ? sigcheck.signer : "");
strvec_pushf(&proc->env, "GIT_PUSH_CERT_KEY=%s", strvec_pushf(&opt->env, "GIT_PUSH_CERT_KEY=%s",
sigcheck.key ? sigcheck.key : ""); sigcheck.key ? sigcheck.key : "");
strvec_pushf(&proc->env, "GIT_PUSH_CERT_STATUS=%c", strvec_pushf(&opt->env, "GIT_PUSH_CERT_STATUS=%c",
sigcheck.result); sigcheck.result);
if (push_cert_nonce) { if (push_cert_nonce) {
strvec_pushf(&proc->env, strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE=%s", "GIT_PUSH_CERT_NONCE=%s",
push_cert_nonce); push_cert_nonce);
strvec_pushf(&proc->env, strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE_STATUS=%s", "GIT_PUSH_CERT_NONCE_STATUS=%s",
nonce_status); nonce_status);
if (nonce_status == NONCE_SLOP) if (nonce_status == NONCE_SLOP)
strvec_pushf(&proc->env, strvec_pushf(&opt->env,
"GIT_PUSH_CERT_NONCE_SLOP=%ld", "GIT_PUSH_CERT_NONCE_SLOP=%ld",
nonce_stamp_slop); nonce_stamp_slop);
} }
@ -803,119 +803,74 @@ struct receive_hook_feed_state {
struct ref_push_report *report; struct ref_push_report *report;
int skip_broken; int skip_broken;
struct strbuf buf; struct strbuf buf;
const struct string_list *push_options;
}; };
typedef int (*feed_fn)(void *, const char **, size_t *); static int feed_receive_hook_cb(int hook_stdin_fd, void *pp_cb UNUSED, void *pp_task_cb)
static int run_and_feed_hook(const char *hook_name, feed_fn feed,
struct receive_hook_feed_state *feed_state)
{ {
struct child_process proc = CHILD_PROCESS_INIT; struct receive_hook_feed_state *state = pp_task_cb;
struct async muxer; struct command *cmd = state->cmd;
int code; unsigned int lines_batch_size = 500;
const char *hook_path = find_hook(the_repository, hook_name);
if (!hook_path) strbuf_reset(&state->buf);
return 0;
strvec_push(&proc.args, hook_path); /* batch lines to avoid going through run-command's poll loop for each line */
proc.in = -1; for (unsigned int i = 0; i < lines_batch_size; i++) {
proc.stdout_to_stderr = 1; while (cmd &&
proc.trace2_hook_name = hook_name; state->skip_broken && (cmd->error_string || cmd->did_not_exist))
cmd = cmd->next;
if (feed_state->push_options) { if (!cmd)
size_t i; break; /* no more commands left */
for (i = 0; i < feed_state->push_options->nr; i++)
strvec_pushf(&proc.env,
"GIT_PUSH_OPTION_%"PRIuMAX"=%s",
(uintmax_t)i,
feed_state->push_options->items[i].string);
strvec_pushf(&proc.env, "GIT_PUSH_OPTION_COUNT=%"PRIuMAX"",
(uintmax_t)feed_state->push_options->nr);
} else
strvec_pushf(&proc.env, "GIT_PUSH_OPTION_COUNT");
if (tmp_objdir) if (!state->report)
strvec_pushv(&proc.env, tmp_objdir_env(tmp_objdir)); state->report = cmd->report;
if (use_sideband) { if (state->report) {
memset(&muxer, 0, sizeof(muxer)); struct object_id *old_oid;
muxer.proc = copy_to_sideband; struct object_id *new_oid;
muxer.in = -1; const char *ref_name;
code = start_async(&muxer);
if (code) old_oid = state->report->old_oid ? state->report->old_oid : &cmd->old_oid;
return code; new_oid = state->report->new_oid ? state->report->new_oid : &cmd->new_oid;
proc.err = muxer.in; ref_name = state->report->ref_name ? state->report->ref_name : cmd->ref_name;
strbuf_addf(&state->buf, "%s %s %s\n",
oid_to_hex(old_oid), oid_to_hex(new_oid),
ref_name);
state->report = state->report->next;
if (!state->report)
cmd = cmd->next;
} else {
strbuf_addf(&state->buf, "%s %s %s\n",
oid_to_hex(&cmd->old_oid), oid_to_hex(&cmd->new_oid),
cmd->ref_name);
cmd = cmd->next;
}
} }
prepare_push_cert_sha1(&proc); state->cmd = cmd;
code = start_command(&proc); if (state->buf.len > 0) {
if (code) { int ret = write_in_full(hook_stdin_fd, state->buf.buf, state->buf.len);
if (use_sideband) if (ret < 0) {
finish_async(&muxer); if (errno == EPIPE)
return code; return 1; /* child closed pipe */
return ret;
}
} }
sigchain_push(SIGPIPE, SIG_IGN); return state->cmd ? 0 : 1; /* 0 = more to come, 1 = EOF */
while (1) {
const char *buf;
size_t n;
if (feed(feed_state, &buf, &n))
break;
if (write_in_full(proc.in, buf, n) < 0)
break;
}
close(proc.in);
if (use_sideband)
finish_async(&muxer);
sigchain_pop(SIGPIPE);
return finish_command(&proc);
} }
static int feed_receive_hook(void *state_, const char **bufp, size_t *sizep) static void hook_output_to_sideband(struct strbuf *output, void *cb_data UNUSED)
{ {
struct receive_hook_feed_state *state = state_; if (!output)
struct command *cmd = state->cmd; BUG("output must be non-NULL");
while (cmd && /* buffer might be empty for keepalives */
state->skip_broken && (cmd->error_string || cmd->did_not_exist)) if (output->len)
cmd = cmd->next; send_sideband(1, 2, output->buf, output->len, use_sideband);
if (!cmd)
return -1; /* EOF */
if (!bufp)
return 0; /* OK, can feed something. */
strbuf_reset(&state->buf);
if (!state->report)
state->report = cmd->report;
if (state->report) {
struct object_id *old_oid;
struct object_id *new_oid;
const char *ref_name;
old_oid = state->report->old_oid ? state->report->old_oid : &cmd->old_oid;
new_oid = state->report->new_oid ? state->report->new_oid : &cmd->new_oid;
ref_name = state->report->ref_name ? state->report->ref_name : cmd->ref_name;
strbuf_addf(&state->buf, "%s %s %s\n",
oid_to_hex(old_oid), oid_to_hex(new_oid),
ref_name);
state->report = state->report->next;
if (!state->report)
state->cmd = cmd->next;
} else {
strbuf_addf(&state->buf, "%s %s %s\n",
oid_to_hex(&cmd->old_oid), oid_to_hex(&cmd->new_oid),
cmd->ref_name);
state->cmd = cmd->next;
}
if (bufp) {
*bufp = state->buf.buf;
*sizep = state->buf.len;
}
return 0;
} }
static int run_receive_hook(struct command *commands, static int run_receive_hook(struct command *commands,
@ -923,47 +878,65 @@ static int run_receive_hook(struct command *commands,
int skip_broken, int skip_broken,
const struct string_list *push_options) const struct string_list *push_options)
{ {
struct receive_hook_feed_state state; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
int status; struct command *iter = commands;
struct receive_hook_feed_state feed_state;
int ret;
strbuf_init(&state.buf, 0); /* if there are no valid commands, don't invoke the hook at all. */
state.cmd = commands; while (iter && skip_broken && (iter->error_string || iter->did_not_exist))
state.skip_broken = skip_broken; iter = iter->next;
state.report = NULL; if (!iter)
if (feed_receive_hook(&state, NULL, NULL))
return 0; return 0;
state.cmd = commands;
state.push_options = push_options; if (push_options) {
status = run_and_feed_hook(hook_name, feed_receive_hook, &state); for (int i = 0; i < push_options->nr; i++)
strbuf_release(&state.buf); strvec_pushf(&opt.env, "GIT_PUSH_OPTION_%d=%s", i,
return status; push_options->items[i].string);
strvec_pushf(&opt.env, "GIT_PUSH_OPTION_COUNT=%"PRIuMAX"",
(uintmax_t)push_options->nr);
} else {
strvec_push(&opt.env, "GIT_PUSH_OPTION_COUNT");
}
if (tmp_objdir)
strvec_pushv(&opt.env, tmp_objdir_env(tmp_objdir));
prepare_push_cert_sha1(&opt);
/* set up sideband printer */
if (use_sideband)
opt.consume_output = hook_output_to_sideband;
/* set up stdin callback */
feed_state.cmd = commands;
feed_state.skip_broken = skip_broken;
feed_state.report = NULL;
strbuf_init(&feed_state.buf, 0);
opt.feed_pipe_cb_data = &feed_state;
opt.feed_pipe = feed_receive_hook_cb;
ret = run_hooks_opt(the_repository, hook_name, &opt);
strbuf_release(&feed_state.buf);
return ret;
} }
static int run_update_hook(struct command *cmd) static int run_update_hook(struct command *cmd)
{ {
struct child_process proc = CHILD_PROCESS_INIT; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
int code;
const char *hook_path = find_hook(the_repository, "update");
if (!hook_path) strvec_pushl(&opt.args,
return 0; cmd->ref_name,
oid_to_hex(&cmd->old_oid),
oid_to_hex(&cmd->new_oid),
NULL);
strvec_push(&proc.args, hook_path);
strvec_push(&proc.args, cmd->ref_name);
strvec_push(&proc.args, oid_to_hex(&cmd->old_oid));
strvec_push(&proc.args, oid_to_hex(&cmd->new_oid));
proc.no_stdin = 1;
proc.stdout_to_stderr = 1;
proc.err = use_sideband ? -1 : 0;
proc.trace2_hook_name = "update";
code = start_command(&proc);
if (code)
return code;
if (use_sideband) if (use_sideband)
copy_to_sideband(proc.err, -1, NULL); opt.consume_output = hook_output_to_sideband;
return finish_command(&proc);
return run_hooks_opt(the_repository, "update", &opt);
} }
static struct command *find_command_by_refname(struct command *list, static struct command *find_command_by_refname(struct command *list,
@ -1640,33 +1613,20 @@ out:
static void run_update_post_hook(struct command *commands) static void run_update_post_hook(struct command *commands)
{ {
struct command *cmd; struct command *cmd;
struct child_process proc = CHILD_PROCESS_INIT; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
const char *hook;
hook = find_hook(the_repository, "post-update");
if (!hook)
return;
for (cmd = commands; cmd; cmd = cmd->next) { for (cmd = commands; cmd; cmd = cmd->next) {
if (cmd->error_string || cmd->did_not_exist) if (cmd->error_string || cmd->did_not_exist)
continue; continue;
if (!proc.args.nr) strvec_push(&opt.args, cmd->ref_name);
strvec_push(&proc.args, hook);
strvec_push(&proc.args, cmd->ref_name);
} }
if (!proc.args.nr) if (!opt.args.nr)
return; return;
proc.no_stdin = 1; if (use_sideband)
proc.stdout_to_stderr = 1; opt.consume_output = hook_output_to_sideband;
proc.err = use_sideband ? -1 : 0;
proc.trace2_hook_name = "post-update";
if (!start_command(&proc)) { run_hooks_opt(the_repository, "post-update", &opt);
if (use_sideband)
copy_to_sideband(proc.err, -1, NULL);
finish_command(&proc);
}
} }
static void check_aliased_update_internal(struct command *cmd, static void check_aliased_update_internal(struct command *cmd,

View File

@ -1978,6 +1978,9 @@ int run_commit_hook(int editor_is_used, const char *index_file,
strvec_push(&opt.args, arg); strvec_push(&opt.args, arg);
va_end(args); va_end(args);
/* All commit hook use-cases require ungrouping child output. */
opt.ungroup = 1;
opt.invoked_hook = invoked_hook; opt.invoked_hook = invoked_hook;
return run_hooks_opt(the_repository, name, &opt); return run_hooks_opt(the_repository, name, &opt);
} }

29
hook.c
View File

@ -55,7 +55,7 @@ int hook_exists(struct repository *r, const char *name)
static int pick_next_hook(struct child_process *cp, static int pick_next_hook(struct child_process *cp,
struct strbuf *out UNUSED, struct strbuf *out UNUSED,
void *pp_cb, void *pp_cb,
void **pp_task_cb UNUSED) void **pp_task_cb)
{ {
struct hook_cb_data *hook_cb = pp_cb; struct hook_cb_data *hook_cb = pp_cb;
const char *hook_path = hook_cb->hook_path; const char *hook_path = hook_cb->hook_path;
@ -65,11 +65,22 @@ static int pick_next_hook(struct child_process *cp,
cp->no_stdin = 1; cp->no_stdin = 1;
strvec_pushv(&cp->env, hook_cb->options->env.v); strvec_pushv(&cp->env, hook_cb->options->env.v);
if (hook_cb->options->path_to_stdin && hook_cb->options->feed_pipe)
BUG("options path_to_stdin and feed_pipe are mutually exclusive");
/* reopen the file for stdin; run_command closes it. */ /* reopen the file for stdin; run_command closes it. */
if (hook_cb->options->path_to_stdin) { if (hook_cb->options->path_to_stdin) {
cp->no_stdin = 0; cp->no_stdin = 0;
cp->in = xopen(hook_cb->options->path_to_stdin, O_RDONLY); cp->in = xopen(hook_cb->options->path_to_stdin, O_RDONLY);
} }
if (hook_cb->options->feed_pipe) {
cp->no_stdin = 0;
/* start_command() will allocate a pipe / stdin fd for us */
cp->in = -1;
}
cp->stdout_to_stderr = 1; cp->stdout_to_stderr = 1;
cp->trace2_hook_name = hook_cb->hook_name; cp->trace2_hook_name = hook_cb->hook_name;
cp->dir = hook_cb->options->dir; cp->dir = hook_cb->options->dir;
@ -77,6 +88,12 @@ static int pick_next_hook(struct child_process *cp,
strvec_push(&cp->args, hook_path); strvec_push(&cp->args, hook_path);
strvec_pushv(&cp->args, hook_cb->options->args.v); strvec_pushv(&cp->args, hook_cb->options->args.v);
/*
* Provide per-hook internal state via task_cb for easy access, so
* hook callbacks don't have to go through hook_cb->options.
*/
*pp_task_cb = hook_cb->options->feed_pipe_cb_data;
/* /*
* This pick_next_hook() will be called again, we're only * This pick_next_hook() will be called again, we're only
* running one hook, so indicate that no more work will be * running one hook, so indicate that no more work will be
@ -136,10 +153,12 @@ int run_hooks_opt(struct repository *r, const char *hook_name,
.tr2_label = hook_name, .tr2_label = hook_name,
.processes = 1, .processes = 1,
.ungroup = 1, .ungroup = options->ungroup,
.get_next_task = pick_next_hook, .get_next_task = pick_next_hook,
.start_failure = notify_start_failure, .start_failure = notify_start_failure,
.feed_pipe = options->feed_pipe,
.consume_output = options->consume_output,
.task_finished = notify_hook_finished, .task_finished = notify_hook_finished,
.data = &cb_data, .data = &cb_data,
@ -148,6 +167,9 @@ int run_hooks_opt(struct repository *r, const char *hook_name,
if (!options) if (!options)
BUG("a struct run_hooks_opt must be provided to run_hooks"); BUG("a struct run_hooks_opt must be provided to run_hooks");
if (options->path_to_stdin && options->feed_pipe)
BUG("options path_to_stdin and feed_pipe are mutually exclusive");
if (options->invoked_hook) if (options->invoked_hook)
*options->invoked_hook = 0; *options->invoked_hook = 0;
@ -177,6 +199,9 @@ int run_hooks(struct repository *r, const char *hook_name)
{ {
struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
/* All use-cases of this API require ungrouping. */
opt.ungroup = 1;
return run_hooks_opt(r, hook_name, &opt); return run_hooks_opt(r, hook_name, &opt);
} }

51
hook.h
View File

@ -1,6 +1,7 @@
#ifndef HOOK_H #ifndef HOOK_H
#define HOOK_H #define HOOK_H
#include "strvec.h" #include "strvec.h"
#include "run-command.h"
struct repository; struct repository;
@ -33,10 +34,60 @@ struct run_hooks_opt
*/ */
int *invoked_hook; int *invoked_hook;
/**
* Allow hooks to set run_processes_parallel() 'ungroup' behavior.
*/
unsigned int ungroup:1;
/** /**
* Path to file which should be piped to stdin for each hook. * Path to file which should be piped to stdin for each hook.
*/ */
const char *path_to_stdin; const char *path_to_stdin;
/**
* Callback used to incrementally feed a child hook stdin pipe.
*
* Useful especially if a hook consumes large quantities of data
* (e.g. a list of all refs in a client push), so feeding it via
* in-memory strings or slurping to/from files is inefficient.
* While the callback allows piecemeal writing, it can also be
* used for smaller inputs, where it gets called only once.
*
* Add hook callback initalization context to `feed_pipe_ctx`.
* Add hook callback internal state to `feed_pipe_cb_data`.
*
*/
feed_pipe_fn feed_pipe;
/**
* Opaque data pointer used to pass context to `feed_pipe_fn`.
*
* It can be accessed via the second callback arg 'pp_cb':
* ((struct hook_cb_data *) pp_cb)->hook_cb->options->feed_pipe_ctx;
*
* The caller is responsible for managing the memory for this data.
* Only useful when using `run_hooks_opt.feed_pipe`, otherwise ignore it.
*/
void *feed_pipe_ctx;
/**
* Opaque data pointer used to keep internal state across callback calls.
*
* It can be accessed directly via the third callback arg 'pp_task_cb':
* struct ... *state = pp_task_cb;
*
* The caller is responsible for managing the memory for this data.
* Only useful when using `run_hooks_opt.feed_pipe`, otherwise ignore it.
*/
void *feed_pipe_cb_data;
/*
* Populate this to capture output and prevent it from being printed to
* stderr. This will be passed directly through to
* run_command:run_parallel_processes(). See t/helper/test-run-command.c
* for an example.
*/
consume_output_fn consume_output;
}; };
#define RUN_HOOKS_OPT_INIT { \ #define RUN_HOOKS_OPT_INIT { \

110
refs.c
View File

@ -2422,68 +2422,72 @@ static int ref_update_reject_duplicates(struct string_list *refnames,
return 0; return 0;
} }
struct transaction_feed_cb_data {
size_t index;
struct strbuf buf;
};
static int transaction_hook_feed_stdin(int hook_stdin_fd, void *pp_cb, void *pp_task_cb)
{
struct hook_cb_data *hook_cb = pp_cb;
struct ref_transaction *transaction = hook_cb->options->feed_pipe_ctx;
struct transaction_feed_cb_data *feed_cb_data = pp_task_cb;
struct strbuf *buf = &feed_cb_data->buf;
struct ref_update *update;
size_t i = feed_cb_data->index++;
int ret;
if (i >= transaction->nr)
return 1; /* No more refs to process */
update = transaction->updates[i];
if (update->flags & REF_LOG_ONLY)
return 0;
strbuf_reset(buf);
if (!(update->flags & REF_HAVE_OLD))
strbuf_addf(buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
else if (update->old_target)
strbuf_addf(buf, "ref:%s ", update->old_target);
else
strbuf_addf(buf, "%s ", oid_to_hex(&update->old_oid));
if (!(update->flags & REF_HAVE_NEW))
strbuf_addf(buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
else if (update->new_target)
strbuf_addf(buf, "ref:%s ", update->new_target);
else
strbuf_addf(buf, "%s ", oid_to_hex(&update->new_oid));
strbuf_addf(buf, "%s\n", update->refname);
ret = write_in_full(hook_stdin_fd, buf->buf, buf->len);
if (ret < 0 && errno != EPIPE)
return ret;
return 0; /* no more input to feed */
}
static int run_transaction_hook(struct ref_transaction *transaction, static int run_transaction_hook(struct ref_transaction *transaction,
const char *state) const char *state)
{ {
struct child_process proc = CHILD_PROCESS_INIT; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
struct strbuf buf = STRBUF_INIT; struct transaction_feed_cb_data feed_ctx = { 0 };
const char *hook;
int ret = 0; int ret = 0;
hook = find_hook(transaction->ref_store->repo, "reference-transaction"); strvec_push(&opt.args, state);
if (!hook)
return ret;
strvec_pushl(&proc.args, hook, state, NULL); opt.feed_pipe = transaction_hook_feed_stdin;
proc.in = -1; opt.feed_pipe_ctx = transaction;
proc.stdout_to_stderr = 1; opt.feed_pipe_cb_data = &feed_ctx;
proc.trace2_hook_name = "reference-transaction";
ret = start_command(&proc); strbuf_init(&feed_ctx.buf, 0);
if (ret)
return ret;
sigchain_push(SIGPIPE, SIG_IGN); ret = run_hooks_opt(transaction->ref_store->repo, "reference-transaction", &opt);
for (size_t i = 0; i < transaction->nr; i++) { strbuf_release(&feed_ctx.buf);
struct ref_update *update = transaction->updates[i];
if (update->flags & REF_LOG_ONLY)
continue;
strbuf_reset(&buf);
if (!(update->flags & REF_HAVE_OLD))
strbuf_addf(&buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
else if (update->old_target)
strbuf_addf(&buf, "ref:%s ", update->old_target);
else
strbuf_addf(&buf, "%s ", oid_to_hex(&update->old_oid));
if (!(update->flags & REF_HAVE_NEW))
strbuf_addf(&buf, "%s ", oid_to_hex(null_oid(the_hash_algo)));
else if (update->new_target)
strbuf_addf(&buf, "ref:%s ", update->new_target);
else
strbuf_addf(&buf, "%s ", oid_to_hex(&update->new_oid));
strbuf_addf(&buf, "%s\n", update->refname);
if (write_in_full(proc.in, buf.buf, buf.len) < 0) {
if (errno != EPIPE) {
/* Don't leak errno outside this API */
errno = 0;
ret = -1;
}
break;
}
}
close(proc.in);
sigchain_pop(SIGPIPE);
strbuf_release(&buf);
ret |= finish_command(&proc);
return ret; return ret;
} }

View File

@ -1478,15 +1478,32 @@ enum child_state {
GIT_CP_WAIT_CLEANUP, GIT_CP_WAIT_CLEANUP,
}; };
struct parallel_child {
enum child_state state;
struct child_process process;
struct strbuf err;
void *data;
};
static int child_is_working(const struct parallel_child *pp_child)
{
return pp_child->state == GIT_CP_WORKING;
}
static int child_is_ready_for_cleanup(const struct parallel_child *pp_child)
{
return child_is_working(pp_child) && !pp_child->process.in;
}
static int child_is_receiving_input(const struct parallel_child *pp_child)
{
return child_is_working(pp_child) && pp_child->process.in > 0;
}
struct parallel_processes { struct parallel_processes {
size_t nr_processes; size_t nr_processes;
struct { struct parallel_child *children;
enum child_state state;
struct child_process process;
struct strbuf err;
void *data;
} *children;
/* /*
* The struct pollfd is logically part of *children, * The struct pollfd is logically part of *children,
* but the system call expects it as its own array. * but the system call expects it as its own array.
@ -1509,7 +1526,7 @@ static void kill_children(const struct parallel_processes *pp,
int signo) int signo)
{ {
for (size_t i = 0; i < opts->processes; i++) for (size_t i = 0; i < opts->processes; i++)
if (pp->children[i].state == GIT_CP_WORKING) if (child_is_working(&pp->children[i]))
kill(pp->children[i].process.pid, signo); kill(pp->children[i].process.pid, signo);
} }
@ -1578,7 +1595,10 @@ static void pp_cleanup(struct parallel_processes *pp,
* When get_next_task added messages to the buffer in its last * When get_next_task added messages to the buffer in its last
* iteration, the buffered output is non empty. * iteration, the buffered output is non empty.
*/ */
strbuf_write(&pp->buffered_output, stderr); if (opts->consume_output)
opts->consume_output(&pp->buffered_output, opts->data);
else
strbuf_write(&pp->buffered_output, stderr);
strbuf_release(&pp->buffered_output); strbuf_release(&pp->buffered_output);
sigchain_pop_common(); sigchain_pop_common();
@ -1652,6 +1672,44 @@ static int pp_start_one(struct parallel_processes *pp,
return 0; return 0;
} }
static void pp_buffer_stdin(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts)
{
/* Buffer stdin for each pipe. */
for (size_t i = 0; i < opts->processes; i++) {
struct child_process *proc = &pp->children[i].process;
int ret;
if (!child_is_receiving_input(&pp->children[i]))
continue;
/*
* child input is provided via path_to_stdin when the feed_pipe cb is
* missing, so we just signal an EOF.
*/
if (!opts->feed_pipe) {
close(proc->in);
proc->in = 0;
continue;
}
/**
* Feed the pipe:
* ret < 0 means error
* ret == 0 means there is more data to be fed
* ret > 0 means feeding finished
*/
ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
if (ret < 0)
die_errno("feed_pipe");
if (ret) {
close(proc->in);
proc->in = 0;
}
}
}
static void pp_buffer_stderr(struct parallel_processes *pp, static void pp_buffer_stderr(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts, const struct run_process_parallel_opts *opts,
int output_timeout) int output_timeout)
@ -1665,7 +1723,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
/* Buffer output from all pipes. */ /* Buffer output from all pipes. */
for (size_t i = 0; i < opts->processes; i++) { for (size_t i = 0; i < opts->processes; i++) {
if (pp->children[i].state == GIT_CP_WORKING && if (child_is_working(&pp->children[i]) &&
pp->pfd[i].revents & (POLLIN | POLLHUP)) { pp->pfd[i].revents & (POLLIN | POLLHUP)) {
int n = strbuf_read_once(&pp->children[i].err, int n = strbuf_read_once(&pp->children[i].err,
pp->children[i].process.err, 0); pp->children[i].process.err, 0);
@ -1679,13 +1737,17 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
} }
} }
static void pp_output(const struct parallel_processes *pp) static void pp_output(const struct parallel_processes *pp,
const struct run_process_parallel_opts *opts)
{ {
size_t i = pp->output_owner; size_t i = pp->output_owner;
if (pp->children[i].state == GIT_CP_WORKING && if (child_is_working(&pp->children[i]) &&
pp->children[i].err.len) { pp->children[i].err.len) {
strbuf_write(&pp->children[i].err, stderr); if (opts->consume_output)
opts->consume_output(&pp->children[i].err, opts->data);
else
strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err); strbuf_reset(&pp->children[i].err);
} }
} }
@ -1722,6 +1784,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
pp->children[i].state = GIT_CP_FREE; pp->children[i].state = GIT_CP_FREE;
if (pp->pfd) if (pp->pfd)
pp->pfd[i].fd = -1; pp->pfd[i].fd = -1;
pp->children[i].process.in = 0;
child_process_init(&pp->children[i].process); child_process_init(&pp->children[i].process);
if (opts->ungroup) { if (opts->ungroup) {
@ -1732,11 +1795,15 @@ static int pp_collect_finished(struct parallel_processes *pp,
} else { } else {
const size_t n = opts->processes; const size_t n = opts->processes;
strbuf_write(&pp->children[i].err, stderr); /* Output errors, then all other finished child processes */
if (opts->consume_output) {
opts->consume_output(&pp->children[i].err, opts->data);
opts->consume_output(&pp->buffered_output, opts->data);
} else {
strbuf_write(&pp->children[i].err, stderr);
strbuf_write(&pp->buffered_output, stderr);
}
strbuf_reset(&pp->children[i].err); strbuf_reset(&pp->children[i].err);
/* Output all other finished child processes */
strbuf_write(&pp->buffered_output, stderr);
strbuf_reset(&pp->buffered_output); strbuf_reset(&pp->buffered_output);
/* /*
@ -1748,7 +1815,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
* running process time. * running process time.
*/ */
for (i = 0; i < n; i++) for (i = 0; i < n; i++)
if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING) if (child_is_working(&pp->children[(pp->output_owner + i) % n]))
break; break;
pp->output_owner = (pp->output_owner + i) % n; pp->output_owner = (pp->output_owner + i) % n;
} }
@ -1756,6 +1823,27 @@ static int pp_collect_finished(struct parallel_processes *pp,
return result; return result;
} }
static void pp_handle_child_IO(struct parallel_processes *pp,
const struct run_process_parallel_opts *opts,
int output_timeout)
{
/*
* First push input, if any (it might no-op), to child tasks to avoid them blocking
* after input. This also prevents deadlocks when ungrouping below, if a child blocks
* while the parent also waits for them to finish.
*/
pp_buffer_stdin(pp, opts);
if (opts->ungroup) {
for (size_t i = 0; i < opts->processes; i++)
if (child_is_ready_for_cleanup(&pp->children[i]))
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
pp_buffer_stderr(pp, opts, output_timeout);
pp_output(pp, opts);
}
}
void run_processes_parallel(const struct run_process_parallel_opts *opts) void run_processes_parallel(const struct run_process_parallel_opts *opts)
{ {
int i, code; int i, code;
@ -1775,6 +1863,16 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
"max:%"PRIuMAX, "max:%"PRIuMAX,
(uintmax_t)opts->processes); (uintmax_t)opts->processes);
if (opts->ungroup && opts->consume_output)
BUG("ungroup and reading output are mutualy exclusive");
/*
* Child tasks might receive input via stdin, terminating early (or not), so
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
* actually writes the data to children stdin fds.
*/
sigchain_push(SIGPIPE, SIG_IGN);
pp_init(&pp, opts, &pp_sig); pp_init(&pp, opts, &pp_sig);
while (1) { while (1) {
for (i = 0; for (i = 0;
@ -1792,13 +1890,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
} }
if (!pp.nr_processes) if (!pp.nr_processes)
break; break;
if (opts->ungroup) { pp_handle_child_IO(&pp, opts, output_timeout);
for (size_t i = 0; i < opts->processes; i++)
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
} else {
pp_buffer_stderr(&pp, opts, output_timeout);
pp_output(&pp);
}
code = pp_collect_finished(&pp, opts); code = pp_collect_finished(&pp, opts);
if (code) { if (code) {
pp.shutdown = 1; pp.shutdown = 1;
@ -1809,6 +1901,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
pp_cleanup(&pp, opts); pp_cleanup(&pp, opts);
sigchain_pop(SIGPIPE);
if (do_trace2) if (do_trace2)
trace2_region_leave(tr2_category, tr2_label, NULL); trace2_region_leave(tr2_category, tr2_label, NULL);
} }

View File

@ -420,6 +420,32 @@ typedef int (*start_failure_fn)(struct strbuf *out,
void *pp_cb, void *pp_cb,
void *pp_task_cb); void *pp_task_cb);
/**
* This callback is repeatedly called on every child process who requests
* start_command() to create a pipe by setting child_process.in < 0.
*
* pp_cb is the callback cookie as passed into run_processes_parallel, and
* pp_task_cb is the callback cookie as passed into get_next_task_fn.
*
* Returns < 0 for error
* Returns == 0 when there is more data to be fed (will be called again)
* Returns > 0 when finished (child closed fd or no more data to be fed)
*/
typedef int (*feed_pipe_fn)(int child_in,
void *pp_cb,
void *pp_task_cb);
/**
* If this callback is provided, output is collated into a new pipe instead
* of the process stderr. Then `consume_output_fn` will be called repeatedly
* with output contained in the `output` arg. It will also be called with an
* empty `output` to allow for keepalives or similar operations if necessary.
*
* pp_cb is the callback cookie as passed into run_processes_parallel.
* No task cookie is provided because the callback receives collated output.
*/
typedef void (*consume_output_fn)(struct strbuf *output, void *pp_cb);
/** /**
* This callback is called on every child process that finished processing. * This callback is called on every child process that finished processing.
* *
@ -473,6 +499,18 @@ struct run_process_parallel_opts
*/ */
start_failure_fn start_failure; start_failure_fn start_failure;
/*
* feed_pipe: see feed_pipe_fn() above. This can be NULL to omit any
* special handling.
*/
feed_pipe_fn feed_pipe;
/*
* consume_output: see consume_output_fn() above. This can be NULL
* to omit any special handling.
*/
consume_output_fn consume_output;
/** /**
* task_finished: See task_finished_fn() above. This can be * task_finished: See task_finished_fn() above. This can be
* NULL to omit any special handling. * NULL to omit any special handling.

View File

@ -1292,32 +1292,40 @@ int update_head_with_reflog(const struct commit *old_head,
return ret; return ret;
} }
static int pipe_from_strbuf(int hook_stdin_fd, void *pp_cb, void *pp_task_cb UNUSED)
{
struct hook_cb_data *hook_cb = pp_cb;
struct strbuf *to_pipe = hook_cb->options->feed_pipe_ctx;
int ret;
if (!to_pipe)
BUG("pipe_from_strbuf called without feed_pipe_ctx");
ret = write_in_full(hook_stdin_fd, to_pipe->buf, to_pipe->len);
if (ret < 0 && errno != EPIPE)
return ret;
return 1; /* done writing */
}
static int run_rewrite_hook(const struct object_id *oldoid, static int run_rewrite_hook(const struct object_id *oldoid,
const struct object_id *newoid) const struct object_id *newoid)
{ {
struct child_process proc = CHILD_PROCESS_INIT; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
int code; int code;
struct strbuf sb = STRBUF_INIT; struct strbuf sb = STRBUF_INIT;
const char *hook_path = find_hook(the_repository, "post-rewrite");
if (!hook_path)
return 0;
strvec_pushl(&proc.args, hook_path, "amend", NULL);
proc.in = -1;
proc.stdout_to_stderr = 1;
proc.trace2_hook_name = "post-rewrite";
code = start_command(&proc);
if (code)
return code;
strbuf_addf(&sb, "%s %s\n", oid_to_hex(oldoid), oid_to_hex(newoid)); strbuf_addf(&sb, "%s %s\n", oid_to_hex(oldoid), oid_to_hex(newoid));
sigchain_push(SIGPIPE, SIG_IGN);
write_in_full(proc.in, sb.buf, sb.len); opt.feed_pipe_ctx = &sb;
close(proc.in); opt.feed_pipe = pipe_from_strbuf;
strvec_push(&opt.args, "amend");
code = run_hooks_opt(the_repository, "post-rewrite", &opt);
strbuf_release(&sb); strbuf_release(&sb);
sigchain_pop(SIGPIPE); return code;
return finish_command(&proc);
} }
void commit_post_rewrite(struct repository *r, void commit_post_rewrite(struct repository *r,

View File

@ -23,19 +23,26 @@ static int number_callbacks;
static int parallel_next(struct child_process *cp, static int parallel_next(struct child_process *cp,
struct strbuf *err, struct strbuf *err,
void *cb, void *cb,
void **task_cb UNUSED) void **task_cb)
{ {
struct child_process *d = cb; struct child_process *d = cb;
if (number_callbacks >= 4) if (number_callbacks >= 4)
return 0; return 0;
strvec_pushv(&cp->args, d->args.v); strvec_pushv(&cp->args, d->args.v);
cp->in = d->in;
cp->no_stdin = d->no_stdin;
if (err) if (err)
strbuf_addstr(err, "preloaded output of a child\n"); strbuf_addstr(err, "preloaded output of a child\n");
else else
fprintf(stderr, "preloaded output of a child\n"); fprintf(stderr, "preloaded output of a child\n");
number_callbacks++; number_callbacks++;
/* test_stdin callback will use this to count remaining lines */
*task_cb = xmalloc(sizeof(int));
*(int*)(*task_cb) = 2;
return 1; return 1;
} }
@ -51,18 +58,61 @@ static int no_job(struct child_process *cp UNUSED,
return 0; return 0;
} }
static void test_divert_output(struct strbuf *output, void *cb UNUSED)
{
FILE *output_file;
output_file = fopen("./output_file", "a");
strbuf_write(output, output_file);
fclose(output_file);
}
static int task_finished(int result UNUSED, static int task_finished(int result UNUSED,
struct strbuf *err, struct strbuf *err,
void *pp_cb UNUSED, void *pp_cb UNUSED,
void *pp_task_cb UNUSED) void *pp_task_cb)
{ {
if (err) if (err)
strbuf_addstr(err, "asking for a quick stop\n"); strbuf_addstr(err, "asking for a quick stop\n");
else else
fprintf(stderr, "asking for a quick stop\n"); fprintf(stderr, "asking for a quick stop\n");
FREE_AND_NULL(pp_task_cb);
return 1; return 1;
} }
static int task_finished_quiet(int result UNUSED,
struct strbuf *err UNUSED,
void *pp_cb UNUSED,
void *pp_task_cb)
{
FREE_AND_NULL(pp_task_cb);
return 0;
}
static int test_stdin_pipe_feed(int hook_stdin_fd, void *cb UNUSED, void *task_cb)
{
int *lines_remaining = task_cb;
if (*lines_remaining) {
struct strbuf buf = STRBUF_INIT;
strbuf_addf(&buf, "sample stdin %d\n", --(*lines_remaining));
if (write_in_full(hook_stdin_fd, buf.buf, buf.len) < 0) {
if (errno == EPIPE) {
/* child closed stdin, nothing more to do */
strbuf_release(&buf);
return 1;
}
die_errno("write");
}
strbuf_release(&buf);
}
return !(*lines_remaining);
}
struct testsuite { struct testsuite {
struct string_list tests, failed; struct string_list tests, failed;
int next; int next;
@ -157,6 +207,8 @@ static int testsuite(int argc, const char **argv)
struct run_process_parallel_opts opts = { struct run_process_parallel_opts opts = {
.get_next_task = next_test, .get_next_task = next_test,
.start_failure = test_failed, .start_failure = test_failed,
.feed_pipe = test_stdin_pipe_feed,
.consume_output = test_divert_output,
.task_finished = test_finished, .task_finished = test_finished,
.data = &suite, .data = &suite,
}; };
@ -460,12 +512,23 @@ int cmd__run_command(int argc, const char **argv)
if (!strcmp(argv[1], "run-command-parallel")) { if (!strcmp(argv[1], "run-command-parallel")) {
opts.get_next_task = parallel_next; opts.get_next_task = parallel_next;
opts.task_finished = task_finished_quiet;
} else if (!strcmp(argv[1], "run-command-abort")) { } else if (!strcmp(argv[1], "run-command-abort")) {
opts.get_next_task = parallel_next; opts.get_next_task = parallel_next;
opts.task_finished = task_finished; opts.task_finished = task_finished;
} else if (!strcmp(argv[1], "run-command-no-jobs")) { } else if (!strcmp(argv[1], "run-command-no-jobs")) {
opts.get_next_task = no_job; opts.get_next_task = no_job;
opts.task_finished = task_finished; opts.task_finished = task_finished;
} else if (!strcmp(argv[1], "run-command-stdin")) {
proc.in = -1;
proc.no_stdin = 0;
opts.get_next_task = parallel_next;
opts.task_finished = task_finished_quiet;
opts.feed_pipe = test_stdin_pipe_feed;
} else if (!strcmp(argv[1], "run-command-divert-output")) {
opts.get_next_task = parallel_next;
opts.consume_output = test_divert_output;
opts.task_finished = task_finished_quiet;
} else { } else {
ret = 1; ret = 1;
fprintf(stderr, "check usage\n"); fprintf(stderr, "check usage\n");

View File

@ -164,6 +164,44 @@ test_expect_success 'run_command runs ungrouped in parallel with more tasks than
test_line_count = 4 err test_line_count = 4 err
' '
test_expect_success 'run_command can divert output' '
test_when_finished rm output_file &&
test-tool run-command run-command-divert-output 3 sh -c "printf \"%s\n%s\n\" Hello World" 2>actual &&
test_must_be_empty actual &&
test_cmp expect output_file
'
test_expect_success 'run_command listens to stdin' '
cat >expect <<-\EOF &&
preloaded output of a child
listening for stdin:
sample stdin 1
sample stdin 0
preloaded output of a child
listening for stdin:
sample stdin 1
sample stdin 0
preloaded output of a child
listening for stdin:
sample stdin 1
sample stdin 0
preloaded output of a child
listening for stdin:
sample stdin 1
sample stdin 0
EOF
write_script stdin-script <<-\EOF &&
echo "listening for stdin:"
while read line
do
echo "$line"
done
EOF
test-tool run-command run-command-stdin 2 ./stdin-script 2>actual &&
test_cmp expect actual
'
cat >expect <<-EOF cat >expect <<-EOF
preloaded output of a child preloaded output of a child
asking for a quick stop asking for a quick stop

View File

@ -1316,65 +1316,66 @@ static void die_with_unpushed_submodules(struct string_list *needs_pushing)
die(_("Aborting.")); die(_("Aborting."));
} }
struct feed_pre_push_hook_data {
struct strbuf buf;
const struct ref *refs;
};
static int pre_push_hook_feed_stdin(int hook_stdin_fd, void *pp_cb UNUSED, void *pp_task_cb)
{
struct feed_pre_push_hook_data *data = pp_task_cb;
const struct ref *r = data->refs;
int ret = 0;
if (!r)
return 1; /* no more refs */
data->refs = r->next;
switch (r->status) {
case REF_STATUS_REJECT_NONFASTFORWARD:
case REF_STATUS_REJECT_REMOTE_UPDATED:
case REF_STATUS_REJECT_STALE:
case REF_STATUS_UPTODATE:
return 0; /* skip refs which won't be pushed */
default:
break;
}
if (!r->peer_ref)
return 0;
strbuf_reset(&data->buf);
strbuf_addf(&data->buf, "%s %s %s %s\n",
r->peer_ref->name, oid_to_hex(&r->new_oid),
r->name, oid_to_hex(&r->old_oid));
ret = write_in_full(hook_stdin_fd, data->buf.buf, data->buf.len);
if (ret < 0 && errno != EPIPE)
return ret; /* We do not mind if a hook does not read all refs. */
return 0;
}
static int run_pre_push_hook(struct transport *transport, static int run_pre_push_hook(struct transport *transport,
struct ref *remote_refs) struct ref *remote_refs)
{ {
int ret = 0, x; struct run_hooks_opt opt = RUN_HOOKS_OPT_INIT;
struct ref *r; struct feed_pre_push_hook_data data;
struct child_process proc = CHILD_PROCESS_INIT; int ret = 0;
struct strbuf buf;
const char *hook_path = find_hook(the_repository, "pre-push");
if (!hook_path) strvec_push(&opt.args, transport->remote->name);
return 0; strvec_push(&opt.args, transport->url);
strvec_push(&proc.args, hook_path); strbuf_init(&data.buf, 0);
strvec_push(&proc.args, transport->remote->name); data.refs = remote_refs;
strvec_push(&proc.args, transport->url);
proc.in = -1; opt.feed_pipe = pre_push_hook_feed_stdin;
proc.trace2_hook_name = "pre-push"; opt.feed_pipe_cb_data = &data;
if (start_command(&proc)) { ret = run_hooks_opt(the_repository, "pre-push", &opt);
finish_command(&proc);
return -1;
}
sigchain_push(SIGPIPE, SIG_IGN); strbuf_release(&data.buf);
strbuf_init(&buf, 256);
for (r = remote_refs; r; r = r->next) {
if (!r->peer_ref) continue;
if (r->status == REF_STATUS_REJECT_NONFASTFORWARD) continue;
if (r->status == REF_STATUS_REJECT_STALE) continue;
if (r->status == REF_STATUS_REJECT_REMOTE_UPDATED) continue;
if (r->status == REF_STATUS_UPTODATE) continue;
strbuf_reset(&buf);
strbuf_addf( &buf, "%s %s %s %s\n",
r->peer_ref->name, oid_to_hex(&r->new_oid),
r->name, oid_to_hex(&r->old_oid));
if (write_in_full(proc.in, buf.buf, buf.len) < 0) {
/* We do not mind if a hook does not read all refs. */
if (errno != EPIPE)
ret = -1;
break;
}
}
strbuf_release(&buf);
x = close(proc.in);
if (!ret)
ret = x;
sigchain_pop(SIGPIPE);
x = finish_command(&proc);
if (!ret)
ret = x;
return ret; return ret;
} }