diff options
-rw-r--r-- | doc/man/indexamajig.1 | 5 | ||||
-rw-r--r-- | src/im-asapo.c | 26 | ||||
-rw-r--r-- | src/im-asapo.h | 1 | ||||
-rw-r--r-- | src/indexamajig.c | 7 |
4 files changed, 38 insertions, 1 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index 25e025ac..4ad84325 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -204,6 +204,11 @@ Authentication token, beamtime, data source and consumer group, respectively, fo Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream. .PD 0 +.IP \fB--asapo-wait-for-stream +.PD +If the ASAP::O stream does not exist, wait for it to be appear. Without this option, indexamajig will exit immediately if the stream is not found. + +.PD 0 .IP \fB--data-format=\fIformat\fR .PD Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR. diff --git a/src/im-asapo.c b/src/im-asapo.c index b19c5bdc..255a942d 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -51,6 +51,7 @@ struct im_asapo char *stream; AsapoConsumerHandle consumer; AsapoStringHandle group_id; + int wait_for_stream; }; @@ -110,11 +111,30 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); + a->wait_for_stream = params->wait_for_stream; return a; } +static int stream_empty(struct im_asapo *a) +{ + AsapoErrorHandle err; + + err = asapo_new_handle(); + int64_t size = asapo_consumer_get_current_size(a->consumer, a->stream, + &err); + + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't get stream size", err); + asapo_free_handle(&err); + return 0; + } + + return ( size == 0 ); +} + + void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished) @@ -141,7 +161,11 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - *pfinished = 1; + if ( stream_empty(a) && a->wait_for_stream ) { + *pfinished = 0; + } else { + *pfinished = 1; + } return NULL; } diff --git a/src/im-asapo.h b/src/im-asapo.h index f2e9e40a..cda9fbd9 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -42,6 +42,7 @@ struct im_asapo_params char *group_id; char *source; char *stream; + int wait_for_stream; }; #if defined(HAVE_ASAPO) diff --git a/src/indexamajig.c b/src/indexamajig.c index af1a5704..6e821434 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -435,6 +435,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) args->asapo_params.stream = strdup(arg); break; + case 221 : + args->asapo_params.wait_for_stream = 1; + break; + /* ---------- Peak search ---------- */ case 't' : @@ -858,6 +862,7 @@ int main(int argc, char *argv[]) args.asapo_params.group_id = NULL; args.asapo_params.source = NULL; args.asapo_params.stream = NULL; + args.asapo_params.wait_for_stream = 0; args.serial_start = 1; args.if_peaks = 1; args.if_multi = 0; @@ -972,6 +977,8 @@ int main(int argc, char *argv[]) {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, + {"asapo-wait-for-stream", 221, "str", OPTION_NO_USAGE, + "Wait for ASAP::O stream to appear"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, |