aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-06-22 14:48:40 +0200
committerThomas White <taw@physics.org>2022-06-22 15:26:10 +0200
commitaaf02b5fe6bec4745e0e88cf345e420715867b2e (patch)
treefc81995fce18d16ee41dc868bf1f8827a31fb389 /src
parent60df21975c5cbac190bf1e7b4d2e6627f1685dcf (diff)
indexamajig: Wrap ASAP::O parameters up inside separate structure
Diffstat (limited to 'src')
-rw-r--r--src/im-asapo.c29
-rw-r--r--src/im-asapo.h24
-rw-r--r--src/im-sandbox.c44
-rw-r--r--src/im-sandbox.h5
-rw-r--r--src/indexamajig.c43
5 files changed, 55 insertions, 90 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 2685036b..b19c5bdc 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -62,34 +62,29 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err)
}
-struct im_asapo *im_asapo_connect(const char *endpoint,
- const char *token,
- const char *beamtime,
- const char *group_id,
- const char *data_source,
- const char *stream)
+struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
{
struct im_asapo *a;
AsapoSourceCredentialsHandle cred;
AsapoErrorHandle err = asapo_new_handle();
- if ( endpoint == NULL ) {
+ if ( params->endpoint == NULL ) {
ERROR("ASAP::O endpoint not specified.\n");
return NULL;
}
- if ( beamtime == NULL ) {
+ if ( params->beamtime == NULL ) {
ERROR("ASAP::O beamtime not specified.\n");
return NULL;
}
- if ( group_id == NULL ) {
+ if ( params->group_id == NULL ) {
ERROR("ASAP::O consumer group ID not specified.\n");
return NULL;
}
- if ( data_source == NULL ) {
+ if ( params->source == NULL ) {
ERROR("ASAP::O data source not specified.\n");
return NULL;
}
- if ( stream == NULL ) {
+ if ( params->stream == NULL ) {
ERROR("ASAP::O stream not specified.\n");
return NULL;
}
@@ -100,11 +95,11 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
cred = asapo_create_source_credentials(kProcessed,
"auto", /* instance ID */
"indexamajig", /* pipeline step */
- beamtime,
+ params->beamtime,
"", /* beamline */
- data_source,
- token);
- a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err);
+ params->source,
+ params->token);
+ a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
@@ -112,9 +107,9 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
return NULL;
}
- a->stream = strdup(stream);
+ a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
- a->group_id = asapo_string_from_c_str(group_id);
+ a->group_id = asapo_string_from_c_str(params->group_id);
return a;
}
diff --git a/src/im-asapo.h b/src/im-asapo.h
index 0a11829f..f2e9e40a 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -34,14 +34,19 @@
#include <config.h>
#endif
+struct im_asapo_params
+{
+ char *endpoint;
+ char *token;
+ char *beamtime;
+ char *group_id;
+ char *source;
+ char *stream;
+};
+
#if defined(HAVE_ASAPO)
-extern struct im_asapo *im_asapo_connect(const char *endpoint,
- const char *token,
- const char *beamtime,
- const char *group_id,
- const char *data_source,
- const char *stream);
+extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params);
extern void im_asapo_shutdown(struct im_asapo *a);
@@ -51,12 +56,7 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
#else /* defined(HAVE_ASAPO) */
-static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint,
- const char *token,
- const char *beamtime,
- const char *group_id,
- const char *data_source,
- const char *stream)
+static UNUSED struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
{
ERROR("This installation of CrystFEL was compiled without ASAP::O support.\n");
return NULL;
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 44a04338..de2e08b6 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -105,14 +105,8 @@ struct sandbox
int n_zmq_subscriptions;
const char *zmq_request;
- /* ASAP::O mode */
- int asapo;
- const char *asapo_endpoint;
- const char *asapo_token;
- const char *asapo_beamtime;
- const char *asapo_group_id;
- const char *asapo_source;
- const char *asapo_stream;
+ /* If non-NULL, we are using ASAP::O */
+ struct im_asapo_params *asapo_params;
/* Final output */
Stream *stream;
@@ -358,13 +352,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
}
}
- if ( sb->asapo ) {
- asapostuff = im_asapo_connect(sb->asapo_endpoint,
- sb->asapo_token,
- sb->asapo_beamtime,
- sb->asapo_group_id,
- sb->asapo_source,
- sb->asapo_stream);
+ if ( sb->asapo_params != NULL ) {
+ asapostuff = im_asapo_connect(sb->asapo_params);
if ( asapostuff == NULL ) {
ERROR("ASAP::O setup failed.\n");
sb->shared->should_shutdown = 1;
@@ -479,7 +468,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
* importantly, the event queue gave us a unique
* serial number for this image. */
- } else if ( sb->asapo ) {
+ } else if ( sb->asapo_params != NULL ) {
char *filename;
char *event;
@@ -919,7 +908,7 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
filename = "ZMQdata";
evstr = malloc(64);
snprintf(evstr, 64, "//%i", sb->serial);
- } else if ( sb->asapo ) {
+ } else if ( sb->asapo_params != NULL ) {
filename = "ASAPOdata";
evstr = malloc(64);
snprintf(evstr, 64, "//%i", sb->serial);
@@ -1130,9 +1119,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
Stream *stream, const char *tmpdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
- const char *asapo_endpoint, const char *asapo_token,
- const char *asapo_beamtime, const char *asapo_group_id,
- const char *asapo_source, const char *asapo_stream,
+ struct im_asapo_params *asapo_params,
int timeout, int profile)
{
int i;
@@ -1173,27 +1160,18 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->zmq = 0;
}
- if ( asapo_endpoint != NULL ) {
- sb->asapo = 1;
- sb->asapo_endpoint = asapo_endpoint;
- sb->asapo_token = asapo_token;
- sb->asapo_beamtime = asapo_beamtime;
- sb->asapo_source = asapo_source;
- sb->asapo_stream = asapo_stream;
+ if ( asapo_params->endpoint != NULL ) {
+ sb->asapo_params = asapo_params;
} else {
- sb->asapo = 0;
+ sb->asapo_params = NULL;
}
- if ( sb->zmq && sb->asapo ) {
+ if ( sb->zmq && sb->asapo_params ) {
ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n");
free(sb);
return 0;
}
- if ( sb->asapo ) {
- sb->asapo_group_id = strdup(asapo_group_id);
- }
-
sb->fds = NULL;
sb->fhs = NULL;
sb->stream = stream;
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index e1d2e1b9..78f542b9 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -42,6 +42,7 @@ struct sb_shm;
#include "stream.h"
#include "cell.h"
#include "process_image.h"
+#include "im-asapo.h"
/* Length of event queue */
#define QUEUE_SIZE (256)
@@ -87,9 +88,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
const char *tempdir, int serial_start,
const char *zmq_address, char **zmq_subscriptions,
int n_zmq_subscriptions, const char *zmq_request,
- const char *asapo_endpoint, const char *asapo_token,
- const char *asapo_beamtime, const char *asapo_group_id,
- const char *asapo_source, const char *asapo_stream,
+ struct im_asapo_params *asapo_params,
int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index b31d2e10..020612a8 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -63,6 +63,7 @@
#include <datatemplate.h>
#include "im-sandbox.h"
+#include "im-asapo.h"
#include "version.h"
#include "json-utils.h"
@@ -84,12 +85,7 @@ struct indexamajig_arguments
char *zmq_request;
char *zmq_subscriptions[256];
int n_zmq_subscriptions;
- char *asapo_endpoint;
- char *asapo_token;
- char *asapo_beamtime;
- char *asapo_group_id;
- char *asapo_source;
- char *asapo_stream;
+ struct im_asapo_params asapo_params;
int serial_start;
char *temp_location;
int if_refine;
@@ -410,23 +406,23 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 213 :
- args->asapo_endpoint = strdup(arg);
+ args->asapo_params.endpoint = strdup(arg);
break;
case 214 :
- args->asapo_token = strdup(arg);
+ args->asapo_params.token = strdup(arg);
break;
case 215 :
- args->asapo_beamtime = strdup(arg);
+ args->asapo_params.beamtime = strdup(arg);
break;
case 217 :
- args->asapo_group_id = strdup(arg);
+ args->asapo_params.group_id = strdup(arg);
break;
case 218 :
- args->asapo_source = strdup(arg);
+ args->asapo_params.source = strdup(arg);
break;
case 219 :
@@ -438,7 +434,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 220 :
- args->asapo_stream = strdup(arg);
+ args->asapo_params.stream = strdup(arg);
break;
/* ---------- Peak search ---------- */
@@ -857,12 +853,12 @@ int main(int argc, char *argv[])
args.basename = 0;
args.zmq_addr = NULL;
args.zmq_request = NULL;
- args.asapo_endpoint = NULL;
- args.asapo_token = NULL;
- args.asapo_beamtime = NULL;
- args.asapo_group_id = NULL;
- args.asapo_source = NULL;
- args.asapo_stream = NULL;
+ args.asapo_params.endpoint = NULL;
+ args.asapo_params.token = NULL;
+ args.asapo_params.beamtime = NULL;
+ args.asapo_params.group_id = NULL;
+ args.asapo_params.source = NULL;
+ args.asapo_params.stream = NULL;
args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -1096,7 +1092,7 @@ int main(int argc, char *argv[])
/* Check for minimal information */
if ( (args.filename == NULL)
&& (args.zmq_addr == NULL)
- && (args.asapo_endpoint == NULL) ) {
+ && (args.asapo_params.endpoint == NULL) ) {
ERROR("You need to provide the input filename (use -i)\n");
return 1;
}
@@ -1115,13 +1111,13 @@ int main(int argc, char *argv[])
return 1;
}
- if ( (args.filename != NULL) && (args.asapo_endpoint != NULL) ) {
+ if ( (args.filename != NULL) && (args.asapo_params.endpoint != NULL) ) {
ERROR("The options --input and --asapo-endpoint are mutually "
"exclusive.\n");
return 1;
}
- if ( (args.asapo_endpoint != NULL) && (args.zmq_addr != NULL) ) {
+ if ( (args.asapo_params.endpoint != NULL) && (args.zmq_addr != NULL) ) {
ERROR("The options --asapo-endpoint and --zmq-input are mutually "
"exclusive.\n");
return 1;
@@ -1351,10 +1347,7 @@ int main(int argc, char *argv[])
fh, st, tmpdir, args.serial_start,
args.zmq_addr, args.zmq_subscriptions,
args.n_zmq_subscriptions, args.zmq_request,
- args.asapo_endpoint, args.asapo_token,
- args.asapo_beamtime, args.asapo_group_id,
- args.asapo_source, args.asapo_stream,
- timeout, args.profile);
+ &args.asapo_params, timeout, args.profile);
cell_free(args.iargs.cell);
free(args.prefix);