diff options
author | Thomas White <taw@physics.org> | 2021-04-15 15:30:25 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2021-04-15 16:43:40 +0200 |
commit | afcb7b568947c20fb3477a178be5aefe3203b603 (patch) | |
tree | 25d3d8a475bd4353083ce00f893e766525e1e625 /src/im-zmq.c | |
parent | 0dcd6c7e2fbfe78e4d2f26e01de0d4ea032d8fd6 (diff) |
Separate ZMQ from MessagePack, switch to pub/sub socket
Indexamajig uses only ZMQ, to receive streaming data, while libcrystfel
uses only msgpack to implement another type of image format.
The two of these are eventually tied together in process_image, which
does this: if ( have_zmq_data ) interpret_zmq_data_as_msgpack;
- however, they would be easy to split up if we wanted to do something
else (CBF data over ZMQ, anyone?).
This commit also switches the ZMQ connector to use a pub/sub socket
instead of a request/reply one. This matches changes in OnDA.
At the moment, the MessagePack image reader simply dumps the object to
disk.
Diffstat (limited to 'src/im-zmq.c')
-rw-r--r-- | src/im-zmq.c | 56 |
1 files changed, 18 insertions, 38 deletions
diff --git a/src/im-zmq.c b/src/im-zmq.c index dea8515b..5c9e90bc 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -7,7 +7,7 @@ * a research centre of the Helmholtz Association. * * Authors: - * 2018-2020 Thomas White <taw@physics.org> + * 2018-2021 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2017 Stijn de Graaf * @@ -38,7 +38,6 @@ #include <assert.h> #include <unistd.h> #include <zmq.h> -#include <msgpack.h> #include <image.h> #include <utils.h> @@ -53,8 +52,6 @@ struct im_zmq void *ctx; void *socket; zmq_msg_t msg; - msgpack_unpacked unpacked; - int unpacked_set; }; @@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) z = malloc(sizeof(struct im_zmq)); if ( z == NULL ) return NULL; - z->unpacked_set = 0; - z->ctx = zmq_ctx_new(); if ( z->ctx == NULL ) return NULL; - z->socket = zmq_socket(z->ctx, ZMQ_REQ); + z->socket = zmq_socket(z->ctx, ZMQ_SUB); if ( z->socket == NULL ) return NULL; STATUS("Connecting to ZMQ at '%s'\n", zmq_address); @@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - STATUS("ZMQ connected.\n"); + STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n"); + + if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) { + ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno)); + return NULL; + } return z; } -msgpack_object *im_zmq_fetch(struct im_zmq *z) +void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size) { int msg_size; - int r; - - if ( zmq_send(z->socket, "m", 1, 0) == -1 ) { - ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno)); - return NULL; - } + void *data_copy; zmq_msg_init(&z->msg); + STATUS("requesting data...\n"); msg_size = zmq_msg_recv(&z->msg, z->socket, 0); + STATUS("done (got %i bytes)\n", msg_size); if ( msg_size == -1 ) { ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno)); zmq_msg_close(&z->msg); return NULL; } - msgpack_unpacked_init(&z->unpacked); - r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg), - msg_size, NULL); - if ( r != MSGPACK_UNPACK_SUCCESS ) { - ERROR("Msgpack unpack failed: %i\n", r); - zmq_msg_close(&z->msg); - return NULL; - } - z->unpacked_set = 1; + data_copy = malloc(msg_size); + if ( data_copy == NULL ) return NULL; + memcpy(data_copy, zmq_msg_data(&z->msg), msg_size); - return &z->unpacked.data; -} - - -/* Clean structures ready for next frame */ -void im_zmq_clean(struct im_zmq *z) -{ - if ( z->unpacked_set ) { - msgpack_unpacked_destroy(&z->unpacked); - zmq_msg_close(&z->msg); - z->unpacked_set = 0; - } + zmq_msg_close(&z->msg); + *pdata_size = msg_size; + return data_copy; } void im_zmq_shutdown(struct im_zmq *z) { if ( z == NULL ) return; - - zmq_msg_close(&z->msg); zmq_close(z->socket); zmq_ctx_destroy(z->ctx); } |