aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2019-01-10 17:02:15 +0100
committerThomas White <taw@physics.org>2019-03-14 11:36:56 +0100
commit4d62f31f90b76bce8b66fe2be6ccccb7b1542209 (patch)
tree14bf58ae1f03e85ffb803ecab4bfb95864cb38e5
parent002d6cfab105095ddc6b1cdfde9eb939c12ca0f8 (diff)
Connect up hooks for unpacking MsgPack data
-rw-r--r--src/im-zmq.c8
-rw-r--r--src/im-zmq.h6
-rw-r--r--src/indexamajig.c2
-rw-r--r--src/process_image.c118
-rw-r--r--src/process_image.h1
5 files changed, 79 insertions, 56 deletions
diff --git a/src/im-zmq.c b/src/im-zmq.c
index c299b980..f501095a 100644
--- a/src/im-zmq.c
+++ b/src/im-zmq.c
@@ -136,7 +136,7 @@ void im_zmq_shutdown(struct im_zmq *z)
}
/**
- * get_peaks_onda:
+ * get_peaks_msgpack:
* @obj: A %msgpack_object containing data in OnDA format
* @image: An %image structure
* @half_pixel_shift: Non-zero if 0.5 should be added to all peak coordinates
@@ -158,8 +158,8 @@ void im_zmq_shutdown(struct im_zmq *z)
* Returns: non-zero on error, zero otherwise.
*
*/
-int get_peaks_onda(msgpack_object *obj, struct image *image,
- int half_pixel_shift)
+int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift)
{
int num_peaks;
@@ -273,7 +273,7 @@ static void onda_fill_in_beam_parameters(struct beam_params *beam,
* ...
* }
*/
-int obj_read(msgpack_object *obj, struct image *image)
+int unpack_msgpack_data(msgpack_object *obj, struct image *image)
{
uint16_t *flags = NULL;
diff --git a/src/im-zmq.h b/src/im-zmq.h
index e6abe562..eb2c6068 100644
--- a/src/im-zmq.h
+++ b/src/im-zmq.h
@@ -48,10 +48,10 @@ extern void im_zmq_clean(struct im_zmq *z);
extern void im_zmq_shutdown(struct im_zmq *z);
-extern int get_peaks_onda(msgpack_object *obj, struct image *image,
- int half_pixel_shift);
+extern int get_peaks_msgpack(msgpack_object *obj, struct image *image,
+ int half_pixel_shift);
-extern int obj_read(msgpack_object *obj, struct image *image);
+extern int unpack_msgpack_data(msgpack_object *obj, struct image *image);
#endif /* CRYSTFEL_ZMQ_H */
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 356d0c94..37cec295 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -1006,6 +1006,8 @@ int main(int argc, char *argv[])
iargs.peaks = PEAK_CXI;
} else if ( strcmp(speaks, "peakfinder9") == 0 ) {
iargs.peaks = PEAK_PEAKFINDER9;
+ } else if ( strcmp(speaks, "msgpack") == 0 ) {
+ iargs.peaks = PEAK_MSGPACK;
} else {
ERROR("Unrecognised peak detection method '%s'\n", speaks);
return 1;
diff --git a/src/process_image.c b/src/process_image.c
index 7667a05c..afd43d3b 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -39,7 +39,6 @@
#include <gsl/gsl_sort.h>
#include <unistd.h>
#include <sys/stat.h>
-#include <msgpack.h>
#include "utils.h"
#include "hdf5-file.h"
@@ -56,6 +55,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "time-accounts.h"
+#include "im-zmq.h"
static float **backup_image_data(float **dp, struct detector *det)
@@ -100,39 +100,16 @@ static void restore_image_data(float **dp, struct detector *det, float **bu)
}
-void process_image(const struct index_args *iargs, struct pattern_args *pargs,
- Stream *st, int cookie, const char *tmpdir,
- int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
- char *last_task)
+static int file_wait_open_read(struct sb_shm *sb_shared, struct image *image,
+ TimeAccounts *taccs, char *last_task,
+ signed int wait_for_file, int cookie,
+ struct imagefile **pimfile)
{
- struct imagefile *imfile;
- struct image image;
- int i;
- int r;
- int ret;
- char *rn;
- float **prefilter;
- int any_crystals;
+ signed int file_wait_time = wait_for_file;
int wait_message_done = 0;
int read_retry_done = 0;
- signed int file_wait_time = iargs->wait_for_file;
-
- image.features = NULL;
- image.copyme = iargs->copyme;
- image.id = cookie;
- image.beam = iargs->beam;
- image.det = copy_geom(iargs->det);
- image.crystals = NULL;
- image.n_crystals = 0;
- image.serial = serial;
- image.indexed_by = INDEXING_NONE;
-
- if ( pargs->filename_p_e != NULL ) {
- image.filename = pargs->filename_p_e->filename;
- image.event = pargs->filename_p_e->ev;
- } else if ( pargs->msgpack_obj != NULL ) {
- STATUS("Msgpack!\n");
- }
+ int r;
+ struct imagefile *imfile;
time_accounts_set(taccs, TACC_WAITFILE);
set_last_task(last_task, "wait for file");
@@ -142,29 +119,27 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
struct stat statbuf;
sb_shared->pings[cookie]++;
- r = stat(image.filename, &statbuf);
+ r = stat(image->filename, &statbuf);
if ( r ) {
- if ( (iargs->wait_for_file != 0)
- && (file_wait_time != 0) )
- {
+ if ( (wait_for_file != 0) && (file_wait_time != 0) ) {
if ( !wait_message_done ) {
STATUS("Waiting for '%s'\n",
- image.filename);
+ image->filename);
wait_message_done = 1;
}
sleep(1);
- if ( iargs->wait_for_file != -1 ) {
+ if ( wait_for_file != -1 ) {
file_wait_time--;
}
continue;
}
- ERROR("File %s not found\n", image.filename);
- return;
+ ERROR("File %s not found\n", image->filename);
+ return 1;
}
} while ( r );
@@ -174,42 +149,82 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
sb_shared->pings[cookie]++;
do {
- imfile = imagefile_open(image.filename);
+ imfile = imagefile_open(image->filename);
if ( imfile == NULL ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
r = 1;
STATUS("File '%s' exists but could not be opened."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
time_accounts_set(taccs, TACC_HDF5READ);
set_last_task(last_task, "read file");
sb_shared->pings[cookie]++;
- r = imagefile_read(imfile, &image, image.event);
+ r = imagefile_read(imfile, image, image->event);
if ( r ) {
- if ( iargs->wait_for_file && !read_retry_done ) {
+ if ( wait_for_file && !read_retry_done ) {
read_retry_done = 1;
imagefile_close(imfile);
STATUS("File '%s' exists but could not be read."
" Trying again after 10 seconds.\n",
- image.filename);
+ image->filename);
sleep(10);
continue;
}
- ERROR("Couldn't open file: %s\n", image.filename);
- return;
+ ERROR("Couldn't open file: %s\n", image->filename);
+ return 1;
}
} while ( r );
+ *pimfile = imfile;
+ return 0;
+}
+
+
+void process_image(const struct index_args *iargs, struct pattern_args *pargs,
+ Stream *st, int cookie, const char *tmpdir,
+ int serial, struct sb_shm *sb_shared, TimeAccounts *taccs,
+ char *last_task)
+{
+ struct imagefile *imfile;
+ struct image image;
+ int i;
+ int r;
+ int ret;
+ char *rn;
+ float **prefilter;
+ int any_crystals;
+
+ image.features = NULL;
+ image.copyme = iargs->copyme;
+ image.id = cookie;
+ image.beam = iargs->beam;
+ image.det = copy_geom(iargs->det);
+ image.crystals = NULL;
+ image.n_crystals = 0;
+ image.serial = serial;
+ image.indexed_by = INDEXING_NONE;
+
+ if ( pargs->filename_p_e != NULL ) {
+ image.filename = pargs->filename_p_e->filename;
+ image.event = pargs->filename_p_e->ev;
+ if ( file_wait_open_read(sb_shared, &image, taccs, last_task,
+ iargs->wait_for_file, cookie,
+ &imfile) ) return;
+ } else if ( pargs->msgpack_obj != NULL ) {
+ STATUS("Msgpack!\n");
+ if ( unpack_msgpack_data(pargs->msgpack_obj, &image) ) return;
+ }
+
/* Take snapshot of image before applying horrible noise filters */
time_accounts_set(taccs, TACC_FILTER);
set_last_task(last_task, "image filter");
@@ -322,6 +337,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
break;
+ case PEAK_MSGPACK:
+ get_peaks_msgpack(pargs->msgpack_obj, &image,
+ iargs->half_pixel_shift);
+ break;
+
}
image.peak_resolution = estimate_peak_resolution(image.features,
diff --git a/src/process_image.h b/src/process_image.h
index b61fe83f..54d97f77 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -53,6 +53,7 @@ enum {
PEAK_ZAEF,
PEAK_HDF5,
PEAK_CXI,
+ PEAK_MSGPACK,
};