diff options
-rw-r--r-- | doc/man/indexamajig.1.md | 6 | ||||
-rw-r--r-- | src/im-sandbox.c | 15 | ||||
-rw-r--r-- | src/im-sandbox.h | 3 | ||||
-rw-r--r-- | src/indexamajig.c | 15 |
4 files changed, 34 insertions, 5 deletions
diff --git a/doc/man/indexamajig.1.md b/doc/man/indexamajig.1.md index 541bd583..07620411 100644 --- a/doc/man/indexamajig.1.md +++ b/doc/man/indexamajig.1.md @@ -349,6 +349,12 @@ BASIC OPTIONS : If the ASAP::O stream does not exist, wait for it to be appear. Without this : option, indexamajig will exit immediately if the stream is not found. +**--no-data-timeout** +: Shut down the entire indexamajig process if the specified number of seconds +: elapse without any data being seen. This currently applies to ASAP::O data +: only, but might be extended to other streaming systems in future. The +: default is 60 seconds. + **--data-format=format** : Specify the data format for data received over ZeroMQ or ASAP::O. Possible : values in this version are msgpack, hdf5 and seedee. diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 1a6c9e88..e97e204f 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -1189,7 +1189,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, - int timeout, int profile, int cpu_pin) + int timeout, int profile, int cpu_pin, + int no_data_timeout) { int i; struct sandbox *sb; @@ -1198,6 +1199,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int r; int allDone = 0; struct get_pattern_ctx gpctx; + double t_last_data; if ( n_proc > MAX_NUM_WORKERS ) { ERROR("Number of workers (%i) is too large. Using %i\n", @@ -1346,6 +1348,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } + t_last_data = get_monotonic_seconds(); do { /* Check for stream output from workers */ @@ -1380,8 +1383,14 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->shared->no_more = 1; } - /* Case 3: All workers saw end of (ASAP::O) stream */ - if ( all_got_end_of_stream(sb) ) allDone = 1; + /* Case 3: No (ASAP::O) data for a long time */ + if ( get_monotonic_seconds() > t_last_data + no_data_timeout ) { + allDone = 1; + } + if ( !all_got_end_of_stream(sb) ) { + /* We are still getting data */ + t_last_data = get_monotonic_seconds(); + } pthread_mutex_unlock(&sb->shared->queue_lock); /* End exit criterion checking */ diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 4d8085ce..ba3a6d3d 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -90,6 +90,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *tempdir, int serial_start, struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, - int timeout, int profile, int cpu_pin); + int timeout, int profile, int cpu_pin, + int no_data_timeout); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index ef25e396..361b761a 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -94,6 +94,7 @@ struct indexamajig_arguments int if_multi; int if_retry; int profile; /* Whether to do wall-clock time profiling */ + int no_data_timeout; char **copy_headers; int n_copy_headers; char *harvest_file; @@ -457,6 +458,14 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->cpu_pin = 1; break; + case 224 : + if (sscanf(arg, "%d", &args->no_data_timeout) != 1) + { + ERROR("Invalid value for --no-data-timeout\n"); + return EINVAL; + } + break; + /* ---------- Peak search ---------- */ case 't' : @@ -910,6 +919,7 @@ int main(int argc, char *argv[]) args.if_refine = 1; args.if_checkcell = 1; args.profile = 0; + args.no_data_timeout = 60; args.copy_headers = NULL; args.n_copy_headers = 0; args.harvest_file = NULL; @@ -1028,6 +1038,8 @@ int main(int argc, char *argv[]) "Wait for ASAP::O stream to appear"}, {"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"}, {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, + {"no-data-timeout", 224, "s", OPTION_NO_USAGE, + "Shut down after this many seconds without ASAP::O data"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1449,7 +1461,8 @@ int main(int argc, char *argv[]) r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, fh, st, tmpdir, args.serial_start, &args.zmq_params, &args.asapo_params, - timeout, args.profile, args.cpu_pin); + timeout, args.profile, args.cpu_pin, + args.no_data_timeout); if ( pf8_data != NULL ) free_pf8_private_data(pf8_data); cell_free(args.iargs.cell); |