From 097e9601f8d869a6f8734bbc7aaa24a22088b909 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 22 Jun 2022 15:27:11 +0200 Subject: indexamajig: Wrap ZMQ parameters into separate structure --- src/im-sandbox.c | 35 ++++++++++++----------------------- src/im-sandbox.h | 4 ++-- src/im-zmq.c | 27 ++++++++++++--------------- src/im-zmq.h | 18 ++++++++++-------- src/indexamajig.c | 33 +++++++++++++++------------------ 5 files changed, 51 insertions(+), 66 deletions(-) diff --git a/src/im-sandbox.c b/src/im-sandbox.c index de2e08b6..1e4a95a6 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -98,12 +98,8 @@ struct sandbox const char *tmpdir; - /* ZMQ mode */ - int zmq; - const char *zmq_address; - char **zmq_subscriptions; - int n_zmq_subscriptions; - const char *zmq_request; + /* If non-NULL, we are using ZMQ */ + struct im_zmq_params *zmq_params; /* If non-NULL, we are using ASAP::O */ struct im_asapo_params *asapo_params; @@ -341,11 +337,8 @@ static int run_work(const struct index_args *iargs, Stream *st, } /* Connect via ZMQ */ - if ( sb->zmq ) { - zmqstuff = im_zmq_connect(sb->zmq_address, - sb->zmq_subscriptions, - sb->n_zmq_subscriptions, - sb->zmq_request); + if ( sb->zmq_params != NULL ) { + zmqstuff = im_zmq_connect(sb->zmq_params); if ( zmqstuff == NULL ) { ERROR("ZMQ setup failed.\n"); return 1; @@ -455,7 +448,7 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.asapo_data_size = 0; pargs.asapo_meta = NULL; - if ( sb->zmq ) { + if ( sb->zmq_params != NULL ) { do { pargs.zmq_data = im_zmq_fetch(zmqstuff, @@ -898,7 +891,7 @@ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) char *filename; char *evstr; - if ( sb->zmq ) { + if ( sb->zmq_params != NULL ) { /* These are just semi-meaningful placeholder values to * be put into the queue, instead of "(null)". * A unique filename is needed so that the GUI can @@ -1117,8 +1110,7 @@ char *create_tempdir(const char *temp_location) int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tmpdir, int serial_start, - const char *zmq_address, char **zmq_subscriptions, - int n_zmq_subscriptions, const char *zmq_request, + struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, int timeout, int profile) { @@ -1150,14 +1142,11 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->tmpdir = tmpdir; sb->profile = profile; sb->timeout = timeout; - if ( zmq_address != NULL ) { - sb->zmq = 1; - sb->zmq_address = zmq_address; - sb->zmq_subscriptions = zmq_subscriptions; - sb->n_zmq_subscriptions = n_zmq_subscriptions; - sb->zmq_request = zmq_request; + + if ( zmq_params->addr != NULL ) { + sb->zmq_params = zmq_params; } else { - sb->zmq = 0; + sb->zmq_params = NULL; } if ( asapo_params->endpoint != NULL ) { @@ -1166,7 +1155,7 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->asapo_params = NULL; } - if ( sb->zmq && sb->asapo_params ) { + if ( sb->zmq_params && sb->asapo_params ) { ERROR("Cannot simultaneously use ZMQ and ASAP::O input.\n"); free(sb); return 0; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index 78f542b9..52094a3d 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -42,6 +42,7 @@ struct sb_shm; #include "stream.h" #include "cell.h" #include "process_image.h" +#include "im-zmq.h" #include "im-asapo.h" /* Length of event queue */ @@ -86,8 +87,7 @@ extern void set_last_task(char *lt, const char *task); extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, const char *tempdir, int serial_start, - const char *zmq_address, char **zmq_subscriptions, - int n_zmq_subscriptions, const char *zmq_request, + struct im_zmq_params *zmq_params, struct im_asapo_params *asapo_params, int timeout, int profile); diff --git a/src/im-zmq.c b/src/im-zmq.c index 3fd04428..b326e868 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -56,10 +56,7 @@ struct im_zmq }; -struct im_zmq *im_zmq_connect(const char *zmq_address, - char **subscriptions, - int n_subscriptions, - const char *zmq_request) +struct im_zmq *im_zmq_connect(struct im_zmq_params *params) { struct im_zmq *z; @@ -72,11 +69,11 @@ struct im_zmq *im_zmq_connect(const char *zmq_address, return NULL; } - if ( zmq_request == NULL ) { - STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address); + if ( params->request == NULL ) { + STATUS("Connecting ZMQ subscriber to '%s'\n", params->addr); z->socket = zmq_socket(z->ctx, ZMQ_SUB); } else { - STATUS("Connecting ZMQ requester to '%s'\n", zmq_address); + STATUS("Connecting ZMQ requester to '%s'\n", params->addr); z->socket = zmq_socket(z->ctx, ZMQ_REQ); } if ( z->socket == NULL ) { @@ -84,26 +81,26 @@ struct im_zmq *im_zmq_connect(const char *zmq_address, return NULL; } - if ( zmq_connect(z->socket, zmq_address) == -1 ) { + if ( zmq_connect(z->socket, params->addr) == -1 ) { ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); free(z); return NULL; } - if ( zmq_request == NULL ) { + if ( params->request == NULL ) { int i; /* SUB mode */ - if ( n_subscriptions == 0 ) { + if ( params->n_subscriptions == 0 ) { ERROR("WARNING: No ZeroMQ subscriptions. You should " "probably try again with --zmq-subscribe.\n"); } - for ( i=0; in_subscriptions; i++ ) { + STATUS("Subscribing to '%s'\n", params->subscriptions[i]); if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, - subscriptions[i], - strlen(subscriptions[i])) ) + params->subscriptions[i], + strlen(params->subscriptions[i])) ) { ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno)); @@ -116,7 +113,7 @@ struct im_zmq *im_zmq_connect(const char *zmq_address, } else { /* REQ mode */ - z->request_str = zmq_request; + z->request_str = params->request; } diff --git a/src/im-zmq.h b/src/im-zmq.h index 88c0a568..cfb98ff4 100644 --- a/src/im-zmq.h +++ b/src/im-zmq.h @@ -36,21 +36,23 @@ #include #endif +struct im_zmq_params +{ + char *addr; + char *request; + char *subscriptions[256]; + int n_subscriptions; +}; + #if defined(HAVE_ZMQ) -extern struct im_zmq *im_zmq_connect(const char *zmq_address, - char **subscriptions, - int n_subscriptions, - const char *zmq_request); +extern struct im_zmq *im_zmq_connect(struct im_zmq_params *params); extern void im_zmq_shutdown(struct im_zmq *z); extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size); #else /* defined(HAVE_ZMQ) */ -static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address, - char **zmq_subscriptions, - int n_subscriptions, - const char *zmq_request) { return NULL; } +static UNUSED struct im_zmq *im_zmq_connect(struct im_zmq_params *params) { return NULL; } static UNUSED void im_zmq_shutdown(struct im_zmq *z) { } static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; } diff --git a/src/indexamajig.c b/src/indexamajig.c index 020612a8..af1a5704 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -63,6 +63,7 @@ #include #include "im-sandbox.h" +#include "im-zmq.h" #include "im-asapo.h" #include "version.h" #include "json-utils.h" @@ -81,10 +82,7 @@ struct indexamajig_arguments char *cellfile; char *indm_str; int basename; - char *zmq_addr; - char *zmq_request; - char *zmq_subscriptions[256]; - int n_zmq_subscriptions; + struct im_zmq_params zmq_params; struct im_asapo_params asapo_params; int serial_start; char *temp_location; @@ -378,7 +376,7 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 207 : - args->zmq_addr = strdup(arg); + args->zmq_params.addr = strdup(arg); break; case 208 : @@ -394,15 +392,15 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 211 : - if ( args->n_zmq_subscriptions == 256 ) { + if ( args->zmq_params.n_subscriptions == 256 ) { ERROR("Too many ZMQ subscriptions.\n"); return 1; } - args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg); + args->zmq_params.subscriptions[args->zmq_params.n_subscriptions++] = strdup(arg); break; case 212 : - args->zmq_request = strdup(arg); + args->zmq_params.request = strdup(arg); break; case 213 : @@ -851,15 +849,15 @@ int main(int argc, char *argv[]) args.cellfile = NULL; args.indm_str = NULL; args.basename = 0; - args.zmq_addr = NULL; - args.zmq_request = NULL; + args.zmq_params.addr = NULL; + args.zmq_params.request = NULL; + args.zmq_params.n_subscriptions = 0; args.asapo_params.endpoint = NULL; args.asapo_params.token = NULL; args.asapo_params.beamtime = NULL; args.asapo_params.group_id = NULL; args.asapo_params.source = NULL; args.asapo_params.stream = NULL; - args.n_zmq_subscriptions = 0; args.serial_start = 1; args.if_peaks = 1; args.if_multi = 0; @@ -1091,7 +1089,7 @@ int main(int argc, char *argv[]) /* Check for minimal information */ if ( (args.filename == NULL) - && (args.zmq_addr == NULL) + && (args.zmq_params.addr == NULL) && (args.asapo_params.endpoint == NULL) ) { ERROR("You need to provide the input filename (use -i)\n"); return 1; @@ -1105,7 +1103,7 @@ int main(int argc, char *argv[]) return 1; } - if ( (args.filename != NULL) && (args.zmq_addr != NULL) ) { + if ( (args.filename != NULL) && (args.zmq_params.addr != NULL) ) { ERROR("The options --input and --zmq-input are mutually " "exclusive.\n"); return 1; @@ -1117,13 +1115,13 @@ int main(int argc, char *argv[]) return 1; } - if ( (args.asapo_params.endpoint != NULL) && (args.zmq_addr != NULL) ) { + if ( (args.asapo_params.endpoint != NULL) && (args.zmq_params.addr != NULL) ) { ERROR("The options --asapo-endpoint and --zmq-input are mutually " "exclusive.\n"); return 1; } - if ( (args.zmq_request != NULL) && (args.n_zmq_subscriptions > 0) ) { + if ( (args.zmq_params.request != NULL) && (args.zmq_params.n_subscriptions > 0) ) { ERROR("The options --zmq-request and --zmq-subscribe are " "mutually exclusive.\n"); return 1; @@ -1345,9 +1343,8 @@ int main(int argc, char *argv[]) r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename, fh, st, tmpdir, args.serial_start, - args.zmq_addr, args.zmq_subscriptions, - args.n_zmq_subscriptions, args.zmq_request, - &args.asapo_params, timeout, args.profile); + &args.zmq_params, &args.asapo_params, + timeout, args.profile); cell_free(args.iargs.cell); free(args.prefix); -- cgit v1.2.3