aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-04-22 15:53:56 +0200
committerThomas White <taw@physics.org>2021-04-23 14:57:24 +0200
commit2f4f6ad97467a62ed8bf5cb44040548b89493c0b (patch)
tree7e3e3ccd0c2087f75486844bdb179945affa703a /src
parent518a25b79026a17604c0430e1f0ed14954fe713a (diff)
indexamajig: Add --zmq-subscribe
Diffstat (limited to 'src')
-rw-r--r--src/im-sandbox.c11
-rw-r--r--src/im-sandbox.h3
-rw-r--r--src/im-zmq.c24
-rw-r--r--src/im-zmq.h8
-rw-r--r--src/indexamajig.c13
5 files changed, 49 insertions, 10 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 22e2c05f..3f689a0d 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -100,6 +100,8 @@ struct sandbox
/* ZMQ mode */
int zmq;
const char *zmq_address;
+ char **zmq_subscriptions;
+ int n_zmq_subscriptions;
/* Final output */
Stream *stream;
@@ -331,7 +333,9 @@ static int run_work(const struct index_args *iargs, Stream *st,
/* Connect via ZMQ */
if ( sb->zmq ) {
- zmqstuff = im_zmq_connect(sb->zmq_address);
+ zmqstuff = im_zmq_connect(sb->zmq_address,
+ sb->zmq_subscriptions,
+ sb->n_zmq_subscriptions);
if ( zmqstuff == NULL ) {
ERROR("ZMQ setup failed.\n");
return 1;
@@ -1013,7 +1017,8 @@ char *create_tempdir(const char *temp_location)
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tmpdir, int serial_start,
- const char *zmq_address, int timeout, int profile)
+ const char *zmq_address, char **zmq_subscriptions,
+ int n_zmq_subscriptions, int timeout, int profile)
{
int i;
struct sandbox *sb;
@@ -1047,6 +1052,8 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( zmq_address != NULL ) {
sb->zmq = 1;
sb->zmq_address = zmq_address;
+ sb->zmq_subscriptions = zmq_subscriptions;
+ sb->n_zmq_subscriptions = n_zmq_subscriptions;
} else {
sb->zmq = 0;
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 2e006be4..47d23a18 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -85,6 +85,7 @@ extern void set_last_task(char *lt, const char *task);
extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh, Stream *stream,
const char *tempdir, int serial_start,
- const char *zmq_address, int timeout, int profile);
+ const char *zmq_address, char **zmq_subscriptions,
+ int n_zmq_subscriptions, int timeout, int profile);
#endif /* IM_SANDBOX_H */
diff --git a/src/im-zmq.c b/src/im-zmq.c
index 2f489811..be3bc544 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -55,9 +55,12 @@ struct im_zmq
};
-struct im_zmq *im_zmq_connect(const char *zmq_address)
+struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char **subscriptions,
+ int n_subscriptions)
{
struct im_zmq *z;
+ int i;
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
@@ -73,11 +76,22 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
- if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
- ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
- return NULL;
+ if ( n_subscriptions == 0 ) {
+ ERROR("WARNING: No ZeroMQ subscriptions. You should probably "
+ "try again with --zmq-subscribe.\n");
+ }
+
+ for ( i=0; i<n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
+ subscriptions[i],
+ strlen(subscriptions[i])) )
+ {
+ ERROR("ZMQ subscription failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
}
return z;
diff --git a/src/im-zmq.h b/src/im-zmq.h
index c6bad6cc..9cc36b48 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -38,13 +38,17 @@
#if defined(HAVE_ZMQ)
-extern struct im_zmq *im_zmq_connect(const char *zmq_address);
+extern struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char **subscriptions,
+ int n_subscriptions);
extern void im_zmq_shutdown(struct im_zmq *z);
extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size);
#else /* defined(HAVE_ZMQ) */
-static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; }
+static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address,
+ char *zmq_subscriptions,
+ int n_subscriptions) { return NULL; }
static UNUSED void im_zmq_shutdown(struct im_zmq *z) { }
static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; }
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 27071256..524022b2 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -81,6 +81,8 @@ struct indexamajig_arguments
char *indm_str;
int basename;
char *zmq_addr;
+ char *zmq_subscriptions[256];
+ int n_zmq_subscriptions;
int serial_start;
char *temp_location;
int if_refine;
@@ -361,6 +363,13 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
args->iargs.no_mask_data = 1;
break;
+ case 211 :
+ if ( args->n_zmq_subscriptions == 256 ) {
+ ERROR("Too many ZMQ subscriptions.\n");
+ return 1;
+ }
+ args->zmq_subscriptions[args->n_zmq_subscriptions++] = strdup(arg);
+
/* ---------- Peak search ---------- */
case 't' :
@@ -776,6 +785,7 @@ int main(int argc, char *argv[])
args.indm_str = NULL;
args.basename = 0;
args.zmq_addr = NULL;
+ args.n_zmq_subscriptions = 0;
args.serial_start = 1;
args.if_peaks = 1;
args.if_multi = 0;
@@ -879,6 +889,8 @@ int main(int argc, char *argv[])
{"spectrum-file", 209, "fn", OPTION_NO_USAGE | OPTION_HIDDEN,
"File containing radiation spectrum"},
{"no-mask-data", 210, NULL, OPTION_NO_USAGE, "Do not load mask data"},
+ {"zmq-subscribe", 211, "tag", OPTION_NO_USAGE, "Subscribe to ZMQ message"
+ "type"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
@@ -1229,6 +1241,7 @@ int main(int argc, char *argv[])
r = create_sandbox(&args.iargs, args.n_proc, args.prefix, args.basename,
fh, st, tmpdir, args.serial_start, args.zmq_addr,
+ args.zmq_subscriptions, args.n_zmq_subscriptions,
timeout, args.profile);
cell_free(args.iargs.cell);