diff options
author | Thomas White <taw@physics.org> | 2015-07-13 14:01:45 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2015-07-13 16:00:16 +0200 |
commit | 65d5d478b4288b26a455e260ee9bc153a1789f29 (patch) | |
tree | 61c602653c4c216bdd54005b872a88dc40df9e86 /src/im-sandbox.c | |
parent | 7591530ff83fc538ac5d92792d01d3b6fe88c781 (diff) |
indexamajig: Avoid forking while multithreaded
Conflicts:
src/im-sandbox.c
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 201 |
1 files changed, 69 insertions, 132 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 747c7d03..1b3dbfcd 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -65,27 +65,6 @@ #include "process_image.h" -/* Write statistics at APPROXIMATELY this interval */ -#define STATS_EVERY_N_SECONDS (5) - -struct sb_reader -{ - pthread_mutex_t lock; - int done; - - /* If a worker process dies unexpectedly (e.g. if it segfaults), then - * the pipe for its output can still stay open for a little while while - * its buffer empties. The number of pipes being read from is therefore - * not necessarily the same as the number of worker processes. */ - int n_read; - FILE **fhs; - int *fds; - - /* Final output */ - Stream *stream; -}; - - struct sandbox { int n_processed_last_stats; @@ -106,7 +85,13 @@ struct sandbox char *tmpdir; - struct sb_reader *reader; + /* Streams to read from */ + int n_read; + FILE **fhs; + int *fds; + + /* Final output */ + Stream *stream; }; @@ -399,129 +384,105 @@ static int pump_chunk(FILE *fh, int ofd) /* Add an fd to the list of pipes to be read from */ -static void add_pipe(struct sb_reader *rd, int fd) +static void add_pipe(struct sandbox *sb, int fd) { int *fds_new; FILE **fhs_new; int slot; - pthread_mutex_lock(&rd->lock); - - fds_new = realloc(rd->fds, (rd->n_read+1)*sizeof(int)); + fds_new = realloc(sb->fds, (sb->n_read+1)*sizeof(int)); if ( fds_new == NULL ) { ERROR("Failed to allocate memory for new pipe.\n"); return; } - fhs_new = realloc(rd->fhs, (rd->n_read+1)*sizeof(FILE *)); + fhs_new = realloc(sb->fhs, (sb->n_read+1)*sizeof(FILE *)); if ( fhs_new == NULL ) { ERROR("Failed to allocate memory for new FH.\n"); return; } - rd->fds = fds_new; - rd->fhs = fhs_new; - slot = rd->n_read; + sb->fds = fds_new; + sb->fhs = fhs_new; + slot = sb->n_read; - rd->fds[slot] = fd; + sb->fds[slot] = fd; - rd->fhs[slot] = fdopen(fd, "r"); - if ( rd->fhs[slot] == NULL ) { + sb->fhs[slot] = fdopen(fd, "r"); + if ( sb->fhs[slot] == NULL ) { ERROR("Couldn't fdopen() stream!\n"); return; } - rd->n_read++; - - pthread_mutex_unlock(&rd->lock); + sb->n_read++; } -/* Assumes that the caller is already holding rd->lock! */ -static void remove_pipe(struct sb_reader *rd, int d) +static void remove_pipe(struct sandbox *sb, int d) { int i; - fclose(rd->fhs[d]); + fclose(sb->fhs[d]); - for ( i=d; i<rd->n_read; i++ ) { - if ( i < rd->n_read-1 ) { - rd->fds[i] = rd->fds[i+1]; - rd->fhs[i] = rd->fhs[i+1]; + for ( i=d; i<sb->n_read; i++ ) { + if ( i < sb->n_read-1 ) { + sb->fds[i] = sb->fds[i+1]; + sb->fhs[i] = sb->fhs[i+1]; } /* else don't bother */ } - rd->n_read--; + sb->n_read--; /* We don't bother shrinking the arrays */ } -static void *run_reader(void *rdv) +static void try_read(struct sandbox *sb) { - struct sb_reader *rd = rdv; - const int ofd = get_stream_fd(rd->stream); - - while ( 1 ) { + int r, i; + struct timeval tv; + fd_set fds; + int fdmax; + const int ofd = get_stream_fd(sb->stream); - int r, i; - struct timeval tv; - fd_set fds; - int fdmax; + tv.tv_sec = 5; + tv.tv_usec = 0; - /* Exit when: - * - No fhs left open to read from - * AND - Main thread says "done" */ - if ( (rd->n_read == 0) && rd->done ) break; + FD_ZERO(&fds); + fdmax = 0; + for ( i=0; i<sb->n_read; i++ ) { - tv.tv_sec = 1; - tv.tv_usec = 0; + int fd; - FD_ZERO(&fds); - fdmax = 0; - pthread_mutex_lock(&rd->lock); - for ( i=0; i<rd->n_read; i++ ) { + fd = sb->fds[i]; - int fd; + FD_SET(fd, &fds); + if ( fd > fdmax ) fdmax = fd; - fd = rd->fds[i]; + } - FD_SET(fd, &fds); - if ( fd > fdmax ) fdmax = fd; + r = select(fdmax+1, &fds, NULL, NULL, &tv); - } - pthread_mutex_unlock(&rd->lock); + if ( r == -1 ) { + if ( errno != EINTR ) { + ERROR("select() failed: %s\n", strerror(errno)); + } /* Otherwise no big deal */ + return; + } - r = select(fdmax+1, &fds, NULL, NULL, &tv); + for ( i=0; i<sb->n_read; i++ ) { - if ( r == -1 ) { - if ( errno != EINTR ) { - ERROR("select() failed: %s\n", strerror(errno)); - } /* Otherwise no big deal */ + if ( !FD_ISSET(sb->fds[i], &fds) ) { continue; } - pthread_mutex_lock(&rd->lock); - for ( i=0; i<rd->n_read; i++ ) { - - if ( !FD_ISSET(rd->fds[i], &fds) ) { - continue; - } - - /* If the chunk cannot be read, assume the connection - * is broken and that the process will die soon. */ - if ( pump_chunk(rd->fhs[i], ofd) ) { - /* remove_pipe() assumes that the caller is - * holding rd->lock ! */ - remove_pipe(rd, i); - } - + /* If the chunk cannot be read, assume the connection + * is broken and that the process will die soon. */ + if ( pump_chunk(sb->fhs[i], ofd) ) { + remove_pipe(sb, i); } - pthread_mutex_unlock(&rd->lock); } - - return NULL; } @@ -586,12 +547,11 @@ static void start_worker_process(struct sandbox *sb, int slot) /* Free resources which will not be needed by worker */ free(sb->pids); - for ( i=0; i<sb->reader->n_read; i++ ) { - fclose(sb->reader->fhs[i]); + for ( i=0; i<sb->n_read; i++ ) { + fclose(sb->fhs[i]); } - free(sb->reader->fhs); - free(sb->reader->fds); - free(sb->reader); + free(sb->fhs); + free(sb->fds); free(sb->tmpdir); free(sb->running); /* Not freed because it's not worth passing them down just for @@ -620,7 +580,7 @@ static void start_worker_process(struct sandbox *sb, int slot) * and the 'read' end of the result pipe. */ sb->pids[slot] = p; sb->running[slot] = 1; - add_pipe(sb->reader, stream_pipe[0]); + add_pipe(sb, stream_pipe[0]); close(stream_pipe[1]); } @@ -752,7 +712,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, int i; struct sigaction sa; int r; - pthread_t reader_thread; struct sandbox *sb; size_t ll; struct stat s; @@ -770,15 +729,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return; } - sb->reader = calloc(1, sizeof(struct sb_reader)); - if ( sb->reader == NULL ) { - ERROR("Couldn't allocate memory for SB reader.\n"); - free(sb); - return; - } - - pthread_mutex_init(&sb->reader->lock, NULL); - sb->n_processed_last_stats = 0; sb->n_hadcrystals_last_stats = 0; sb->n_crystals_last_stats = 0; @@ -787,9 +737,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, sb->iargs = iargs; sb->serial = 1; - sb->reader->fds = NULL; - sb->reader->fhs = NULL; - sb->reader->stream = stream; + sb->fds = NULL; + sb->fhs = NULL; + sb->stream = stream; if ( setup_shm(sb) ) { ERROR("Failed to set up SHM.\n"); @@ -867,19 +817,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, start_worker_process(sb, i); } - /* Start reader thread after forking, so that things are definitely - * "running" */ - if ( pthread_create(&reader_thread, NULL, run_reader, - (void *)sb->reader) ) { - ERROR("Failed to create reader thread.\n"); - return; - } - do { int r; double tNow; + try_read(sb); sleep(5); /* Check for dead workers */ @@ -944,25 +887,19 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } for ( i=0; i<n_proc; i++ ) { int status; - waitpid(sb->pids[i], &status, 0); + while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) { + try_read(sb); + } } - /* Indicate to the reader thread that we are done */ - pthread_mutex_lock(&sb->reader->lock); - sb->reader->done = 1; - pthread_mutex_unlock(&sb->reader->lock); - - pthread_join(reader_thread, NULL); - - for ( i=0; i<sb->reader->n_read; i++ ) { - fclose(sb->reader->fhs[i]); + for ( i=0; i<sb->n_read; i++ ) { + fclose(sb->fhs[i]); } - free(sb->reader->fhs); - free(sb->reader->fds); + free(sb->fhs); + free(sb->fds); free(sb->running); free(sb->pids); free(sb->tmpdir); - free(sb->reader); STATUS("Final: %i images processed, %i had crystals (%.1f%%)," " %i crystals overall.\n", |