diff options
author | Thomas White <taw@physics.org> | 2019-01-10 17:02:15 +0100 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2019-03-14 11:36:56 +0100 |
commit | 4d62f31f90b76bce8b66fe2be6ccccb7b1542209 (patch) | |
tree | 14bf58ae1f03e85ffb803ecab4bfb95864cb38e5 /src/process_image.c | |
parent | 002d6cfab105095ddc6b1cdfde9eb939c12ca0f8 (diff) |
Connect up hooks for unpacking MsgPack data
Diffstat (limited to 'src/process_image.c')
-rw-r--r-- | src/process_image.c | 118 |
1 files changed, 69 insertions, 49 deletions
diff --git a/src/process_image.c b/src/process_image.c index 7667a05c..afd43d3b 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -39,7 +39,6 @@ #include <gsl/gsl_sort.h> #include <unistd.h> #include <sys/stat.h> -#include <msgpack.h> #include "utils.h" #include "hdf5-file.h" @@ -56,6 +55,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "time-accounts.h" +#include "im-zmq.h" static float **backup_image_data(float **dp, struct detector *det) @@ -100,39 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu) } -void process_image(const struct index_args *iargs, struct pattern_args *pargs, - Stream *st, int cookie, const char *tmpdir, - int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, - char *last_task) +static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image, + TimeAccounts *taccs, char *last_task, + signed int wait_for_file, int cookie, + struct imagefile **pimfile) { - struct imagefile *imfile; - struct image image; - int i; - int r; - int ret; - char *rn; - float **prefilter; - int any_crystals; + signed int file_wait_time = wait_for_file; int wait_message_done = 0; int read_retry_done = 0; - signed int file_wait_time = iargs->wait_for_file; - - image.features = NULL; - image.copyme = iargs->copyme; - image.id = cookie; - image.beam = iargs->beam; - image.det = copy_geom(iargs->det); - image.crystals = NULL; - image.n_crystals = 0; - image.serial = serial; - image.indexed_by = INDEXING_NONE; - - if ( pargs->filename_p_e != NULL ) { - image.filename = pargs->filename_p_e->filename; - image.event = pargs->filename_p_e->ev; - } else if ( pargs->msgpack_obj != NULL ) { - STATUS("Msgpack!\n"); - } + int r; + struct imagefile *imfile; time_accounts_set(taccs, TACC_WAITFILE); set_last_task(last_task, "wait for file"); @@ -142,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, struct stat statbuf; sb_shared->pings[cookie]++; - r = stat(image.filename, &statbuf); + r = stat(image->filename, &statbuf); if ( r ) { - if ( (iargs->wait_for_file != 0) - && (file_wait_time != 0) ) - { + if ( (wait_for_file != 0) && (file_wait_time != 0) ) { if ( !wait_message_done ) { STATUS("Waiting for '%s'\n", - image.filename); + image->filename); wait_message_done = 1; } sleep(1); - if ( iargs->wait_for_file != -1 ) { + if ( wait_for_file != -1 ) { file_wait_time--; } continue; } - ERROR("File %s not found\n", image.filename); - return; + ERROR("File %s not found\n", image->filename); + return 1; } } while ( r ); @@ -174,42 +149,82 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, sb_shared->pings[cookie]++; do { - imfile = imagefile_open(image.filename); + imfile = imagefile_open(image->filename); if ( imfile == NULL ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; r = 1; STATUS("File '%s' exists but could not be opened." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } time_accounts_set(taccs, TACC_HDF5READ); set_last_task(last_task, "read file"); sb_shared->pings[cookie]++; - r = imagefile_read(imfile, &image, image.event); + r = imagefile_read(imfile, image, image->event); if ( r ) { - if ( iargs->wait_for_file && !read_retry_done ) { + if ( wait_for_file && !read_retry_done ) { read_retry_done = 1; imagefile_close(imfile); STATUS("File '%s' exists but could not be read." " Trying again after 10 seconds.\n", - image.filename); + image->filename); sleep(10); continue; } - ERROR("Couldn't open file: %s\n", image.filename); - return; + ERROR("Couldn't open file: %s\n", image->filename); + return 1; } } while ( r ); + *pimfile = imfile; + return 0; +} + + +void process_image(const struct index_args *iargs, struct pattern_args *pargs, + Stream *st, int cookie, const char *tmpdir, + int serial, struct sb_shm *sb_shared, TimeAccounts *taccs, + char *last_task) +{ + struct imagefile *imfile; + struct image image; + int i; + int r; + int ret; + char *rn; + float **prefilter; + int any_crystals; + + image.features = NULL; + image.copyme = iargs->copyme; + image.id = cookie; + image.beam = iargs->beam; + image.det = copy_geom(iargs->det); + image.crystals = NULL; + image.n_crystals = 0; + image.serial = serial; + image.indexed_by = INDEXING_NONE; + + if ( pargs->filename_p_e != NULL ) { + image.filename = pargs->filename_p_e->filename; + image.event = pargs->filename_p_e->ev; + if ( file_wait_open_read(sb_shared, &image, taccs, last_task, + iargs->wait_for_file, cookie, + &imfile) ) return; + } else if ( pargs->msgpack_obj != NULL ) { + STATUS("Msgpack!\n"); + if ( unpack_msgpack_data(pargs->msgpack_obj, &image) ) return; + } + /* Take snapshot of image before applying horrible noise filters */ time_accounts_set(taccs, TACC_FILTER); set_last_task(last_task, "image filter"); @@ -322,6 +337,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } break; + case PEAK_MSGPACK: + get_peaks_msgpack(pargs->msgpack_obj, &image, + iargs->half_pixel_shift); + break; + } image.peak_resolution = estimate_peak_resolution(image.features, |