aboutsummaryrefslogtreecommitdiff
path: root/src/im-asapo.c
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2022-05-04 12:14:27 +0200
committerThomas White <taw@physics.org>2022-06-02 12:15:38 +0200
commitffd98b770d6dfa7c1bef4b2ae54e0b637f2e7ac3 (patch)
treec4a21baa21da4617b7da671604c51ef2084d7943 /src/im-asapo.c
parent8e601d452a8b3d022b89e904c0cecee812f2b636 (diff)
indexamajig: Add --asapo-stream
Diffstat (limited to 'src/im-asapo.c')
-rw-r--r--src/im-asapo.c47
1 files changed, 40 insertions, 7 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 2e5ed504..95445fd0 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -49,6 +49,7 @@
struct im_asapo
{
char *stream;
+ int online_mode;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
};
@@ -100,7 +101,8 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
const char *token,
const char *beamtime,
const char *group_id,
- const char *data_source)
+ const char *data_source,
+ const char *stream)
{
struct im_asapo *a;
AsapoSourceCredentialsHandle cred;
@@ -127,7 +129,30 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
asapo_consumer_set_timeout(a->consumer, 3000);
a->group_id = asapo_string_from_c_str(group_id);
- a->stream = NULL;
+ if ( stream != NULL ) {
+
+ /* Named stream mode */
+ AsapoErrorHandle err = asapo_new_handle();
+
+ a->stream = strdup(stream);
+
+ asapo_consumer_set_last_read_marker(a->consumer,
+ a->group_id, 0,
+ a->stream, &err);
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Failed to skip to start of stream", err);
+ } else {
+ STATUS("Skipped to start of stream %s\n", a->stream);
+ }
+
+ asapo_free_handle(&err);
+ a->online_mode = 0;
+
+ } else {
+ /* Online mode */
+ a->stream = NULL;
+ a->online_mode = 1;
+ }
return a;
}
@@ -244,7 +269,8 @@ static void skip_to_stream_end(struct im_asapo *a)
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
- char **pmeta, char **pfilename, char **pevent)
+ char **pmeta, char **pfilename, char **pevent,
+ int *pfinished)
{
void *data_copy;
AsapoMessageMetaHandle meta;
@@ -252,6 +278,8 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
AsapoErrorHandle err;
uint64_t msg_size;
+ *pfinished = 0;
+
profile_start("select-stream");
if ( a->stream == NULL ) {
if ( select_last_stream(a) ) {
@@ -273,13 +301,18 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size,
a->stream, &err);
profile_end("asapo-get-next");
if ( asapo_error_get_type(err) == kEndOfStream ) {
- profile_start("next-stream");
- select_next_stream(a);
- profile_end("next-stream");
asapo_free_handle(&err);
asapo_free_handle(&meta);
asapo_free_handle(&data);
- return NULL; /* Sandbox will call try again very soon */
+ if ( a->online_mode ) {
+ profile_start("next-stream");
+ select_next_stream(a);
+ profile_end("next-stream");
+ /* Sandbox will call to try again very soon */
+ } else {
+ *pfinished = 1;
+ }
+ return NULL;
}
if ( asapo_is_error(err) ) {