diff options
-rw-r--r-- | doc/man/indexamajig.1 | 5 | ||||
-rw-r--r-- | src/im-asapo.c | 47 | ||||
-rw-r--r-- | src/im-asapo.h | 13 | ||||
-rw-r--r-- | src/im-sandbox.c | 18 | ||||
-rw-r--r-- | src/im-sandbox.h | 4 | ||||
-rw-r--r-- | src/indexamajig.c | 11 |
6 files changed, 79 insertions, 19 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index fa108270..25e025ac 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -199,6 +199,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input Authentication token, beamtime, data source and consumer group, respectively, for ASAP::O data. .PD 0 +.IP \fB--asapo-stream=\fIstream\fR +.PD +Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream. + +.PD 0 .IP \fB--data-format=\fIformat\fR .PD Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR. diff --git a/src/im-asapo.c b/src/im-asapo.c index 2e5ed504..95445fd0 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -49,6 +49,7 @@ struct im_asapo { char *stream; + int online_mode; AsapoConsumerHandle consumer; AsapoStringHandle group_id; }; @@ -100,7 +101,8 @@ struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source) + const char *data_source, + const char *stream) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; @@ -127,7 +129,30 @@ struct im_asapo *im_asapo_connect(const char *endpoint, asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(group_id); - a->stream = NULL; + if ( stream != NULL ) { + + /* Named stream mode */ + AsapoErrorHandle err = asapo_new_handle(); + + a->stream = strdup(stream); + + asapo_consumer_set_last_read_marker(a->consumer, + a->group_id, 0, + a->stream, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Failed to skip to start of stream", err); + } else { + STATUS("Skipped to start of stream %s\n", a->stream); + } + + asapo_free_handle(&err); + a->online_mode = 0; + + } else { + /* Online mode */ + a->stream = NULL; + a->online_mode = 1; + } return a; } @@ -244,7 +269,8 @@ static void skip_to_stream_end(struct im_asapo *a) void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pmeta, char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent, + int *pfinished) { void *data_copy; AsapoMessageMetaHandle meta; @@ -252,6 +278,8 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, AsapoErrorHandle err; uint64_t msg_size; + *pfinished = 0; + profile_start("select-stream"); if ( a->stream == NULL ) { if ( select_last_stream(a) ) { @@ -273,13 +301,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, a->stream, &err); profile_end("asapo-get-next"); if ( asapo_error_get_type(err) == kEndOfStream ) { - profile_start("next-stream"); - select_next_stream(a); - profile_end("next-stream"); asapo_free_handle(&err); asapo_free_handle(&meta); asapo_free_handle(&data); - return NULL; /* Sandbox will call try again very soon */ + if ( a->online_mode ) { + profile_start("next-stream"); + select_next_stream(a); + profile_end("next-stream"); + /* Sandbox will call to try again very soon */ + } else { + *pfinished = 1; + } + return NULL; } if ( asapo_is_error(err) ) { diff --git a/src/im-asapo.h b/src/im-asapo.h index 5fd7665c..ab68e1c2 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -40,12 +40,14 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source); + const char *data_source, + const char *stream); extern void im_asapo_shutdown(struct im_asapo *a); extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, - char **pmeta, char **pfilename, char **pevent); + char **pmeta, char **pfilename, char **pevent, + int *pfinished); extern char *im_asapo_make_unique_group_id(const char *endpoint, const char *token); @@ -56,7 +58,8 @@ static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, const char *token, const char *beamtime, const char *group_id, - const char *data_source) + const char *data_source, + const char *stream) { return NULL; } @@ -66,12 +69,14 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a) } static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, - char **pmeta, char **pfilename, char **pevent) + char **pmeta, char **pfilename, char **pevent, + int *pfinished) { *psize = 0; *pmeta = NULL; *pfilename = NULL; *pevent = NULL; + *pfinished = 1; return NULL; } diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 8137db9e..8e1e5004 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -112,6 +112,7 @@ struct sandbox const char *asapo_beamtime; const char *asapo_group_id; const char *asapo_source; + const char *asapo_stream; /* Final output */ Stream *stream; @@ -362,7 +363,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->asapo_token, sb->asapo_beamtime, sb->asapo_group_id, - sb->asapo_source); + sb->asapo_source, + sb->asapo_stream); if ( asapostuff == NULL ) { ERROR("ASAP::O setup failed.\n"); return 1; @@ -480,13 +482,15 @@ static int run_work(const struct index_args *iargs, Stream *st, char *filename; char *event; + int finished = 0; profile_start("asapo-fetch"); pargs.asapo_data = im_asapo_fetch(asapostuff, &pargs.asapo_data_size, &pargs.asapo_meta, &filename, - &event); + &event, + &finished); profile_end("asapo-fetch"); if ( pargs.asapo_data != NULL ) { ok = 1; @@ -497,6 +501,11 @@ static int run_work(const struct index_args *iargs, Stream *st, free(pargs.event); pargs.filename = filename; pargs.event = event; + } else { + if ( finished ) { + sb->shared->should_shutdown = 1; + allDone = 1; + } } } else { @@ -1120,8 +1129,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, - const char *asapo_group_id, const char *asapo_source, + const char *asapo_beamtime, const char *asapo_group_id, + const char *asapo_source, const char *asapo_stream, int timeout, int profile) { int i; @@ -1168,6 +1177,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_token = asapo_token; sb->asapo_beamtime = asapo_beamtime; sb->asapo_source = asapo_source; + sb->asapo_stream = asapo_stream; } else { sb->asapo = 0; } diff --git a/src/im-sandbox.h b/src/im-sandbox.h index a1b9ed36..e1d2e1b9 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -88,8 +88,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, - const char *asapo_group_id, const char *asapo_source, + const char *asapo_beamtime, const char *asapo_group_id, + const char *asapo_source, const char *asapo_stream, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index dbea7261..6787f903 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -89,6 +89,7 @@ struct indexamajig_arguments char *asapo_beamtime; char *asapo_group_id; char *asapo_source; + char *asapo_stream; int serial_start; char *temp_location; int if_refine; @@ -436,6 +437,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) } break; + case 220 : + args->asapo_stream = strdup(arg); + break; + /* ---------- Peak search ---------- */ case 't' : @@ -857,6 +862,7 @@ int main(int argc, char *argv[]) args.asapo_beamtime = NULL; args.asapo_group_id = NULL; args.asapo_source = NULL; + args.asapo_stream = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -971,6 +977,7 @@ int main(int argc, char *argv[]) {"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"}, {"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"}, {"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"}, + {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, @@ -1332,8 +1339,8 @@ int main(int argc, char *argv[]) args.zmq_addr, args.zmq_subscriptions, args.n_zmq_subscriptions, args.zmq_request, args.asapo_endpoint, args.asapo_token, - args.asapo_beamtime, - args.asapo_group_id, args.asapo_source, + args.asapo_beamtime, args.asapo_group_id, + args.asapo_source, args.asapo_stream, timeout, args.profile); cell_free(args.iargs.cell); |