aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2023-12-18 14:42:22 +0100
committerThomas White <taw@physics.org>2023-12-22 14:44:27 +0100
commitb8a765d620dae8d5dc3c5aef8c713d6d71e45bac (patch)
treeadfd6152812ced86a12662b2145f3a77d28cf726
parent32efc83f9587844a27ba67c83a1b77a9e60efbb3 (diff)
indexamajig: Add --no-data-timeout
-rw-r--r--doc/man/indexamajig.1.md6
-rw-r--r--src/im-sandbox.c15
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/indexamajig.c15
4 files changed, 34 insertions, 5 deletions
diff --git a/doc/man/indexamajig.1.md b/doc/man/indexamajig.1.md
index 541bd583..07620411 100644
--- a/doc/man/indexamajig.1.md
+++ b/doc/man/indexamajig.1.md
@@ -349,6 +349,12 @@ BASIC OPTIONS
: If the ASAP::O stream does not exist, wait for it to be appear. Without this
: option, indexamajig will exit immediately if the stream is not found.
+**--no-data-timeout**
+: Shut down the entire indexamajig process if the specified number of seconds
+: elapse without any data being seen. This currently applies to ASAP::O data
+: only, but might be extended to other streaming systems in future. The
+: default is 60 seconds.
+
**--data-format=format**
: Specify the data format for data received over ZeroMQ or ASAP::O. Possible
: values in this version are msgpack, hdf5 and seedee.
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 1a6c9e88..e97e204f 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -1189,7 +1189,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
- int timeout, int profile, int cpu_pin)
+ int timeout, int profile, int cpu_pin,
+ int no_data_timeout)
{
int i;
struct sandbox *sb;
@@ -1198,6 +1199,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int r;
int allDone = 0;
struct get_pattern_ctx gpctx;
+ double t_last_data;
if ( n_proc > MAX_NUM_WORKERS ) {
ERROR("Number of workers (%i) is too large. Using %i\n",
@@ -1346,6 +1348,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return 0;
}
+ t_last_data = get_monotonic_seconds();
do {
/* Check for stream output from workers */
@@ -1380,8 +1383,14 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->shared->no_more = 1;
}
- /* Case 3: All workers saw end of (ASAP::O) stream */
- if ( all_got_end_of_stream(sb) ) allDone = 1;
+ /* Case 3: No (ASAP::O) data for a long time */
+ if ( get_monotonic_seconds() > t_last_data + no_data_timeout ) {
+ allDone = 1;
+ }
+ if ( !all_got_end_of_stream(sb) ) {
+ /* We are still getting data */
+ t_last_data = get_monotonic_seconds();
+ }
pthread_mutex_unlock(&sb->shared->queue_lock);
/* End exit criterion checking */
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 4d8085ce..ba3a6d3d 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -90,6 +90,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *tempdir, int serial_start,
struct im_zmq_params *zmq_params,
struct im_asapo_params *asapo_params,
- int timeout, int profile, int cpu_pin);
+ int timeout, int profile, int cpu_pin,
+ int no_data_timeout);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index ef25e396..361b761a 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -94,6 +94,7 @@ struct indexamajig_arguments
int if_multi;
int if_retry;
int profile; /* Whether to do wall-clock time profiling */
+ int no_data_timeout;
char **copy_headers;
int n_copy_headers;
char *harvest_file;
@@ -457,6 +458,14 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->cpu_pin = 1;
break;
+ case 224 :
+ if (sscanf(arg, "%d", &args->no_data_timeout) != 1)
+ {
+ ERROR("Invalid value for --no-data-timeout\n");
+ return EINVAL;
+ }
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -910,6 +919,7 @@ int main(int argc, char *argv[])
args.if_refine = 1;
args.if_checkcell = 1;
args.profile = 0;
+ args.no_data_timeout = 60;
args.copy_headers = NULL;
args.n_copy_headers = 0;
args.harvest_file = NULL;
@@ -1028,6 +1038,8 @@ int main(int argc, char *argv[])
"Wait for ASAP::O stream to appear"},
{"asapo-output-stream", 222, NULL, OPTION_NO_USAGE, "Create an ASAP::O hits-only stream"},
{"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
+ {"no-data-timeout", 224, "s", OPTION_NO_USAGE,
+ "Shut down after this many seconds without ASAP::O data"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1449,7 +1461,8 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start,
&args.zmq_params, &args.asapo_params,
- timeout, args.profile, args.cpu_pin);
+ timeout, args.profile, args.cpu_pin,
+ args.no_data_timeout);
if ( pf8_data != NULL ) free_pf8_private_data(pf8_data);
cell_free(args.iargs.cell);