diff options
-rw-r--r-- | doc/man/indexamajig.1 | 5 | ||||
-rw-r--r-- | src/im-asapo.c | 113 | ||||
-rw-r--r-- | src/im-asapo.h | 9 | ||||
-rw-r--r-- | src/im-sandbox.c | 3 | ||||
-rw-r--r-- | src/indexamajig.c | 8 | ||||
-rw-r--r-- | src/process_image.c | 5 | ||||
-rw-r--r-- | src/process_image.h | 4 |
7 files changed, 142 insertions, 5 deletions
diff --git a/doc/man/indexamajig.1 b/doc/man/indexamajig.1 index 8f4dc6a2..0f35d4d8 100644 --- a/doc/man/indexamajig.1 +++ b/doc/man/indexamajig.1 @@ -199,6 +199,11 @@ Receive data via the specified ASAP::O endpoint. This option and \fB--zmq-input Authentication token, beamtime, data source, consumer group and stream, respectively, for ASAP::O data. .PD 0 +.IP \fB--asapo-output-stream=\fIstream\fR +.PD +Send an output stream via ASAP::O. For non-hits, a small placeholder will be sent. + +.PD 0 .IP \fB--asapo-wait-for-stream .PD If the ASAP::O stream does not exist, wait for it to be appear. Without this option, indexamajig will exit immediately if the stream is not found. diff --git a/src/im-asapo.c b/src/im-asapo.c index 43455298..37784bf3 100644 --- a/src/im-asapo.c +++ b/src/im-asapo.c @@ -36,6 +36,7 @@ #include <assert.h> #include <unistd.h> #include <asapo/consumer_c.h> +#include <asapo/producer_c.h> #include <image.h> #include <utils.h> @@ -50,8 +51,10 @@ struct im_asapo { char *stream; AsapoConsumerHandle consumer; + AsapoProducerHandle producer; AsapoStringHandle group_id; int wait_for_stream; + char *output_stream; }; @@ -96,6 +99,10 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) ERROR("ASAP::O stream not specified.\n"); return NULL; } + if ( strcmp(params->stream, params->output_stream) == 0 ) { + ERROR("ASAP::O input and output streams cannot be the same.\n"); + return NULL; + } a = malloc(sizeof(struct im_asapo)); if ( a == NULL ) return NULL; @@ -108,18 +115,41 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params) params->source, params->token); a->consumer = asapo_create_consumer(params->endpoint, "auto", 0, cred, &err); - asapo_free_handle(&cred); if ( asapo_is_error(err) ) { show_asapo_error("Cannot create ASAP::O consumer", err); + asapo_free_handle(&cred); free(a); return NULL; } + if ( params->output_stream != NULL ) { + a->producer = asapo_create_producer(params->endpoint, + 1, /* Number of sender threads */ + kTcp, + cred, + 60000, /* Timeout */ + &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Cannot create ASAP::O producer", err); + asapo_free_handle(&a->consumer); + asapo_free_handle(&a->group_id); + asapo_free_handle(&cred); + free(a); + return NULL; + } + a->output_stream = strdup(params->output_stream); + } else { + a->producer = NULL; + a->output_stream = NULL; + } + a->stream = strdup(params->stream); asapo_consumer_set_timeout(a->consumer, 3000); a->group_id = asapo_string_from_c_str(params->group_id); a->wait_for_stream = params->wait_for_stream; + asapo_free_handle(&cred); + return a; } @@ -216,6 +246,87 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, } +static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload, + AsapoErrorHandle err) +{ + if ( asapo_is_error(err) ) { + show_asapo_error("ASAP::O send error", err); + } else if ( err != NULL ) { + show_asapo_error("ASAP::O send warning", err); + } +} + + +static void send_real(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + image->data_block_size, + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, image->data_block, + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +static void send_placeholder(struct im_asapo *a, struct image *image) +{ + AsapoMessageHeaderHandle header; + AsapoErrorHandle err; + + header = asapo_create_message_header(image->serial, + 8, /* strlen("SKIPPED"+\0) */ + image->filename, + image->meta_data, + 0, /* Dataset substream */ + 0, + 0); /* Auto ID */ + + asapo_producer_send(a->producer, header, "SKIPPED", + kDefaultIngestMode, a->output_stream, + send_callback, &err); + if ( asapo_is_error(err) ) { + show_asapo_error("Couldn't ASAP::O message", err); + asapo_free_handle(&header); + asapo_free_handle(&err); + return; + } + + asapo_free_handle(&header); + asapo_free_handle(&err); +} + + +/* Send the image to the output ASAP::O stream, if it's a hit. Otherwise, + * send a placeholder */ +void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ + profile_start("asapo-send"); + if ( hit ) { + send_real(a, image); + } else { + send_placeholder(a, image); + } + profile_end("asapo-send"); +} + + void im_asapo_shutdown(struct im_asapo *a) { if ( a == NULL ) return; diff --git a/src/im-asapo.h b/src/im-asapo.h index cf7edfe4..85fffc8a 100644 --- a/src/im-asapo.h +++ b/src/im-asapo.h @@ -43,8 +43,11 @@ struct im_asapo_params char *source; char *stream; int wait_for_stream; + char *output_stream; }; +struct im_asapo; + #if defined(HAVE_ASAPO) extern struct im_asapo *im_asapo_connect(struct im_asapo_params *params); @@ -55,6 +58,8 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size, char **pmeta, char **pfilename, char **pevent, int *pfinished, int *pmessageid); +extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit); + #else /* defined(HAVE_ASAPO) */ static UNUSED struct im_asapo *im_asapo_connect(struct im_asapo_params *params) @@ -80,6 +85,10 @@ static UNUSED void *im_asapo_fetch(struct im_asapo *a, size_t *psize, return NULL; } +static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hit) +{ +} + #endif /* defined(HAVE_ASAPO) */ #endif /* CRYSTFEL_ASAPO_H */ diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 63ebc537..7f02b1cf 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -516,7 +516,8 @@ static int run_work(const struct index_args *iargs, Stream *st, sb->shared->time_last_start[cookie] = get_monotonic_seconds(); profile_start("process-image"); process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, sb->shared->last_task[cookie]); + sb->shared, sb->shared->last_task[cookie], + asapostuff); profile_end("process-image"); } diff --git a/src/indexamajig.c b/src/indexamajig.c index 427843bc..3bcc06b9 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -449,6 +449,10 @@ static error_t parse_arg(int key, char *arg, struct argp_state *state) break; case 222 : + args->asapo_params.output_stream = strdup(arg); + break; + + case 223 : args->cpu_pin = 1; break; @@ -887,6 +891,7 @@ int main(int argc, char *argv[]) args.asapo_params.source = NULL; args.asapo_params.stream = NULL; args.asapo_params.wait_for_stream = 0; + args.asapo_params.output_stream = NULL; args.cpu_pin = 0; args.serial_start = 1; args.if_peaks = 1; @@ -1009,7 +1014,8 @@ int main(int argc, char *argv[]) {"asapo-stream", 220, "str", OPTION_NO_USAGE, "ASAP::O stream name"}, {"asapo-wait-for-stream", 221, NULL, OPTION_NO_USAGE, "Wait for ASAP::O stream to appear"}, - {"cpu-pin", 222, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, + {"asapo-output-stream", 222, "str", OPTION_NO_USAGE, "ASAP::O output stream name"}, + {"cpu-pin", 223, NULL, OPTION_NO_USAGE, "Pin worker processes to CPUs"}, {NULL, 0, 0, OPTION_DOC, "Peak search options:", 3}, {"peaks", 301, "method", 0, "Peak search method. Default: zaef"}, diff --git a/src/process_image.c b/src/process_image.c index 5bc527a1..8acd3e86 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -56,6 +56,7 @@ #include "predict-refine.h" #include "im-sandbox.h" #include "im-zmq.h" +#include "im-asapo.h" static float **backup_image_data(float **dp, struct detgeom *det) { @@ -177,7 +178,7 @@ static struct image *file_wait_open_read(const char *filename, void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, struct sb_shm *sb_shared, - char *last_task) + char *last_task, struct im_asapo *asapostuff) { struct image *image; int i; @@ -480,6 +481,8 @@ streamwrite: "%s\n", n, n>1?"s":"", image->filename, image->ev); } + im_asapo_send(asapostuff, image, image->hit); + out: /* Count crystals which are still good */ set_last_task(last_task, "process_image finalisation"); diff --git a/src/process_image.h b/src/process_image.h index bb68ce19..cca2f7d2 100644 --- a/src/process_image.h +++ b/src/process_image.h @@ -45,6 +45,7 @@ struct index_args; #include "im-sandbox.h" #include "peaks.h" #include "image.h" +#include "im-asapo.h" /* Information about the indexing process which is common to all patterns */ @@ -113,7 +114,8 @@ struct pattern_args extern void process_image(const struct index_args *iargs, struct pattern_args *pargs, Stream *st, int cookie, const char *tmpdir, int serial, - struct sb_shm *sb_shared, char *last_task); + struct sb_shm *sb_shared, char *last_task, + struct im_asapo *asapostuff); #endif /* PROCESS_IMAGE_H */ |