aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2024-04-24 13:47:54 +0200
committerThomas White <taw@physics.org>2024-04-24 13:47:54 +0200
commitd7329d0a207cd17cde5651abd989a66c1b5da82c (patch)
tree74e98db0e28d2215fbe3a431df8246a28b838cf0
parent895a493ce36f7fc81232727177145bf3fd2bde63 (diff)
indexamajig: Add ASAP::O acknowledgements
-rw-r--r--src/im-asapo.c14
-rw-r--r--src/im-asapo.h6
-rw-r--r--src/im-sandbox.c10
3 files changed, 27 insertions, 3 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 07666d97..6df14e93 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -173,6 +173,8 @@ struct im_asapo *im_asapo_connect(struct im_asapo_params *params)
a->group_id = asapo_string_from_c_str(params->group_id);
a->wait_for_stream = params->wait_for_stream;
+ asapo_consumer_set_resend_nacs(a->consumer, 1, 10000, 3);
+
asapo_free_handle(&cred);
return a;
@@ -271,6 +273,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
}
+void im_asapo_finalise(struct im_asapo *a, uint64_t message_id)
+{
+ AsapoErrorHandle err = asapo_new_handle();
+ asapo_consumer_acknowledge(a->consumer, a->group_id, message_id,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't acknowledge ASAP::O message", err);
+ }
+ asapo_free_handle(&err);
+}
+
+
static void send_callback(void *a, AsapoRequestCallbackPayloadHandle payload,
AsapoErrorHandle err)
{
diff --git a/src/im-asapo.h b/src/im-asapo.h
index 424d8b59..d06f9527 100644
--- a/src/im-asapo.h
+++ b/src/im-asapo.h
@@ -59,6 +59,8 @@ extern void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
char **pmeta, char **pfilename, char **pevent,
int *pfinished, int *pmessageid);
+extern void im_asapo_finalise(struct im_asapo *a, uint64_t message_id);
+
extern void im_asapo_send(struct im_asapo *a, struct image *image, int hit);
#else /* defined(HAVE_ASAPO) */
@@ -90,6 +92,10 @@ static UNUSED void im_asapo_send(struct im_asapo *a, struct image *image, int hi
{
}
+static UNUSED void im_asapo_finalise(struct im_asapo *a, uint64_t message_id)
+{
+}
+
#endif /* defined(HAVE_ASAPO) */
#endif /* CRYSTFEL_ASAPO_H */
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index e97e204f..3d9f1f96 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -343,6 +343,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
struct im_asapo *asapostuff = NULL;
Mille *mille;
ImageDataArrays *ida;
+ int asapo_message_id;
if ( sb->profile ) {
profile_init();
@@ -490,7 +491,6 @@ static int run_work(const struct index_args *iargs, Stream *st,
char *filename;
char *event;
int finished = 0;
- int message_id;
profile_start("asapo-fetch");
set_last_task(sb->shared->last_task[cookie], "ASAPO fetch");
@@ -500,7 +500,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
&filename,
&event,
&finished,
- &message_id);
+ &asapo_message_id);
profile_end("asapo-fetch");
if ( pargs.asapo_data != NULL ) {
ok = 1;
@@ -515,7 +515,7 @@ static int run_work(const struct index_args *iargs, Stream *st,
/* We will also use ASAP::O's serial number
* instead of our own. */
- ser = message_id;
+ ser = asapo_message_id;
} else {
if ( finished ) {
sb->shared->end_of_stream[cookie] = 1;
@@ -535,6 +535,10 @@ static int run_work(const struct index_args *iargs, Stream *st,
profile_end("process-image");
}
+ if ( sb->asapo_params != NULL ) {
+ im_asapo_finalise(asapostuff, asapo_message_id);
+ }
+
/* NB pargs.zmq_data, pargs.asapo_data and pargs.asapo_meta
* will be copied into the image structure, so
* that it can be queried for "header" values etc. They will