aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/man/indexamajig.15
-rw-r--r--src/im-asapo.c113
-rw-r--r--src/im-asapo.h9
-rw-r--r--src/im-sandbox.c3
-rw-r--r--src/indexamajig.c8
-rw-r--r--src/process_image.c5
-rw-r--r--src/process_image.h4
7 files changed, 142 insertions, 5 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1
index 8f4dc6a2..0f35d4d8 100644
--- a/doc/man/indexamajig.1
+++ b/doc/man/indexamajig.1
@@ -199,6 +199,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input
Authentication token, beamtime, data source, consumer group and stream, respectively, for ASAP::O data.
.PD 0
+.IP \fB--asapo-output-stream=\fIstream\fR
+.PD
+Send an output stream via ASAP::O. For non-hits, a small placeholder will be sent.
+
+.PD 0
.IP \fB--asapo-wait-for-stream
.PD
If the ASAP::O stream does not exist, wait for it to be appear. Without this option, indexamajig will exit immediately if the stream is not found.
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 43455298..37784bf3 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -36,6 +36,7 @@
#include <assert.h>
#include <unistd.h>
#include <asapo/consumer_c.h>
+#include <asapo/producer_c.h>
#include <image.h>
#include <utils.h>
@@ -50,8 +51,10 @@ struct im_asapo
{
char *stream;
AsapoConsumerHandle consumer;
+ AsapoProducerHandle producer;
AsapoStringHandle group_id;
int wait_for_stream;
+ char *output_stream;
};
@@ -96,6 +99,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
ERROR("ASAP::O stream not specified.\n");
return NULL;
}
+ if ( strcmp(params->stream, params->output_stream) == 0 ) {
+ ERROR("ASAP::O input and output streams cannot be the same.\n");
+ return NULL;
+ }
a = malloc(sizeof(struct im_asapo));
if ( a == NULL ) return NULL;
@@ -108,18 +115,41 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
params->source,
params->token);
a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err);
- asapo_free_handle(&cred);
if ( asapo_is_error(err) ) {
show_asapo_error("Cannot create ASAP::O consumer", err);
+ asapo_free_handle(&cred);
free(a);
return NULL;
}
+ if ( params->output_stream != NULL ) {
+ a->producer = asapo_create_producer(params->endpoint,
+ 1, /* Number of sender threads */
+ kTcp,
+ cred,
+ 60000, /* Timeout */
+ &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Cannot create ASAP::O producer", err);
+ asapo_free_handle(&a->consumer);
+ asapo_free_handle(&a->group_id);
+ asapo_free_handle(&cred);
+ free(a);
+ return NULL;
+ }
+ a->output_stream = strdup(params->output_stream);
+ } else {
+ a->producer = NULL;
+ a->output_stream = NULL;
+ }
+
a->stream = strdup(params->stream);
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_free_handle(&cred);
+
return a;
}
@@ -216,6 +246,87 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
+ AsapoErrorHandle err)
+{
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("ASAP::O send error", err);
+ } else if ( err != NULL ) {
+ show_asapo_error("ASAP::O send warning", err);
+ }
+}
+
+
+static void send_real(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+
+ header = asapo_create_message_header(image->serial,
+ image->data_block_size,
+ image->filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ asapo_producer_send(a->producer, header, image->data_block,
+ kDefaultIngestMode, a->output_stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+static void send_placeholder(struct im_asapo *a, struct image *image)
+{
+ AsapoMessageHeaderHandle header;
+ AsapoErrorHandle err;
+
+ header = asapo_create_message_header(image->serial,
+ 8, /* strlen("SKIPPED"+\0) */
+ image->filename,
+ image->meta_data,
+ 0, /* Dataset substream */
+ 0,
+ 0); /* Auto ID */
+
+ asapo_producer_send(a->producer, header, "SKIPPED",
+ kDefaultIngestMode, a->output_stream,
+ send_callback, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't ASAP::O message", err);
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+ return;
+ }
+
+ asapo_free_handle(&header);
+ asapo_free_handle(&err);
+}
+
+
+/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise,
+ * send a placeholder */
+void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+ profile_start("asapo-send");
+ if ( hit ) {
+ send_real(a, image);
+ } else {
+ send_placeholder(a, image);
+ }
+ profile_end("asapo-send");
+}
+
+
void im_asapo_shutdown(struct im_asapo *a)
{
if ( a == NULL ) return;
diff --git a/src/im-asapo.h b/src/im-asapo.h
index cf7edfe4..85fffc8a 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -43,8 +43,11 @@ struct im_asapo_params
char *source;
char *stream;
int wait_for_stream;
+ char *output_stream;
};
+struct im_asapo;
+
#if defined(HAVE_ASAPO)
extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params);
@@ -55,6 +58,8 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
int *pfinished, int *pmessageid);
+extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit);
+
#else /* defined(HAVE_ASAPO) */
static UNUSED struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
@@ -80,6 +85,10 @@ static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize,
return NULL;
}
+static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit)
+{
+}
+
#endif /* defined(HAVE_ASAPO) */
#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 63ebc537..7f02b1cf 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -516,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st,
sb->shared->time_last_start[cookie] = get_monotonic_seconds();
profile_start("process-image");
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb->shared, sb->shared->last_task[cookie]);
+ sb->shared, sb->shared->last_task[cookie],
+ asapostuff);
profile_end("process-image");
}
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 427843bc..3bcc06b9 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state)
break;
case 222 :
+ args->asapo_params.output_stream = strdup(arg);
+ break;
+
+ case 223 :
args->cpu_pin = 1;
break;
@@ -887,6 +891,7 @@ int main(int argc, char *argv[])
args.asapo_params.source = NULL;
args.asapo_params.stream = NULL;
args.asapo_params.wait_for_stream = 0;
+ args.asapo_params.output_stream = NULL;
args.cpu_pin = 0;
args.serial_start = 1;
args.if_peaks = 1;
@@ -1009,7 +1014,8 @@ int main(int argc, char *argv[])
{"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"},
{"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE,
"Wait for ASAP::O stream to appear"},
- {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
+ {"asapo-output-stream", 222, "str", OPTION_NO_USAGE, "ASAP::O output stream name"},
+ {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"},
{NULL, 0, 0, OPTION_DOC, "Peak search options:", 3},
{"peaks", 301, "method", 0, "Peak search method. Default: zaef"},
diff --git a/src/process_image.c b/src/process_image.c
index 5bc527a1..8acd3e86 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -56,6 +56,7 @@
#include "predict-refine.h"
#include "im-sandbox.h"
#include "im-zmq.h"
+#include "im-asapo.h"
static float **backup_image_data(float **dp, struct detgeom *det)
{
@@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename,
void process_image(const struct index_args *iargs, struct pattern_args *pargs,
Stream *st, int cookie, const char *tmpdir,
int serial, struct sb_shm *sb_shared,
- char *last_task)
+ char *last_task, struct im_asapo *asapostuff)
{
struct image *image;
int i;
@@ -480,6 +481,8 @@ streamwrite:
"%s\n", n, n>1?"s":"", image->filename, image->ev);
}
+ im_asapo_send(asapostuff, image, image->hit);
+
out:
/* Count crystals which are still good */
set_last_task(last_task, "process_image finalisation");
diff --git a/src/process_image.h b/src/process_image.h
index bb68ce19..cca2f7d2 100644
--- a/src/process_image.h
+++ b/src/process_image.h
@@ -45,6 +45,7 @@ struct index_args;
#include "im-sandbox.h"
#include "peaks.h"
#include "image.h"
+#include "im-asapo.h"
/* Information about the indexing process which is common to all patterns */
@@ -113,7 +114,8 @@ struct pattern_args
extern void process_image(const struct index_args *iargs,
struct pattern_args *pargs, Stream *st,
int cookie, const char *tmpdir, int serial,
- struct sb_shm *sb_shared, char *last_task);
+ struct sb_shm *sb_shared, char *last_task,
+ struct im_asapo *asapostuff);
#endif /* PROCESS_IMAGE_H */