From aaf02b5fe6bec4745e0e88cf345e420715867b2e Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 22 Jun 2022 14:48:40 +0200 Subject: indexamajig: Wrap ASAP::O parameters up inside separate structure --- src/im-asapo.c | 29 ++++++++++++----------------- src/im-asapo.h | 24 ++++++++++++------------ src/im-sandbox.c | 44 +++++++++++--------------------------------- src/im-sandbox.h | 5 ++--- src/indexamajig.c | 43 ++++++++++++++++++------------------------- 5 files changed, 55 insertions(+), 90 deletions(-) (limited to 'src') diff --git a/src/im-asapo.c b/src/im-asapo.c index 2685036b..b19c5bdc 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -62,34 +62,29 @@ static void show_asapo_error(const char *msg, const AsapoErrorHandle err) } -struct im_asapo *im_asapo_connect(const char *endpoint, - const char *token, - const char *beamtime, - const char *group_id, - const char *data_source, - const char *stream) +struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { struct im_asapo *a; AsapoSourceCredentialsHandle cred; AsapoErrorHandle err = asapo_new_handle(); - if ( endpoint == NULL ) { + if ( params->endpoint == NULL ) { ERROR("ASAP::O endpoint not specified.\n"); return NULL; } - if ( beamtime == NULL ) { + if ( params->beamtime == NULL ) { ERROR("ASAP::O beamtime not specified.\n"); return NULL; } - if ( group_id == NULL ) { + if ( params->group_id == NULL ) { ERROR("ASAP::O consumer group ID not specified.\n"); return NULL; } - if ( data_source == NULL ) { + if ( params->source == NULL ) { ERROR("ASAP::O data source not specified.\n"); return NULL; } - if ( stream == NULL ) { + if ( params->stream == NULL ) { ERROR("ASAP::O stream not specified.\n"); return NULL; } @@ -100,11 +95,11 @@ struct im_asapo *im_asapo_connect(const char *endpoint, cred = asapo_create_source_credentials(kProcessed, "auto", /* instance ID */ "indexamajig", /* pipeline step */ - beamtime, + params->beamtime, "", /* beamline */ - data_source, - token); - a->consumer = asapo_create_consumer(endpoint, "auto", 0, cred, &err); + params->source, + params->token); + a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); @@ -112,9 +107,9 @@ struct im_asapo *im_asapo_connect(const char *endpoint, return NULL; } - a->stream = strdup(stream); + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); - a->group_id = asapo_string_from_c_str(group_id); + a->group_id = asapo_string_from_c_str(params->group_id); return a; } diff --git a/src/im-asapo.h b/src/im-asapo.h index 0a11829f..f2e9e40a 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -34,14 +34,19 @@ #include #endif +struct im_asapo_params +{ + char *endpoint; + char *token; + char *beamtime; + char *group_id; + char *source; + char *stream; +}; + #if defined(HAVE_ASAPO) -extern struct im_asapo *im_asapo_connect(const char *endpoint, - const char *token, - const char *beamtime, - const char *group_id, - const char *data_source, - const char *stream); +extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params); extern void im_asapo_shutdown(struct im_asapo *a); @@ -51,12 +56,7 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, #else /* defined(HAVE_ASAPO) */ -static UNUSED struct im_asapo *im_asapo_connect(const char *endpoint, - const char *token, - const char *beamtime, - const char *group_id, - const char *data_source, - const char *stream) +static UNUSED struct im_asapo *im_asapo_connect(struct im_asapo_params *params) { ERROR("This installation of CrystFEL was compiled without ASAP::O support.\n"); return NULL; diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 44a04338..de2e08b6 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -105,14 +105,8 @@ struct sandbox int n_zmq_subscriptions; const char *zmq_request; - /* ASAP::O mode */ - int asapo; - const char *asapo_endpoint; - const char *asapo_token; - const char *asapo_beamtime; - const char *asapo_group_id; - const char *asapo_source; - const char *asapo_stream; + /* If non-NULL, we are using ASAP::O */ + struct im_asapo_params *asapo_params; /* Final output */ Stream *stream; @@ -358,13 +352,8 @@ static int run_work(const struct index_args *iargs, Stream *st, } } - if ( sb->asapo ) { - asapostuff = im_asapo_connect(sb->asapo_endpoint, - sb->asapo_token, - sb->asapo_beamtime, - sb->asapo_group_id, - sb->asapo_source, - sb->asapo_stream); + if ( sb->asapo_params != NULL ) { + asapostuff = im_asapo_connect(sb->asapo_params); if ( asapostuff == NULL ) { ERROR("ASAP::O setup failed.\n"); sb->shared->should_shutdown = 1; @@ -479,7 +468,7 @@ static int run_work(const struct index_args *iargs, Stream *st, * importantly, the event queue gave us a unique * serial number for this image. */ - } else if ( sb->asapo ) { + } else if ( sb->asapo_params != NULL ) { char *filename; char *event; @@ -919,7 +908,7 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) filename = "ZMQdata"; evstr = malloc(64); snprintf(evstr, 64, "//%i", sb->serial); - } else if ( sb->asapo ) { + } else if ( sb->asapo_params != NULL ) { filename = "ASAPOdata"; evstr = malloc(64); snprintf(evstr, 64, "//%i", sb->serial); @@ -1130,9 +1119,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, Stream *stream, const char *tmpdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, - const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, const char *asapo_group_id, - const char *asapo_source, const char *asapo_stream, + struct im_asapo_params *asapo_params, int timeout, int profile) { int i; @@ -1173,27 +1160,18 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->zmq = 0; } - if ( asapo_endpoint != NULL ) { - sb->asapo = 1; - sb->asapo_endpoint = asapo_endpoint; - sb->asapo_token = asapo_token; - sb->asapo_beamtime = asapo_beamtime; - sb->asapo_source = asapo_source; - sb->asapo_stream = asapo_stream; + if ( asapo_params->endpoint != NULL ) { + sb->asapo_params = asapo_params; } else { - sb->asapo = 0; + sb->asapo_params = NULL; } - if ( sb->zmq && sb->asapo ) { + if ( sb->zmq && sb->asapo_params ) { ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); free(sb); return 0; } - if ( sb->asapo ) { - sb->asapo_group_id = strdup(asapo_group_id); - } - sb->fds = NULL; sb->fhs = NULL; sb->stream = stream; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index e1d2e1b9..78f542b9 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -42,6 +42,7 @@ struct sb_shm; #include "stream.h" #include "cell.h" #include "process_image.h" +#include "im-asapo.h" /* Length of event queue */ #define QUEUE_SIZE (256) @@ -87,9 +88,7 @@ extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, const char *tempdir, int serial_start, const char *zmq_address, char **zmq_subscriptions, int n_zmq_subscriptions, const char *zmq_request, - const char *asapo_endpoint, const char *asapo_token, - const char *asapo_beamtime, const char *asapo_group_id, - const char *asapo_source, const char *asapo_stream, + struct im_asapo_params *asapo_params, int timeout, int profile); #endif /* IM_SANDBOX_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index b31d2e10..020612a8 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -63,6 +63,7 @@ #include #include "im-sandbox.h" +#include "im-asapo.h" #include "version.h" #include "json-utils.h" @@ -84,12 +85,7 @@ struct indexamajig_arguments char *zmq_request; char *zmq_subscriptions[256]; int n_zmq_subscriptions; - char *asapo_endpoint; - char *asapo_token; - char *asapo_beamtime; - char *asapo_group_id; - char *asapo_source; - char *asapo_stream; + struct im_asapo_params asapo_params; int serial_start; char *temp_location; int if_refine; @@ -410,23 +406,23 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 213 : - args->asapo_endpoint = strdup(arg); + args->asapo_params.endpoint = strdup(arg); break; case 214 : - args->asapo_token = strdup(arg); + args->asapo_params.token = strdup(arg); break; case 215 : - args->asapo_beamtime = strdup(arg); + args->asapo_params.beamtime = strdup(arg); break; case 217 : - args->asapo_group_id = strdup(arg); + args->asapo_params.group_id = strdup(arg); break; case 218 : - args->asapo_source = strdup(arg); + args->asapo_params.source = strdup(arg); break; case 219 : @@ -438,7 +434,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 220 : - args->asapo_stream = strdup(arg); + args->asapo_params.stream = strdup(arg); break; /* ---------- Peak search ---------- */ @@ -857,12 +853,12 @@ int main(int argc, char *argv[]) args.basename = 0; args.zmq_addr = NULL; args.zmq_request = NULL; - args.asapo_endpoint = NULL; - args.asapo_token = NULL; - args.asapo_beamtime = NULL; - args.asapo_group_id = NULL; - args.asapo_source = NULL; - args.asapo_stream = NULL; + args.asapo_params.endpoint = NULL; + args.asapo_params.token = NULL; + args.asapo_params.beamtime = NULL; + args.asapo_params.group_id = NULL; + args.asapo_params.source = NULL; + args.asapo_params.stream = NULL; args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; @@ -1096,7 +1092,7 @@ int main(int argc, char *argv[]) /* Check for minimal information */ if ( (args.filename == NULL) && (args.zmq_addr == NULL) - && (args.asapo_endpoint == NULL) ) { + && (args.asapo_params.endpoint == NULL) ) { ERROR("You need to provide the input filename (use -i)\n"); return 1; } @@ -1115,13 +1111,13 @@ int main(int argc, char *argv[]) return 1; } - if ( (args.filename != NULL) && (args.asapo_endpoint != NULL) ) { + if ( (args.filename != NULL) && (args.asapo_params.endpoint != NULL) ) { ERROR("The options --input and --asapo-endpoint are mutually " "exclusive.\n"); return 1; } - if ( (args.asapo_endpoint != NULL) && (args.zmq_addr != NULL) ) { + if ( (args.asapo_params.endpoint != NULL) && (args.zmq_addr != NULL) ) { ERROR("The options --asapo-endpoint and --zmq-input are mutually " "exclusive.\n"); return 1; @@ -1351,10 +1347,7 @@ int main(int argc, char *argv[]) fh, st, tmpdir, args.serial_start, args.zmq_addr, args.zmq_subscriptions, args.n_zmq_subscriptions, args.zmq_request, - args.asapo_endpoint, args.asapo_token, - args.asapo_beamtime, args.asapo_group_id, - args.asapo_source, args.asapo_stream, - timeout, args.profile); + &args.asapo_params, timeout, args.profile); cell_free(args.iargs.cell); free(args.prefix); -- cgit v1.2.3