aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Thomas White <taw@physics.org> 2022-11-07 10:30:16 +0100
committer Thomas White <taw@physics.org> 2022-11-08 13:42:27 +0100
commit df3224acef436c70fd95ac1f990f4dd2a68c3206 (patch)
tree 55b35988ffb2cb78d531ec569dba95d9112d789d
parent 6f55de63b6aa64481db0e294844f39a1ca7fffee (diff)
indexamajig: Exit only once all workers received kEndOfStream
Previously, the whole program would exit if any worker saw kEndOfStream. Unfortunately, this happens quite often, due to data starvation (too many workers for the data rate) or just general slowness. Therefore we need a more robust criterion.
-rw-r--r-- src/im-sandbox.c 30
-rw-r--r-- src/im-sandbox.h 1
2 files changed, 27 insertions, 4 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 8231b041..0602da26 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -488,10 +488,10 @@ static int run_work(const struct index_args *iargs, Stream *st,
free(pargs.event);
pargs.filename = filename;
pargs.event = event;
+ sb->shared->end_of_stream[cookie] = 0;
} else {
if ( finished ) {
- sb->shared->should_shutdown = 1;
- allDone = 1;
+ sb->shared->end_of_stream[cookie] = 1;
}
}
@@ -688,10 +688,13 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ pthread_mutex_lock(&sb->shared->queue_lock);
sb->shared->pings[slot] = 0;
+ sb->shared->end_of_stream[slot] = 0;
sb->last_ping[slot] = 0;
sb->shared->time_last_start[slot] = get_monotonic_seconds();
sb->shared->warned_long_running[slot] = 0;
+ pthread_mutex_unlock(&sb->shared->queue_lock);
p = fork();
if ( p == -1 ) {
@@ -1118,6 +1121,17 @@ char *create_tempdir(const char *temp_location)
}
+/* Call under queue_lock */
+static int all_got_end_of_stream(struct sandbox *sb)
+{
+ int i;
+ for ( i=0; i<sb->n_proc; i++ ) {
+ if ( !sb->shared->end_of_stream[i] ) return 0;
+ }
+ return 1;
+}
+
+
/* Returns the number of frames processed (not necessarily indexed).
* If the return value is zero, something is probably wrong. */
int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
@@ -1284,16 +1298,24 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Update progress */
try_status(sb, 0);
- /* Have all the events been swallowed? */
+ /* Begin exit criterion checking */
pthread_mutex_lock(&sb->shared->queue_lock);
+
+ /* Case 1: Queue empty and no more coming? */
if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1;
+
+ /* Case 2: Worker process requested immediate shutdown */
if ( sb->shared->should_shutdown ) {
- /* Worker process requested immediate shutdown */
allDone = 1;
sb->shared->n_events = 0;
sb->shared->no_more = 1;
}
+
+ /* Case 3: All workers saw end of (ASAP::O) stream */
+ if ( all_got_end_of_stream(sb) ) allDone = 1;
+
pthread_mutex_unlock(&sb->shared->queue_lock);
+ /* End exit criterion checking */
} while ( !allDone );
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 52094a3d..a6adddd5 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -69,6 +69,7 @@ struct sb_shm
char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN];
char last_task[MAX_NUM_WORKERS][MAX_TASK_LEN];
int pings[MAX_NUM_WORKERS];
+ int end_of_stream[MAX_NUM_WORKERS];
time_t time_last_start[MAX_NUM_WORKERS];
int warned_long_running[MAX_NUM_WORKERS];