aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-09-15 17:15:52 +0200
committerThomas White <taw@physics.org>2022-06-02 12:15:38 +0200
commit36ac02772c42c41b454cc0da051cebcddcd433e2 (patch)
treead1a5e82b30f26badd03ecf94cd4d5e5e67ededa
parent65b0ee1d76dc47a9d890f94a309b1947260f74ef (diff)
ASAP::O: Stream switching
-rw-r--r--src/im-asapo.c122
1 files changed, 118 insertions, 4 deletions
diff --git a/src/im-asapo.c b/src/im-asapo.c
index 5f4f2b65..0fc0a2b2 100644
--- a/src/im-asapo.c
+++ b/src/im-asapo.c
@@ -47,6 +47,7 @@
struct im_asapo
{
+ char *stream;
AsapoConsumerHandle consumer;
AsapoStringHandle group_id;
};
@@ -114,23 +115,130 @@ struct im_asapo *im_asapo_connect(const char *endpoint,
asapo_consumer_set_timeout(a->consumer, 1000);
a->group_id = asapo_string_from_c_str(group_id);
+ a->stream = NULL;
return a;
}
+static int select_last_stream(struct im_asapo *a)
+{
+ AsapoStreamInfosHandle si;
+ size_t len;
+ int i;
+ AsapoStreamInfoHandle st;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ si = asapo_consumer_get_stream_list(a->consumer, NULL,
+ kAllStreams, &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get ASAP::O stream list", err);
+ asapo_free_handle(err);
+ return 1;
+ }
+
+ STATUS("for info: stream list:\n");
+ n = asapo_stream_infos_get_size(si);
+ for ( i=0; i<n; i++ ) {
+ AsapoStreamInfoHandle st;
+ st = asapo_stream_infos_get_item(si, i);
+ STATUS("Stream %i: %s\n", i, asapo_stream_info_get_name(st));
+ asapo_free_handle(st);
+ }
+ STATUS("end of stream list\n");
+
+ st = asapo_stream_infos_get_item(si, n-1);
+ a->stream = strdup(asapo_stream_info_get_name(st));
+ asapo_free_handle(st);
+
+ asapo_free_handle(si);
+ asapo_free_handle(err);
+ return 0;
+}
+
+
+static int select_next_stream(struct im_asapo *a)
+{
+ AsapoStreamInfosHandle si;
+ size_t len;
+ int i;
+ AsapoErrorHandle err = asapo_new_handle();
+
+ si = asapo_consumer_get_stream_list(a->consumer, NULL,
+ kAllStreams, &err);
+
+ if ( asapo_is_error(err) ) {
+ show_asapo_error("Couldn't get ASAP::O stream list", err);
+ asapo_free_handle(err);
+ return 1;
+ }
+
+ n = asapo_stream_infos_get_size(si);
+ for ( i=n-1; i>0; i-- ) {
+
+ AsapoStreamInfoHandle st;
+ const char *name;
+
+ st = asapo_stream_infos_get_item(si, i);
+ name = asapo_stream_info_get_name(st);
+
+ if ( strcmp(name, a->stream) == 0 ) {
+ free(a->stream);
+ a->stream = strdup(asapo_stream_info_get_next_stream(st));
+ asapo_free_handle(st);
+ break;
+ }
+
+ asapo_free_handle(st);
+ }
+ asapo_free_handle(si);
+ asapo_free_handle(err);
+
+ return 0;
+}
+
+
+static void skip_to_stream_end(struct im_asapo *a)
+{
+ /* FIXME: Implementation */
+}
+
+
void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size)
{
void *data_copy;
- AsapoMessageMetaHandle meta = asapo_new_handle();
- AsapoMessageDataHandle data = asapo_new_handle();
+ AsapoMessageMetaHandle meta;
+ AsapoMessageDataHandle data;
AsapoErrorHandle err = asapo_new_handle();
uint64_t msg_size;
+ if ( a->stream == NULL ) {
+ if ( select_last_stream(a) ) {
+ asapo_free_handle(&err);
+ return NULL;
+ }
+ skip_to_stream_end(a);
+ }
+
+ meta = asapo_new_handle();
+ data = asapo_new_handle();
+
asapo_consumer_get_next(a->consumer, a->group_id, &meta, &data,
- "default", &err);
+ a->stream, &err);
+ if ( asapo_error_get_type(err) == kEndOfStream ) {
+ select_next_stream(a);
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+ return NULL; /* Please call back later! */
+ }
+
if ( asapo_is_error(err) ) {
show_asapo_error("Couldn't get next ASAP::O record", err);
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
return NULL;
}
@@ -141,7 +249,13 @@ void *im_asapo_fetch(struct im_asapo *a, size_t *pdata_size)
STATUS("ASAP::O size: %lli\n", (long long int)msg_size);
data_copy = malloc(msg_size);
- if ( data_copy == NULL ) return NULL;
+ if ( data_copy == NULL ) {
+ ERROR("Failed to copy data block.\n");
+ asapo_free_handle(&err);
+ asapo_free_handle(&meta);
+ asapo_free_handle(&data);
+ return NULL;
+ }
memcpy(data_copy, asapo_message_data_get_as_chars(data), msg_size);
asapo_free_handle(&err);