diff options
author | Thomas White <taw@physics.org> | 2013-02-22 10:15:23 +0100 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2013-02-22 10:15:23 +0100 |
commit | b821aae2dd29cbdec5ccd33a3929a61ee047f0e6 (patch) | |
tree | 73a12f565f8f6e94100c5f4d16fafd7d42fc5eae | |
parent | 78ef5671c7533799834b6001c0a08f73f16195f9 (diff) |
More robust stream marshalling
-rw-r--r-- | src/im-sandbox.c | 98 |
1 files changed, 39 insertions, 59 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 0afe9ff3..37ccc35c 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -105,7 +105,6 @@ struct sandbox FILE **fhs; int *running; - int *waiting; FILE **result_fhs; int *filename_pipes; int *stream_pipe_read; @@ -426,8 +425,6 @@ static void run_work(const struct index_args *iargs, } - write_line(st, "DONE"); - cleanup_indexing(iargs->indm, iargs->ipriv); free(iargs->indm); free(iargs->ipriv); @@ -468,7 +465,6 @@ static time_t get_monotonic_seconds() static int pump_chunk(FILE *fh, FILE *ofh) { int chunk_started = 0; - int chunk_finished = 0; do { @@ -479,40 +475,29 @@ static int pump_chunk(FILE *fh, FILE *ofh) if ( rval == NULL ) { if ( feof(fh) ) { - /* Process died */ + /* Whoops, connection lost */ if ( chunk_started ) { ERROR("EOF during chunk!\n"); fprintf(ofh, "Chunk is unfinished!\n"); - } + fprintf(ofh, CHUNK_END_MARKER"\n"); + } /* else normal end of output */ return 1; } else { ERROR("fgets() failed: %s\n", strerror(errno)); } - chunk_finished = 1; - continue; + break; } - if ( strcmp(line, "FLUSH\n") == 0 ) { - chunk_finished = 1; - continue; - } - - if ( strcmp(line, "DONE\n") == 0 ) { - return 1; - } + if ( strcmp(line, "FLUSH\n") == 0 ) break; fprintf(ofh, "%s", line); - if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) { - chunk_finished = 1; - } - if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) { - chunk_started = 1; - } + if ( strcmp(line, CHUNK_END_MARKER"\n") == 0 ) break; + if ( strcmp(line, CHUNK_START_MARKER"\n") == 0 ) break; - } while ( !chunk_finished ); + } while ( 1 ); return 0; } @@ -529,7 +514,7 @@ static void *run_reader(void *sbv) fd_set fds; int fdmax; - tv.tv_sec = 5; + tv.tv_sec = 1; tv.tv_usec = 0; FD_ZERO(&fds); @@ -539,7 +524,9 @@ static void *run_reader(void *sbv) int fd; - if ( !sb->running[i] ) continue; + /* Listen for output from all processes which have a + * connection, even if they're not "running". */ + if ( sb->fhs[i] == NULL ) continue; fd = sb->stream_pipe_read[i]; @@ -559,28 +546,24 @@ static void *run_reader(void *sbv) continue; } - if ( r == 0 ) continue; /* Nothing this time. Try again */ - lock_sandbox(sb); for ( i=0; i<sb->n_proc; i++ ) { - if ( !sb->running[i] ) continue; - if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) { continue; } + /* If the chunk cannot be read, assume the connection + * is broken and that the process will die soon. */ if ( pump_chunk(sb->fhs[i], sb->ofh) ) { - sb->running[i] = 0; - sb->waiting[i] = 0; + sb->fhs[i] = NULL; } } done = 1; - for ( i=0; i<sb->n_proc; i++ ) { - if ( sb->running[i] ) done = 0; - } + if ( sb->running != NULL ) done = 0; + unlock_sandbox(sb); } @@ -733,7 +716,6 @@ static void handle_zombie(struct sandbox *sb) int status, p; if ( !sb->running[i] ) continue; - if ( sb->waiting[i] ) continue; p = waitpid(sb->pids[i], &status, WNOHANG); @@ -745,7 +727,6 @@ static void handle_zombie(struct sandbox *sb) if ( p == sb->pids[i] ) { sb->running[i] = 0; - sb->waiting[i] = 1; if ( WIFEXITED(status) ) { continue; @@ -827,7 +808,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->result_fhs = calloc(n_proc, sizeof(FILE *)); sb->pids = calloc(n_proc, sizeof(pid_t)); sb->running = calloc(n_proc, sizeof(int)); - sb->waiting = calloc(n_proc, sizeof(int)); sb->fhs = calloc(sb->n_proc, sizeof(FILE *)); if ( sb->filename_pipes == NULL ) { ERROR("Couldn't allocate memory for pipes.\n"); @@ -845,10 +825,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("Couldn't allocate memory for process flags.\n"); return; } - if ( sb->waiting == NULL ) { - ERROR("Couldn't allocate memory for process flags.\n"); - return; - } sb->last_filename = calloc(n_proc, sizeof(char *)); if ( sb->last_filename == NULL ) { @@ -861,11 +837,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } unlock_sandbox(sb); - if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { - ERROR("Failed to create reader thread.\n"); - return; - } - if ( pipe(signal_pipe) == -1 ) { ERROR("Failed to create signal pipe.\n"); return; @@ -890,6 +861,13 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } unlock_sandbox(sb); + /* Start reader thread after forking, so that things are definitely + * "running" */ + if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) { + ERROR("Failed to create reader thread.\n"); + return; + } + allDone = 0; while ( !allDone ) { @@ -909,9 +887,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; - if ( !sb->running[i] ) { - continue; - } + if ( sb->result_fhs[i] == NULL) continue; fd = fileno(sb->result_fhs[i]); FD_SET(fd, &fds); @@ -949,14 +925,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int fd; char *eptr; - if ( !sb->running[i] ) { - continue; - } + if ( sb->result_fhs[i] == NULL ) continue; fd = fileno(sb->result_fhs[i]); - if ( !FD_ISSET(fd, &fds) ) { - continue; - } + if ( !FD_ISSET(fd, &fds) ) continue; rval = fgets(results, 1024, sb->result_fhs[i]); if ( rval == NULL ) { @@ -964,6 +936,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, ERROR("fgets() failed: %s\n", strerror(errno)); } + sb->result_fhs[i] = NULL; continue; } @@ -1042,7 +1015,13 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, fclose(fh); - pthread_mutex_destroy(&sb->lock); + /* Indicate to the reader thread that we are done */ + lock_sandbox(sb); + free(sb->running); + sb->running = NULL; + unlock_sandbox(sb); + + pthread_join(reader_thread, NULL); for ( i=0; i<n_proc; i++ ) { int status; @@ -1051,7 +1030,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, for ( i=0; i<n_proc; i++ ) { close(sb->filename_pipes[i]); - fclose(sb->result_fhs[i]); + if ( sb->result_fhs[i] != NULL ) fclose(sb->result_fhs[i]); } for ( i=0; i<sb->n_proc; i++ ) { @@ -1061,11 +1040,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, free(sb->filename_pipes); free(sb->result_fhs); free(sb->pids); - free(sb->running); - free(sb->waiting); + + pthread_mutex_destroy(&sb->lock); STATUS("Final:" " %i images processed, %i had crystals, %i crystals overall.\n", sb->n_processed, sb->n_hadcrystals, sb->n_crystals); + free(sb); } |