diff options
Diffstat (limited to 'src/im-zmq.c')
-rw-r--r-- | src/im-zmq.c | 56 |
1 files changed, 18 insertions, 38 deletions
diff --git a/src/im-zmq.c b/src/im-zmq.c index dea8515b..5c9e90bc 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -7,7 +7,7 @@ * a research centre of the Helmholtz Association. * * Authors: - * 2018-2020 Thomas White <taw@physics.org> + * 2018-2021 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2017 Stijn de Graaf * @@ -38,7 +38,6 @@ #include <assert.h> #include <unistd.h> #include <zmq.h> -#include <msgpack.h> #include <image.h> #include <utils.h> @@ -53,8 +52,6 @@ struct im_zmq void *ctx; void *socket; zmq_msg_t msg; - msgpack_unpacked unpacked; - int unpacked_set; }; @@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) z = malloc(sizeof(struct im_zmq)); if ( z == NULL ) return NULL; - z->unpacked_set = 0; - z->ctx = zmq_ctx_new(); if ( z->ctx == NULL ) return NULL; - z->socket = zmq_socket(z->ctx, ZMQ_REQ); + z->socket = zmq_socket(z->ctx, ZMQ_SUB); if ( z->socket == NULL ) return NULL; STATUS("Connecting to ZMQ at '%s'\n", zmq_address); @@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - STATUS("ZMQ connected.\n"); + STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n"); + + if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) { + ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno)); + return NULL; + } return z; } -msgpack_object *im_zmq_fetch(struct im_zmq *z) +void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size) { int msg_size; - int r; - - if ( zmq_send(z->socket, "m", 1, 0) == -1 ) { - ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno)); - return NULL; - } + void *data_copy; zmq_msg_init(&z->msg); + STATUS("requesting data...\n"); msg_size = zmq_msg_recv(&z->msg, z->socket, 0); + STATUS("done (got %i bytes)\n", msg_size); if ( msg_size == -1 ) { ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno)); zmq_msg_close(&z->msg); return NULL; } - msgpack_unpacked_init(&z->unpacked); - r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg), - msg_size, NULL); - if ( r != MSGPACK_UNPACK_SUCCESS ) { - ERROR("Msgpack unpack failed: %i\n", r); - zmq_msg_close(&z->msg); - return NULL; - } - z->unpacked_set = 1; + data_copy = malloc(msg_size); + if ( data_copy == NULL ) return NULL; + memcpy(data_copy, zmq_msg_data(&z->msg), msg_size); - return &z->unpacked.data; -} - - -/* Clean structures ready for next frame */ -void im_zmq_clean(struct im_zmq *z) -{ - if ( z->unpacked_set ) { - msgpack_unpacked_destroy(&z->unpacked); - zmq_msg_close(&z->msg); - z->unpacked_set = 0; - } + zmq_msg_close(&z->msg); + *pdata_size = msg_size; + return data_copy; } void im_zmq_shutdown(struct im_zmq *z) { if ( z == NULL ) return; - - zmq_msg_close(&z->msg); zmq_close(z->socket); zmq_ctx_destroy(z->ctx); } |