From ce270ad7d8136aac47a802a9a72c011344f90527 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Wed, 5 May 2021 17:07:13 +0200 Subject: indexamajig: Add --zmq-request This (re-)adds the ability to get data via a request/reply socket. See afcb7b568947c for when it was removed. --- src/im-zmq.c | 63 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 16 deletions(-) (limited to 'src/im-zmq.c') diff --git a/src/im-zmq.c b/src/im-zmq.c index be3bc544..c2d386cf 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -52,12 +52,14 @@ struct im_zmq void *ctx; void *socket; zmq_msg_t msg; + const char *request_str; }; struct im_zmq *im_zmq_connect(const char *zmq_address, char **subscriptions, - int n_subscriptions) + int n_subscriptions, + const char *zmq_request) { struct im_zmq *z; int i; @@ -68,30 +70,46 @@ struct im_zmq *im_zmq_connect(const char *zmq_address, z->ctx = zmq_ctx_new(); if ( z->ctx == NULL ) return NULL; - z->socket = zmq_socket(z->ctx, ZMQ_SUB); + if ( zmq_request == NULL ) { + STATUS("Connecting ZMQ subscriber to '%s'\n", zmq_address); + z->socket = zmq_socket(z->ctx, ZMQ_SUB); + } else { + STATUS("Connecting ZMQ requester to '%s'\n", zmq_address); + z->socket = zmq_socket(z->ctx, ZMQ_REQ); + } if ( z->socket == NULL ) return NULL; - STATUS("Connecting to ZMQ at '%s'\n", zmq_address); if ( zmq_connect(z->socket, zmq_address) == -1 ) { ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - if ( n_subscriptions == 0 ) { - ERROR("WARNING: No ZeroMQ subscriptions. You should probably " - "try again with --zmq-subscribe.\n"); - } + if ( zmq_request == NULL ) { - for ( i=0; isocket, ZMQ_SUBSCRIBE, - subscriptions[i], - strlen(subscriptions[i])) ) - { - ERROR("ZMQ subscription failed: %s\n", - zmq_strerror(errno)); - return NULL; + /* SUB mode */ + if ( n_subscriptions == 0 ) { + ERROR("WARNING: No ZeroMQ subscriptions. You should " + "probably try again with --zmq-subscribe.\n"); } + for ( i=0; isocket, ZMQ_SUBSCRIBE, + subscriptions[i], + strlen(subscriptions[i])) ) + { + ERROR("ZMQ subscription failed: %s\n", + zmq_strerror(errno)); + return NULL; + } + } + + z->request_str = NULL; + + } else { + + /* REQ mode */ + z->request_str = zmq_request; + } return z; @@ -103,6 +121,19 @@ void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size) int msg_size; void *data_copy; + if ( z->request_str != NULL ) { + + /* Send the request */ + if ( zmq_send(z->socket, z->request_str, + strlen(z->request_str), 0) == -1 ) + { + ERROR("ZMQ message send failed: %s\n", + zmq_strerror(errno)); + return NULL; + } + } + + /* Receive message */ zmq_msg_init(&z->msg); msg_size = zmq_msg_recv(&z->msg, z->socket, 0); if ( msg_size == -1 ) { -- cgit v1.2.3