diff options
author | Thomas White <taw@physics.org> | 2019-01-08 15:44:52 +0100 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2019-03-14 11:36:56 +0100 |
commit | 94ea1c4468fed5ee4be48e3334fe75541ecaba02 (patch) | |
tree | 37dcb31bb339e4f4b63780253740d10ab5ad4526 /src | |
parent | e9391fe842fee1966f195442ba9be7ada9199585 (diff) |
ZMQ connection and setup stuff
Diffstat (limited to 'src')
-rw-r--r-- | src/im-sandbox.c | 138 | ||||
-rw-r--r-- | src/im-sandbox.h | 7 | ||||
-rw-r--r-- | src/im-zmq.c (renamed from src/zmq.c) | 89 | ||||
-rw-r--r-- | src/im-zmq.h (renamed from src/zmq.h) | 17 | ||||
-rw-r--r-- | src/indexamajig.c | 20 | ||||
-rw-r--r-- | src/process_image.c | 6 | ||||
-rw-r--r-- | src/process_image.h | 7 |
7 files changed, 225 insertions, 59 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index e8fbe763..37574310 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -3,17 +3,18 @@ * * Sandbox for indexing * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2018 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014 Valerio Mariani * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -64,6 +65,7 @@ #include "im-sandbox.h" #include "process_image.h" #include "time-accounts.h" +#include "im-zmq.h" struct sandbox @@ -92,6 +94,10 @@ struct sandbox const char *tmpdir; + /* ZMQ mode */ + int zmq; + const char *zmq_address; + /* Final output */ Stream *stream; }; @@ -364,7 +370,19 @@ static void run_work(const struct index_args *iargs, Stream *st, int cookie, const char *tmpdir, struct sandbox *sb) { int allDone = 0; - TimeAccounts *taccs = time_accounts_init(); + TimeAccounts *taccs; + struct im_zmq *zmqstuff; + + /* Connect via ZMQ */ + if ( sb->zmq ) { + zmqstuff = im_zmq_connect(sb->zmq_address); + if ( zmqstuff == NULL ) { + ERROR("ZMQ setup failed.\n"); + return; + } + } + + taccs = time_accounts_init(); while ( !allDone ) { @@ -375,58 +393,66 @@ static void run_work(const struct index_args *iargs, Stream *st, struct event *ev; int r; - /* Wait until an event is ready */ - time_accounts_set(taccs, TACC_EVENTWAIT); - set_last_task(sb->shared->last_task[cookie], "wait_event"); - if ( sem_wait(sb->queue_sem) != 0 ) { - ERROR("Failed to wait on queue semaphore: %s\n", - strerror(errno)); - } + if ( !sb->zmq ) { - /* Get the event from the queue */ - set_last_task(sb->shared->last_task[cookie], "read_queue"); - pthread_mutex_lock(&sb->shared->queue_lock); - if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { - /* Queue is empty and no more coming, so exit */ - pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - if ( sb->shared->n_events == 0 ) { - ERROR("Got the semaphore, but no events in queue!\n"); - ERROR("no_more = %i\n", sb->shared->no_more); + /* Wait until an event is ready */ + time_accounts_set(taccs, TACC_EVENTWAIT); + set_last_task(sb->shared->last_task[cookie], "wait_event"); + if ( sem_wait(sb->queue_sem) != 0 ) { + ERROR("Failed to wait on queue semaphore: %s\n", + strerror(errno)); + } + + /* Get the event from the queue */ + set_last_task(sb->shared->last_task[cookie], "read_queue"); + pthread_mutex_lock(&sb->shared->queue_lock); + if ( (sb->shared->n_events==0) && (sb->shared->no_more) ) { + /* Queue is empty and no more coming, so exit */ + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + if ( sb->shared->n_events == 0 ) { + ERROR("Got the semaphore, but no events in queue!\n"); + ERROR("no_more = %i\n", sb->shared->no_more); + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + r = sscanf(sb->shared->queue[0], "%s %s %i", + filename, event_str, &ser); + if ( r != 3 ) { + STATUS("Invalid event string '%s'\n", + sb->shared->queue[0]); + } + memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], + MAX_EV_LEN); + shuffle_events(sb->shared); pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - r = sscanf(sb->shared->queue[0], "%s %s %i", - filename, event_str, &ser); - if ( r != 3 ) { - STATUS("Invalid event string '%s'\n", - sb->shared->queue[0]); - } - memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], - MAX_EV_LEN); - shuffle_events(sb->shared); - pthread_mutex_unlock(&sb->shared->queue_lock); - if ( r != 3 ) continue; + if ( r != 3 ) continue; - pargs.filename_p_e = initialize_filename_plus_event(); - pargs.filename_p_e->filename = strdup(filename); + pargs.filename_p_e = initialize_filename_plus_event(); + pargs.filename_p_e->filename = strdup(filename); - if ( strcmp(event_str, "(none)") != 0 ) { + if ( strcmp(event_str, "(none)") != 0 ) { + + ev = get_event_from_event_string(event_str); + if ( ev == NULL ) { + ERROR("Bad event string '%s'\n", event_str); + continue; + } + pargs.filename_p_e->ev = ev; + + } else { + + pargs.filename_p_e->ev = NULL; - ev = get_event_from_event_string(event_str); - if ( ev == NULL ) { - ERROR("Bad event string '%s'\n", event_str); - continue; } - pargs.filename_p_e->ev = ev; } else { - pargs.filename_p_e->ev = NULL; + pargs.msgpack_obj = im_zmq_fetch(zmqstuff); } @@ -434,10 +460,16 @@ static void run_work(const struct index_args *iargs, Stream *st, process_image(iargs, &pargs, st, cookie, tmpdir, ser, sb->shared, taccs, sb->shared->last_task[cookie]); - free_filename_plus_event(pargs.filename_p_e); + if ( !sb->zmq ) { + free_filename_plus_event(pargs.filename_p_e); + } else { + im_zmq_clean(zmqstuff); + } } + im_zmq_shutdown(zmqstuff); + time_accounts_set(taccs, TACC_FINALCLEANUP); cleanup_indexing(iargs->ipriv); free_detector_geometry(iargs->det); @@ -804,6 +836,11 @@ static int setup_shm(struct sandbox *sb) /* Assumes the caller is already holding queue_lock! */ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) { + if ( sb->zmq ) { + /* Do nothing */ + return 0; + } + while ( sb->shared->n_events < QUEUE_SIZE ) { struct filename_plus_event *ne; @@ -998,7 +1035,8 @@ char *create_tempdir(const char *temp_location) * If the return value is zero, something is probably wrong. */ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, - Stream *stream, const char *tmpdir, int serial_start) + Stream *stream, const char *tmpdir, int serial_start, + const char *zmq_address) { int i; struct sandbox *sb; @@ -1027,6 +1065,12 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->iargs = iargs; sb->serial = serial_start; sb->tmpdir = tmpdir; + if ( zmq_address != NULL ) { + sb->zmq = 1; + sb->zmq_address = zmq_address; + } else { + sb->zmq = 0; + } sb->fds = NULL; sb->fhs = NULL; diff --git a/src/im-sandbox.h b/src/im-sandbox.h index ee2de993..9da11526 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -3,13 +3,13 @@ * * Sandbox for indexing * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2018 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -83,6 +83,7 @@ extern void set_last_task(char *lt, const char *task); extern int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int config_basename, FILE *fh, Stream *stream, - const char *tempdir, int serial_start); + const char *tempdir, int serial_start, + const char *zmq_address); #endif /* IM_SANDBOX_H */ diff --git a/src/zmq.c b/src/im-zmq.c index 41bd2eec..c299b980 100644 --- a/src/zmq.c +++ b/src/im-zmq.c @@ -38,6 +38,7 @@ #include <hdf5.h> #include <assert.h> #include <unistd.h> +#include <zmq.h> #include <msgpack.h> #include "events.h" @@ -46,6 +47,94 @@ #include "utils.h" +struct im_zmq +{ + void *ctx; + void *socket; + zmq_msg_t msg; + msgpack_unpacked unpacked; + int unpacked_set; +}; + + +struct im_zmq *im_zmq_connect(const char *zmq_address) +{ + struct im_zmq *z; + + z = malloc(sizeof(struct im_zmq)); + if ( z == NULL ) return NULL; + + z->unpacked_set = 0; + + z->ctx = zmq_ctx_new(); + if ( z->ctx == NULL ) return NULL; + + z->socket = zmq_socket(z->ctx, ZMQ_REQ); + if ( z->socket == NULL ) return NULL; + + STATUS("Connecting to ZMQ at '%s'\n", zmq_address); + if ( zmq_connect(z->socket, zmq_address) == -1 ) { + ERROR("ZMQ connection failed: %s\n", zmq_strerror(errno)); + return NULL; + } + STATUS("ZMQ connected.\n"); + + return z; +} + + +msgpack_object *im_zmq_fetch(struct im_zmq *z) +{ + int msg_size; + int r; + + if ( zmq_send(z->socket, "m", 1, 0) == -1 ) { + ERROR("ZMQ message send failed: %s\n", zmq_strerror(errno)); + return NULL; + } + + zmq_msg_init(&z->msg); + msg_size = zmq_msg_recv(&z->msg, z->socket, 0); + if ( msg_size == -1 ) { + ERROR("ZMQ recieve failed: %s\n", zmq_strerror(errno)); + zmq_msg_close(&z->msg); + return NULL; + } + + msgpack_unpacked_init(&z->unpacked); + r = msgpack_unpack_next(&z->unpacked, zmq_msg_data(&z->msg), + msg_size, NULL); + if ( r != MSGPACK_UNPACK_SUCCESS ) { + ERROR("Msgpack unpack failed: %i\n", r); + zmq_msg_close(&z->msg); + return NULL; + } + z->unpacked_set = 1; + + return &z->unpacked.data; +} + + +/* Clean structures ready for next frame */ +void im_zmq_clean(struct im_zmq *z) +{ + if ( z->unpacked_set ) { + msgpack_unpacked_destroy(&z->unpacked); + zmq_msg_close(&z->msg); + z->unpacked_set = 0; + } +} + + +void im_zmq_shutdown(struct im_zmq *z) +{ + if ( z == NULL ) return; + + zmq_msg_close(&z->msg); + zmq_close(z->socket); + zmq_ctx_destroy(z->ctx); +} + /** * get_peaks_onda: * @obj: A %msgpack_object containing data in OnDA format diff --git a/src/zmq.h b/src/im-zmq.h index ee04f437..e6abe562 100644 --- a/src/zmq.h +++ b/src/im-zmq.h @@ -3,13 +3,13 @@ * * ZMQ data interface * - * Copyright © 2017-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2017-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2018 Thomas White <taw@physics.org> - * 2014 Valerio Mariani - * 2017 Stijna de Graaf + * 2018-2019 Thomas White <taw@physics.org> + * 2014 Valerio Mariani + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -40,9 +40,18 @@ #include "image.h" +extern struct im_zmq *im_zmq_connect(const char *zmq_address); + +extern msgpack_object *im_zmq_fetch(struct im_zmq *z); + +extern void im_zmq_clean(struct im_zmq *z); + +extern void im_zmq_shutdown(struct im_zmq *z); + extern int get_peaks_onda(msgpack_object *obj, struct image *image, int half_pixel_shift); extern int obj_read(msgpack_object *obj, struct image *image); + #endif /* CRYSTFEL_ZMQ_H */ diff --git a/src/indexamajig.c b/src/indexamajig.c index 2d584726..356d0c94 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -86,6 +86,7 @@ static void show_help(const char *s) " --profile Show timing data for performance monitoring\n" " --temp-dir=<path> Put the temporary folder under <path>\n" " --wait-for-file=<n> Time to wait for each file before processing\n" +" --zmq-msgpack Receive data in MessagePack format over ZMQ\n" "\nPeak search options:\n\n" " --peaks=<method> Peak search method (zaef,peakfinder8,peakfinder9,hdf5,cxi)\n" " Default: zaef\n" @@ -340,6 +341,8 @@ int main(int argc, char *argv[]) int if_retry = 1; int serial_start = 1; char *spectrum_fn = NULL; + int zmq = 0; + char *zmq_address = NULL; /* Defaults */ iargs.cell = NULL; @@ -457,6 +460,7 @@ int main(int argc, char *argv[]) {"no-multi", 0, &if_multi, 0}, {"multi", 0, &if_multi, 1}, {"overpredict", 0, &iargs.overpredict, 1}, + {"zmq-msgpack", 0, &zmq, 1}, /* Long-only options which don't actually do anything */ {"no-sat-corr", 0, &iargs.satcorr, 0}, @@ -1297,8 +1301,22 @@ int main(int argc, char *argv[]) gsl_set_error_handler_off(); + if ( zmq ) { + char line[1024]; + char *rval; + rval = fgets(line, 1024, fh); + if ( rval == NULL ) { + ERROR("Failed to read ZMQ server/port from input.\n"); + return 1; + } + chomp(line); + zmq_address = strdup(line); + /* In future, read multiple addresses and hand them out + * evenly to workers */ + } + r = create_sandbox(&iargs, n_proc, prefix, config_basename, fh, - st, tmpdir, serial_start); + st, tmpdir, serial_start, zmq_address); free_imagefile_field_list(iargs.copyme); cell_free(iargs.cell); diff --git a/src/process_image.c b/src/process_image.c index 2f2eb698..b3b3b1da 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -3,12 +3,13 @@ * * The processing pipeline for one image * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2017 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014-2017 Valerio Mariani <valerio.mariani@desy.de> + * 2017 Stijn de Graaf * * This file is part of CrystFEL. * @@ -38,6 +39,7 @@ #include <gsl/gsl_sort.h> #include <unistd.h> #include <sys/stat.h> +#include <msgpack.h> #include "utils.h" #include "hdf5-file.h" diff --git a/src/process_image.h b/src/process_image.h index 2a43d11d..b61fe83f 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -3,11 +3,11 @@ * * The processing pipeline for one image * - * Copyright © 2012-2018 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2019 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2016 Thomas White <taw@physics.org> + * 2010-2019 Thomas White <taw@physics.org> * 2014-2017 Valerio Mariani <valerio.mariani@desy.de> * 2017-2018 Yaroslav Gevorkov <yaroslav.gevorkov@desy.de> * @@ -37,6 +37,8 @@ struct index_args; +#include <msgpack.h> + #include "integration.h" #include "im-sandbox.h" #include "time-accounts.h" @@ -122,6 +124,7 @@ struct pattern_args { /* "Input" */ struct filename_plus_event *filename_p_e; + msgpack_object *msgpack_obj; }; |