diff options
-rw-r--r-- | CMakeLists.txt | 16 | ||||
-rw-r--r-- | libcrystfel/CMakeLists.txt | 5 | ||||
-rw-r--r-- | libcrystfel/src/image-msgpack.c | 91 | ||||
-rw-r--r-- | libcrystfel/src/image-msgpack.h | 20 | ||||
-rw-r--r-- | src/im-sandbox.c | 13 | ||||
-rw-r--r-- | src/im-zmq.c | 56 | ||||
-rw-r--r-- | src/im-zmq.h | 24 | ||||
-rw-r--r-- | src/process_image.c | 8 | ||||
-rw-r--r-- | src/process_image.h | 7 |
9 files changed, 102 insertions, 138 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 6cfaab14..62637a78 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,13 +31,6 @@ link_directories (${GLIB_LIBRARY_DIRS}) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall") -pkg_search_module(MSGPACK msgpack) -if (MSGPACK_FOUND) - message(STATUS "Found Messagepack") -else () - message(STATUS "MessagePack not found.") -endif () - pkg_search_module(ZMQ libzmq) if (ZMQ_FOUND) message(STATUS "Found ZMQ") @@ -117,7 +110,6 @@ set(HAVE_GTK ${GTK_FOUND}) set(HAVE_OPENCL ${OpenCL_FOUND}) set(HAVE_GDKPIXBUF ${GDKPIXBUF_FOUND}) set(HAVE_GDK ${GDK_FOUND}) -set(HAVE_MSGPACK ${MSGPACK_FOUND}) set(HAVE_ZMQ ${ZMQ_FOUND}) set(PACKAGE_VERSION ${PROJECT_VERSION}) @@ -255,7 +247,7 @@ list(APPEND CRYSTFEL_EXECUTABLES list_events) set(INDEXAMAJIG_SOURCES src/indexamajig.c src/im-sandbox.c src/process_image.c src/time-accounts.c) -if ( ZMQ_FOUND AND MSGPACK_FOUND ) +if ( ZMQ_FOUND ) list(APPEND INDEXAMAJIG_SOURCES src/im-zmq.c) endif () @@ -265,9 +257,9 @@ target_include_directories(indexamajig PRIVATE ${COMMON_INCLUDES}) target_link_libraries(indexamajig ${COMMON_LIBRARIES}) list(APPEND CRYSTFEL_EXECUTABLES indexamajig) -if ( ZMQ_FOUND AND MSGPACK_FOUND ) - target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR} ${MSGPACK_INCLUDE_DIR}) - target_link_libraries(indexamajig ${ZMQ_LIBRARIES} ${MSGPACK_LIBRARIES}) +if ( ZMQ_FOUND ) + target_include_directories(indexamajig PRIVATE ${ZMQ_INCLUDE_DIR}) + target_link_libraries(indexamajig ${ZMQ_LIBRARIES}) endif () diff --git a/libcrystfel/CMakeLists.txt b/libcrystfel/CMakeLists.txt index fa963c22..7bf301f0 100644 --- a/libcrystfel/CMakeLists.txt +++ b/libcrystfel/CMakeLists.txt @@ -177,6 +177,11 @@ if (LIBCCP4_FOUND) target_link_libraries(${PROJECT_NAME} PRIVATE ${LIBCCP4_LIBRARIES}) endif (LIBCCP4_FOUND) +if (MSGPACK_FOUND) + target_include_directories(${PROJECT_NAME} PRIVATE ${MSGPACK_INCLUDES}) + target_link_libraries(${PROJECT_NAME} PRIVATE ${MSGPACK_LIBRARIES}) +endif (MSGPACK_FOUND) + target_compile_options(${PROJECT_NAME} PRIVATE -Wall) set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "${LIBCRYSTFEL_HEADERS}") diff --git a/libcrystfel/src/image-msgpack.c b/libcrystfel/src/image-msgpack.c index 420ecfb4..2c9947a9 100644 --- a/libcrystfel/src/image-msgpack.c +++ b/libcrystfel/src/image-msgpack.c @@ -7,7 +7,7 @@ * a research centre of the Helmholtz Association. * * Authors: - * 2018-2020 Thomas White <taw@physics.org> + * 2018-2021 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2017 Stijn de Graaf * @@ -210,49 +210,36 @@ static int unpack_slab(struct image *image, static double *find_msgpack_data(msgpack_object *obj, int *width, int *height) { - msgpack_object *corr_data_obj; - msgpack_object *data_obj; - msgpack_object *shape_obj; - double *data; - - corr_data_obj = find_msgpack_kv(obj, "corr_data"); - if ( corr_data_obj == NULL ) { - ERROR("No corr_data MessagePack object found.\n"); - return NULL; + FILE *fh = fopen("msgpack.data", "a"); + fprintf(fh, "object %p:\n", obj); + msgpack_object_print(fh, *obj); + fprintf(fh, "\n\n\n"); + fclose(fh); + + #if 0 + printf("Data type: %i\n", obj->type); + if ( obj->type == MSGPACK_OBJECT_POSITIVE_INTEGER ) { + printf("got an integer: %li\n", obj->via.i64); } - data_obj = find_msgpack_kv(corr_data_obj, "data"); - if ( data_obj == NULL ) { - ERROR("No data MessagePack object found inside corr_data.\n"); - return NULL; - } - if ( data_obj->type != MSGPACK_OBJECT_STR ) { - ERROR("corr_data.data isn't a binary object.\n"); - return NULL; - } - data = (double *)data_obj->via.str.ptr; + if ( obj->type == MSGPACK_OBJECT_ARRAY ) { - shape_obj = find_msgpack_kv(corr_data_obj, "shape"); - if ( shape_obj == NULL ) { - ERROR("No shape MessagePack object found inside corr_data.\n"); - return NULL; - } - if ( shape_obj->type != MSGPACK_OBJECT_ARRAY ) { - ERROR("corr_data.shape isn't an array object.\n"); - return NULL; - } - if ( shape_obj->via.array.size != 2 ) { - ERROR("corr_data.shape is wrong size (%i, should be 2)\n", - shape_obj->via.array.size); - return NULL; - } - if ( shape_obj->via.array.ptr[0].type != MSGPACK_OBJECT_POSITIVE_INTEGER ) { - ERROR("corr_data.shape contains wrong type of element.\n"); - return NULL; + int i; + printf("Array %i items\n", obj->via.array.size); + + for ( i=0; i<obj->via.array.size; i++ ) { + msgpack_object *obj2 = obj->via.array.ptr[i]; + printf("Item %i: type %i\n", i, obj2->type); + if ( obj2->type == MSGPACK_OBJECT_MAP ) { + printf("Map: '%s' -> "); + } + } } - *height = shape_obj->via.array.ptr[0].via.i64; - *width = shape_obj->via.array.ptr[1].via.i64; - return data; + #endif + + *width = 2068; + *height = 2162; + return NULL; } @@ -272,15 +259,18 @@ static double *find_msgpack_data(msgpack_object *obj, int *width, int *height) * } */ struct image *image_msgpack_read(DataTemplate *dtempl, - msgpack_object *obj, + void *data, + size_t data_size, int no_image_data, int no_mask_data) { struct image *image; int data_width, data_height; - double *data; + double *image_data; + msgpack_unpacked unpacked; + int r; - if ( obj == NULL ) { + if ( data == NULL ) { ERROR("No MessagePack object!\n"); return NULL; } @@ -290,6 +280,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl, return NULL; } + msgpack_unpacked_init(&unpacked); + r = msgpack_unpack_next(&unpacked, data, data_size, NULL); + if ( r != MSGPACK_UNPACK_SUCCESS ) { + ERROR("Msgpack unpack failed: %i\n", r); + return NULL; + } + image = image_new(); if ( image == NULL ) { ERROR("Couldn't allocate image structure.\n"); @@ -297,13 +294,13 @@ struct image *image_msgpack_read(DataTemplate *dtempl, } if ( !no_image_data ) { - data = find_msgpack_data(obj, - &data_width, &data_height); - if ( data == NULL ) { + image_data = find_msgpack_data(&unpacked.data, + &data_width, &data_height); + if ( image_data == NULL ) { ERROR("No image data in MessagePack object.\n"); return NULL; } - unpack_slab(image, dtempl, data, + unpack_slab(image, dtempl, image_data, data_width, data_height); } else { image_set_zero_data(image, dtempl); diff --git a/libcrystfel/src/image-msgpack.h b/libcrystfel/src/image-msgpack.h index a8e5af34..06fcffec 100644 --- a/libcrystfel/src/image-msgpack.h +++ b/libcrystfel/src/image-msgpack.h @@ -3,11 +3,11 @@ * * Image loading, MessagePack parts * - * Copyright © 2012-2020 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2021 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2020 Thomas White <taw@physics.org> + * 2020-2021 Thomas White <taw@physics.org> * * This file is part of CrystFEL. * @@ -33,27 +33,29 @@ #if defined(HAVE_MSGPACK) -#include <msgpack.h> - extern struct image *image_msgpack_read(DataTemplate *dtempl, - msgpack_object *obj, + void *data, + size_t data_size, int no_image_data); extern ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl, - msgpack_object *obj, + void *data, + size_t data_size, int half_pixel_shift); #else /* defined(HAVE_MSGPACK) */ static UNUSED struct image *image_msgpack_read(DataTemplate *dtempl, - void *obj, - int no_image_data) + void *data, + size_t data_size, + int no_image_data) { return NULL; } static UNUSED ImageFeatureList *image_msgpack_read_peaks(const DataTemplate *dtempl, - void *obj, + void *data, + size_t data_size, int half_pixel_shift) { return NULL; diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 6615c418..eb0b942f 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -427,14 +427,16 @@ static int run_work(const struct index_args *iargs, Stream *st, free(line); - pargs.msgpack_obj = NULL; + pargs.zmq_data = NULL; + pargs.zmq_data_size = 0; } else { - pargs.msgpack_obj = im_zmq_fetch(zmqstuff); + pargs.zmq_data = im_zmq_fetch(zmqstuff, + &pargs.zmq_data_size); pargs.filename = strdup("(from ZMQ)"); pargs.event = NULL; - ser = 0; /* FIXME */ + ser = 0; /* FIXME: Serial numbers from ZMQ? */ } @@ -442,10 +444,7 @@ static int run_work(const struct index_args *iargs, Stream *st, process_image(iargs, &pargs, st, cookie, tmpdir, ser, sb->shared, taccs, sb->shared->last_task[cookie]); - if ( sb->zmq ) { - im_zmq_clean(zmqstuff); - } - + free(pargs.zmq_data); } im_zmq_shutdown(zmqstuff); diff --git a/src/im-zmq.c b/src/im-zmq.c index dea8515b..5c9e90bc 100644 --- a/src/im-zmq.c +++ b/src/im-zmq.c @@ -7,7 +7,7 @@ * a research centre of the Helmholtz Association. * * Authors: - * 2018-2020 Thomas White <taw@physics.org> + * 2018-2021 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2017 Stijn de Graaf * @@ -38,7 +38,6 @@ #include <assert.h> #include <unistd.h> #include <zmq.h> -#include <msgpack.h> #include <image.h> #include <utils.h> @@ -53,8 +52,6 @@ struct im_zmq void *ctx; void *socket; zmq_msg_t msg; - msgpack_unpacked unpacked; - int unpacked_set; }; @@ -65,12 +62,10 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) z = malloc(sizeof(struct im_zmq)); if ( z == NULL ) return NULL; - z->unpacked_set = 0; - z->ctx = zmq_ctx_new(); if ( z->ctx == NULL ) return NULL; - z->socket = zmq_socket(z->ctx, ZMQ_REQ); + z->socket = zmq_socket(z->ctx, ZMQ_SUB); if ( z->socket == NULL ) return NULL; STATUS("Connecting to ZMQ at '%s'\n", zmq_address); @@ -78,60 +73,45 @@ struct im_zmq *im_zmq_connect(const char *zmq_address) ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); return NULL; } - STATUS("ZMQ connected.\n"); + STATUS("ZMQ connected. Subscribing to 'ondaframedata'\n"); + + if ( zmq_setsockopt(z->socket, ZMQ_SUBSCRIBE, "ondaframedata", 13) ) { + ERROR("ZMQ subscription failed: %s\n", zmq_strerror(errno)); + return NULL; + } return z; } -msgpack_object *im_zmq_fetch(struct im_zmq *z) +void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size) { int msg_size; - int r; - - if ( zmq_send(z->socket, "m", 1, 0) == -1 ) { - ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno)); - return NULL; - } + void *data_copy; zmq_msg_init(&z->msg); + STATUS("requesting data...\n"); msg_size = zmq_msg_recv(&z->msg, z->socket, 0); + STATUS("done (got %i bytes)\n", msg_size); if ( msg_size == -1 ) { ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno)); zmq_msg_close(&z->msg); return NULL; } - msgpack_unpacked_init(&z->unpacked); - r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg), - msg_size, NULL); - if ( r != MSGPACK_UNPACK_SUCCESS ) { - ERROR("Msgpack unpack failed: %i\n", r); - zmq_msg_close(&z->msg); - return NULL; - } - z->unpacked_set = 1; + data_copy = malloc(msg_size); + if ( data_copy == NULL ) return NULL; + memcpy(data_copy, zmq_msg_data(&z->msg), msg_size); - return &z->unpacked.data; -} - - -/* Clean structures ready for next frame */ -void im_zmq_clean(struct im_zmq *z) -{ - if ( z->unpacked_set ) { - msgpack_unpacked_destroy(&z->unpacked); - zmq_msg_close(&z->msg); - z->unpacked_set = 0; - } + zmq_msg_close(&z->msg); + *pdata_size = msg_size; + return data_copy; } void im_zmq_shutdown(struct im_zmq *z) { if ( z == NULL ) return; - - zmq_msg_close(&z->msg); zmq_close(z->socket); zmq_ctx_destroy(z->ctx); } diff --git a/src/im-zmq.h b/src/im-zmq.h index 87128895..c6bad6cc 100644 --- a/src/im-zmq.h +++ b/src/im-zmq.h @@ -7,7 +7,7 @@ * a research centre of the Helmholtz Association. * * Authors: - * 2018-2019 Thomas White <taw@physics.org> + * 2018-2021 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2017 Stijn de Graaf * @@ -36,28 +36,18 @@ #include <config.h> #endif -#if defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) - -#include <msgpack.h> +#if defined(HAVE_ZMQ) extern struct im_zmq *im_zmq_connect(const char *zmq_address); - -extern void im_zmq_clean(struct im_zmq *z); - extern void im_zmq_shutdown(struct im_zmq *z); +extern void *im_zmq_fetch(struct im_zmq *z, size_t *pdata_size); -extern msgpack_object *im_zmq_fetch(struct im_zmq *z); - -#else /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */ +#else /* defined(HAVE_ZMQ) */ static UNUSED struct im_zmq *im_zmq_connect(const char *zmq_address) { return NULL; } +static UNUSED void im_zmq_shutdown(struct im_zmq *z) { } +static UNUSED void *im_zmq_fetch(struct im_zmq *z, size_t *psize) { *psize = 0; return NULL; } -static UNUSED void im_zmq_clean(struct im_zmq *z) { return; } - -static UNUSED void im_zmq_shutdown(struct im_zmq *z) { return; } - -static UNUSED void *im_zmq_fetch(struct im_zmq *z) { return NULL; } - -#endif /* defined(HAVE_MSGPACK) && defined(HAVE_ZMQ) */ +#endif /* defined(HAVE_ZMQ) */ #endif /* CRYSTFEL_ZMQ_H */ diff --git a/src/process_image.c b/src/process_image.c index 825e57d9..52e0a9e0 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -193,10 +193,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, float **prefilter; int any_crystals; - if ( pargs->msgpack_obj != NULL ) { + if ( pargs->zmq_data != NULL ) { set_last_task(last_task, "unpacking messagepack object"); image = image_msgpack_read(iargs->dtempl, - pargs->msgpack_obj, + pargs->zmq_data, + pargs->zmq_data_size, iargs->no_image_data); if ( image == NULL ) return; } else { @@ -303,7 +304,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, case PEAK_MSGPACK: image->features = image_msgpack_read_peaks(iargs->dtempl, - pargs->msgpack_obj, + pargs->zmq_data, + pargs->zmq_data_size, iargs->half_pixel_shift); break; diff --git a/src/process_image.h b/src/process_image.h index e8be0c29..ec7e22e2 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -119,11 +119,8 @@ struct pattern_args /* "Input" */ char *filename; char *event; -#ifdef HAVE_MSGPACK - msgpack_object *msgpack_obj; -#else - void *msgpack_obj; -#endif + void *zmq_data; + size_t zmq_data_size; }; |