aboutsummaryrefslogtreecommitdiff
path: root/src/im-zmq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/im-zmq.c')
-rw-r--r--src/im-zmq.c56
1 files changed, 18 insertions, 38 deletions
diff --git a/src/im-zmq.c b/src/im-zmq.c
index dea8515b..5c9e90bc 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -7,7 +7,7 @@
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2018-2020 Thomas White <taw@physics.org>
+ * 2018-2021 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
* 2017 Stijn de Graaf
*
@@ -38,7 +38,6 @@
#include <assert.h>
#include <unistd.h>
#include <zmq.h>
-#include <msgpack.h>
#include <image.h>
#include <utils.h>
@@ -53,8 +52,6 @@ struct im_zmq
void *ctx;
void *socket;
zmq_msg_t msg;
- msgpack_unpacked unpacked;
- int unpacked_set;
};
@@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
z = malloc(sizeof(struct im_zmq));
if ( z == NULL ) return NULL;
- z->unpacked_set = 0;
-
z->ctx = zmq_ctx_new();
if ( z->ctx == NULL ) return NULL;
- z->socket = zmq_socket(z->ctx, ZMQ_REQ);
+ z->socket = zmq_socket(z->ctx, ZMQ_SUB);
if ( z->socket == NULL ) return NULL;
STATUS("Connecting to ZMQ at '%s'\n", zmq_address);
@@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address)
ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno));
return NULL;
}
- STATUS("ZMQ connected.\n");
+ STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n");
+
+ if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) {
+ ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno));
+ return NULL;
+ }
return z;
}
-msgpack_object *im_zmq_fetch(struct im_zmq *z)
+void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size)
{
int msg_size;
- int r;
-
- if ( zmq_send(z->socket, "m", 1, 0) == -1 ) {
- ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno));
- return NULL;
- }
+ void *data_copy;
zmq_msg_init(&z->msg);
+ STATUS("requesting data...\n");
msg_size = zmq_msg_recv(&z->msg, z->socket, 0);
+ STATUS("done (got %i bytes)\n", msg_size);
if ( msg_size == -1 ) {
ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno));
zmq_msg_close(&z->msg);
return NULL;
}
- msgpack_unpacked_init(&z->unpacked);
- r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg),
- msg_size, NULL);
- if ( r != MSGPACK_UNPACK_SUCCESS ) {
- ERROR("Msgpack unpack failed: %i\n", r);
- zmq_msg_close(&z->msg);
- return NULL;
- }
- z->unpacked_set = 1;
+ data_copy = malloc(msg_size);
+ if ( data_copy == NULL ) return NULL;
+ memcpy(data_copy, zmq_msg_data(&z->msg), msg_size);
- return &z->unpacked.data;
-}
-
-
-/* Clean structures ready for next frame */
-void im_zmq_clean(struct im_zmq *z)
-{
- if ( z->unpacked_set ) {
- msgpack_unpacked_destroy(&z->unpacked);
- zmq_msg_close(&z->msg);
- z->unpacked_set = 0;
- }
+ zmq_msg_close(&z->msg);
+ *pdata_size = msg_size;
+ return data_copy;
}
void im_zmq_shutdown(struct im_zmq *z)
{
if ( z == NULL ) return;
-
- zmq_msg_close(&z->msg);
zmq_close(z->socket);
zmq_ctx_destroy(z->ctx);
}