aboutsummaryrefslogtreecommitdiff
path: root/src/im-zmq.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-05-05 17:07:13 +0200
committerThomas White <taw@physics.org>2021-05-05 17:14:04 +0200
commitce270ad7d8136aac47a802a9a72c011344f90527 (patch)
tree1df70574efad2ceb31edb43582ba7ff2c8a2c8ea /src/im-zmq.c
parente11394ce9133333af01afd88a0f484d6ea70665d (diff)
indexamajig: Add --zmq-request
This (re-)adds the ability to get data via a request/reply socket. See afcb7b568947c for when it was removed.
Diffstat (limited to 'src/im-zmq.c')
-rw-r--r--src/im-zmq.c63
1 files changed, 47 insertions, 16 deletions
diff --git a/src/im-zmq.c b/src/im-zmq.c
index be3bc544..c2d386cf 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -52,12 +52,14 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
+ const char *request_str;
};
struct im_zmq *im_zmq_connect(const char *zmq_address,
char **subscriptions,
- int n_subscriptions)
+ int n_subscriptions,
+ const char *zmq_request)
{
struct im_zmq *z;
int i;
@@ -68,30 +70,46 @@ struct im_zmq *im_zmq_connect(const char *zmq_address,
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ if ( zmq_request == NULL ) {
+ STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
+ } else {
+ STATUS("Connecting ZMQ requester to '%s'\n", zmq_address);
+ z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ }
if ( z->socket == NULL ) return NULL;
- STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
if ( zmq_connect(z->socket, zmq_address) == -1 ) {
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- if ( n_subscriptions == 0 ) {
- ERROR("WARNING: No ZeroMQ subscriptions. You should probably "
- "try again with --zmq-subscribe.\n");
- }
+ if ( zmq_request == NULL ) {
- for ( i=0; i<n_subscriptions; i++ ) {
- STATUS("Subscribing to '%s'\n", subscriptions[i]);
- if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
- subscriptions[i],
- strlen(subscriptions[i])) )
- {
- ERROR("ZMQ subscription failed: %s\n",
- zmq_strerror(errno));
- return NULL;
+ /* SUB mode */
+ if ( n_subscriptions == 0 ) {
+ ERROR("WARNING: No ZeroMQ subscriptions. You should "
+ "probably try again with --zmq-subscribe.\n");
}
+ for ( i=0; i<n_subscriptions; i++ ) {
+ STATUS("Subscribing to '%s'\n", subscriptions[i]);
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE,
+ subscriptions[i],
+ strlen(subscriptions[i])) )
+ {
+ ERROR("ZMQ subscription failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ z->request_str = NULL;
+
+ } else {
+
+ /* REQ mode */
+ z->request_str = zmq_request;
+
}
return z;
@@ -103,6 +121,19 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
int msg_size;
void *data_copy;
+ if ( z->request_str != NULL ) {
+
+ /* Send the request */
+ if ( zmq_send(z->socket, z->request_str,
+ strlen(z->request_str), 0) == -1 )
+ {
+ ERROR("ZMQ message send failed: %s\n",
+ zmq_strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Receive message */
zmq_msg_init(&z->msg);
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
if ( msg_size == -1 ) {