aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/indexamajig.15
-rw-r--r--src/im-asapo.c47
-rw-r--r--src/im-asapo.h13
-rw-r--r--src/im-sandbox.c18
-rw-r--r--src/im-sandbox.h4
-rw-r--r--src/indexamajig.c11
6 files changed, 79 insertions, 19 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index fa108270..25e025ac 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -199,6 +199,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input
Authentication token, beamtime, data source and consumer group, respectively, for ASAP::O data.
.PD 0
+.IP \fB--asapo-stream=\fIstream\fR
+.PD
+Name of ASAP::O stream to process. If this option is not given, indexamajig will start processing from the end of the current last stream.
+
+.PD 0
.IP \fB--data-format=\fIformat\fR
.PD
Specify the data format for data received over ZeroMQ or ASAP::O. Possible values in this version are \fBmsgpack\fR, \fBhdf5\fR and \fBseedee\fR.
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 2e5ed504..95445fd0 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -49,6 +49,7 @@
struct im_asapo
{
char *stream;
+ int online_mode;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
};
@@ -100,7 +101,8 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
const char *token,
const char *beamtime,
const char *group_id,
- const char *data_source)
+ const char *data_source,
+ const char *stream)
{
struct im_asapo *a;
AsapoSourceCredentialsHandle cred;
@@ -127,7 +129,30 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(group_id);
- a->stream = NULL;
+ if ( stream != NULL ) {
+
+ /* Named stream mode */
+ AsapoErrorHandle err = asapo_new_handle();
+
+ a->stream = strdup(stream);
+
+ asapo_consumer_set_last_read_marker(a->consumer,
+ a->group_id, 0,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Failed to skip to start of stream", err);
+ } else {
+ STATUS("Skipped to start of stream %s\n", a->stream);
+ }
+
+ asapo_free_handle(&err);
+ a->online_mode = 0;
+
+ } else {
+ /* Online mode */
+ a->stream = NULL;
+ a->online_mode = 1;
+ }
return a;
}
@@ -244,7 +269,8 @@ static void skip_to_stream_end(struct im_asapo *a)
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
- char **pmeta, char **pfilename, char **pevent)
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished)
{
void *data_copy;
AsapoMessageMetaHandle meta;
@@ -252,6 +278,8 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
AsapoErrorHandle err;
uint64_t msg_size;
+ *pfinished = 0;
+
profile_start("select-stream");
if ( a->stream == NULL ) {
if ( select_last_stream(a) ) {
@@ -273,13 +301,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
a->stream, &err);
profile_end("asapo-get-next");
if ( asapo_error_get_type(err) == kEndOfStream ) {
- profile_start("next-stream");
- select_next_stream(a);
- profile_end("next-stream");
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- return NULL; /* Sandbox will call try again very soon */
+ if ( a->online_mode ) {
+ profile_start("next-stream");
+ select_next_stream(a);
+ profile_end("next-stream");
+ /* Sandbox will call to try again very soon */
+ } else {
+ *pfinished = 1;
+ }
+ return NULL;
}
if ( asapo_is_error(err) ) {
diff --git a/src/im-asapo.h b/src/im-asapo.h
index 5fd7665c..ab68e1c2 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -40,12 +40,14 @@ extern struct im_asapo *im_asapo_connect(const char *endpoint,
const char *token,
const char *beamtime,
const char *group_id,
- const char *data_source);
+ const char *data_source,
+ const char *stream);
extern void im_asapo_shutdown(struct im_asapo *a);
extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
- char **pmeta, char **pfilename, char **pevent);
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished);
extern char *im_asapo_make_unique_group_id(const char *endpoint,
const char *token);
@@ -56,7 +58,8 @@ static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint,
const char *token,
const char *beamtime,
const char *group_id,
- const char *data_source)
+ const char *data_source,
+ const char *stream)
{
return NULL;
}
@@ -66,12 +69,14 @@ static UNUSED void im_asapo_shutdown(struct im_asapo *a)
}
static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize,
- char **pmeta, char **pfilename, char **pevent)
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished)
{
*psize = 0;
*pmeta = NULL;
*pfilename = NULL;
*pevent = NULL;
+ *pfinished = 1;
return NULL;
}
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 8137db9e..8e1e5004 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -112,6 +112,7 @@ struct sandbox
const char *asapo_beamtime;
const char *asapo_group_id;
const char *asapo_source;
+ const char *asapo_stream;
/* Final output */
Stream *stream;
@@ -362,7 +363,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
sb->asapo_token,
sb->asapo_beamtime,
sb->asapo_group_id,
- sb->asapo_source);
+ sb->asapo_source,
+ sb->asapo_stream);
if ( asapostuff == NULL ) {
ERROR("ASAP::O setup failed.\n");
return 1;
@@ -480,13 +482,15 @@ static int run_work(const struct index_args *iargs, Stream *st,
char *filename;
char *event;
+ int finished = 0;
profile_start("asapo-fetch");
pargs.asapo_data = im_asapo_fetch(asapostuff,
&pargs.asapo_data_size,
&pargs.asapo_meta,
&filename,
- &event);
+ &event,
+ &finished);
profile_end("asapo-fetch");
if ( pargs.asapo_data != NULL ) {
ok = 1;
@@ -497,6 +501,11 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(pargs.event);
pargs.filename = filename;
pargs.event = event;
+ } else {
+ if ( finished ) {
+ sb->shared->should_shutdown = 1;
+ allDone = 1;
+ }
}
} else {
@@ -1120,8 +1129,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
const char *asapo_endpoint, const char *asapo_token,
- const char *asapo_beamtime,
- const char *asapo_group_id, const char *asapo_source,
+ const char *asapo_beamtime, const char *asapo_group_id,
+ const char *asapo_source, const char *asapo_stream,
int timeout, int profile)
{
int i;
@@ -1168,6 +1177,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->asapo_token = asapo_token;
sb->asapo_beamtime = asapo_beamtime;
sb->asapo_source = asapo_source;
+ sb->asapo_stream = asapo_stream;
} else {
sb->asapo = 0;
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index a1b9ed36..e1d2e1b9 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -88,8 +88,8 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
const char *asapo_endpoint, const char *asapo_token,
- const char *asapo_beamtime,
- const char *asapo_group_id, const char *asapo_source,
+ const char *asapo_beamtime, const char *asapo_group_id,
+ const char *asapo_source, const char *asapo_stream,
int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index dbea7261..6787f903 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -89,6 +89,7 @@ struct indexamajig_arguments
char *asapo_beamtime;
char *asapo_group_id;
char *asapo_source;
+ char *asapo_stream;
int serial_start;
char *temp_location;
int if_refine;
@@ -436,6 +437,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
}
break;
+ case 220 :
+ args->asapo_stream = strdup(arg);
+ break;
+
/* ---------- Peak search ---------- */
case 't' :
@@ -857,6 +862,7 @@ int main(int argc, char *argv[])
args.asapo_beamtime = NULL;
args.asapo_group_id = NULL;
args.asapo_source = NULL;
+ args.asapo_stream = NULL;
args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -971,6 +977,7 @@ int main(int argc, char *argv[])
{"asapo-group", 217, "str", OPTION_NO_USAGE, "ASAP::O group ID"},
{"asapo-source", 218, "str", OPTION_NO_USAGE, "ASAP::O data source"},
{"data-format", 219, "str", OPTION_NO_USAGE, "Streamed data format"},
+ {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1332,8 +1339,8 @@ int main(int argc, char *argv[])
args.zmq_addr, args.zmq_subscriptions,
args.n_zmq_subscriptions, args.zmq_request,
args.asapo_endpoint, args.asapo_token,
- args.asapo_beamtime,
- args.asapo_group_id, args.asapo_source,
+ args.asapo_beamtime, args.asapo_group_id,
+ args.asapo_source, args.asapo_stream,
timeout, args.profile);
cell_free(args.iargs.cell);