aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@bitwiz.org.uk>2012-07-15 11:46:25 -0400
committerThomas White <taw@bitwiz.org.uk>2012-07-15 11:46:25 -0400
commit14100fed56471e4331f83acc46c2fccd67125911 (patch)
treecca80bfc5b44583dc9608fd6934cd9852db367e9
parent16b97e9eb6e453d518bd081c94b54c373410ac01 (diff)
Add locking
-rw-r--r--src/im-sandbox.c174
1 files changed, 89 insertions, 85 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 85576ba1..02c90cdc 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -75,6 +75,8 @@
struct sandbox
{
+ pthread_mutex_t lock;
+
int n_indexable;
int n_processed;
int n_indexable_last_stats;
@@ -86,6 +88,7 @@ struct sandbox
int n_proc;
pid_t *pids;
FILE *ofh;
+ FILE **fhs;
int *running;
FILE **result_fhs;
@@ -367,8 +370,7 @@ static void run_work(const struct index_args *iargs,
}
- /* close my pipes */
- fclose(fh);
+ STATUS("Got command to exit. Shutting down!\n");
cleanup_indexing(iargs->ipriv);
free(iargs->indm);
@@ -379,6 +381,7 @@ static void run_work(const struct index_args *iargs,
free(iargs->hdf5_peak_path);
free_copy_hdf5_field_list(iargs->copyme);
cell_free(iargs->cell);
+ fclose(fh);
}
@@ -450,22 +453,6 @@ static void *run_reader(void *sbv)
{
struct sandbox *sb = sbv;
int done = 0;
- FILE **fhs;
- int i;
-
- fhs = calloc(sb->n_proc, sizeof(FILE *));
- if ( fhs == NULL ) {
- ERROR("Couldn't allocate memory for file handles!\n");
- return NULL;
- }
-
- for ( i=0; i<sb->n_proc; i++ ) {
- fhs[i] = fdopen(sb->stream_pipe_read[i], "r");
- if ( fhs[i] == NULL ) {
- ERROR("Couldn't fdopen() stream!\n");
- return NULL;
- }
- }
while ( !done ) {
@@ -479,6 +466,7 @@ static void *run_reader(void *sbv)
FD_ZERO(&fds);
fdmax = 0;
+ pthread_mutex_lock(&sb->lock);
for ( i=0; i<sb->n_proc; i++ ) {
int fd;
@@ -492,6 +480,8 @@ static void *run_reader(void *sbv)
}
+ pthread_mutex_unlock(&sb->lock);
+
r = select(fdmax+1, &fds, NULL, NULL, &tv);
if ( r == -1 ) {
@@ -503,13 +493,14 @@ static void *run_reader(void *sbv)
if ( r == 0 ) continue; /* Nothing this time. Try again */
+ pthread_mutex_lock(&sb->lock);
for ( i=0; i<sb->n_proc; i++ ) {
if ( !sb->running[i] ) continue;
if ( !FD_ISSET(sb->stream_pipe_read[i], &fds) ) continue;
- if ( pump_chunk(fhs[i], sb->ofh) ) {
+ if ( pump_chunk(sb->fhs[i], sb->ofh) ) {
sb->running[i] = 0;
}
@@ -519,14 +510,10 @@ static void *run_reader(void *sbv)
for ( i=0; i<sb->n_proc; i++ ) {
if ( sb->running[i] ) done = 0;
}
+ pthread_mutex_unlock(&sb->lock);
}
- for ( i=0; i<sb->n_proc; i++ ) {
- fclose(fhs[i]);
- }
- free(fhs);
-
return NULL;
}
@@ -547,9 +534,11 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ pthread_mutex_lock(&sb->lock);
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
+ pthread_mutex_unlock(&sb->lock);
return;
}
@@ -560,6 +549,9 @@ static void start_worker_process(struct sandbox *sb, int slot)
struct sigaction sa;
int r;
+ /* FIXME: Is lock inherited? */
+ pthread_mutex_unlock(&sb->lock);
+
/* First, disconnect the signal handler */
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
@@ -597,8 +589,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
sfh, slot);
fclose(sfh);
- free(sb->stream_pipe_write);
- close(filename_pipe[0]);
+ //close(filename_pipe[0]);
close(result_pipe[1]);
exit(0);
@@ -612,12 +603,21 @@ static void start_worker_process(struct sandbox *sb, int slot)
close(filename_pipe[0]);
close(result_pipe[1]);
sb->filename_pipes[slot] = filename_pipe[1];
+ sb->fhs[slot] = fdopen(sb->stream_pipe_read[slot], "r");
+ if ( sb->fhs[slot] == NULL ) {
+ ERROR("Couldn't fdopen() stream!\n");
+ pthread_mutex_unlock(&sb->lock);
+ return;
+ }
sb->result_fhs[slot] = fdopen(result_pipe[0], "r");
if ( sb->result_fhs[slot] == NULL ) {
ERROR("fdopen() failed.\n");
+ pthread_mutex_unlock(&sb->lock);
return;
}
+
+ pthread_mutex_unlock(&sb->lock);
}
@@ -625,12 +625,17 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v)
{
int i, found;
+ STATUS("Signal!\n");
+
if ( si->si_signo != SIGCHLD ) {
ERROR("Unhandled signal %i?\n", si->si_signo);
return;
}
found = 0;
+ STATUS("Getting lock...\n"); fflush(stderr);
+ pthread_mutex_lock(&sb->lock);
+ STATUS("Got it.\n"); fflush(stderr);
for ( i=0; i<sb->n_proc; i++ ) {
if ( (sb->running[i]) && (sb->pids[i] == si->si_pid) ) {
found = 1;
@@ -640,22 +645,29 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v)
if ( !found ) {
ERROR("SIGCHLD from unknown child %i?\n", si->si_pid);
+ pthread_mutex_unlock(&sb->lock);
return;
}
if ( (si->si_code == CLD_TRAPPED) || (si->si_code == CLD_STOPPED)
- || (si->si_code == CLD_CONTINUED) ) return;
+ || (si->si_code == CLD_CONTINUED) )
+ {
+ pthread_mutex_unlock(&sb->lock);
+ return;
+ }
if ( si->si_code == CLD_EXITED )
{
sb->running[i] = 0;
STATUS("Worker process %i exited normally.\n", i);
+ pthread_mutex_unlock(&sb->lock);
return;
}
if ( (si->si_code != CLD_DUMPED) && (si->si_code != CLD_KILLED) ) {
ERROR("Unhandled si_code %i (worker process %i).\n",
si->si_code, i);
+ pthread_mutex_unlock(&sb->lock);
return;
}
@@ -663,8 +675,8 @@ static void signal_handler(int sig, siginfo_t *si, void *uc_v)
ERROR(" -> Signal %i, last filename %s.\n",
si->si_signo, sb->last_filename[i]);
- sb->running[i] = 0;
- //start_worker_process(sb, i);
+ pthread_mutex_unlock(&sb->lock);
+ start_worker_process(sb, i);
}
@@ -693,6 +705,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->ofh = ofh;
sb->iargs = iargs;
+ pthread_mutex_init(&sb->lock, NULL);
+
sb->stream_pipe_read = calloc(n_proc, sizeof(int));
sb->stream_pipe_write = calloc(n_proc, sizeof(int));
if ( sb->stream_pipe_read == NULL ) {
@@ -718,25 +732,12 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
- if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
- ERROR("Failed to create reader thread.\n");
- return;
- }
-
- /* Set up signal handler to take action if any children die */
- sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
- sigemptyset(&sa.sa_mask);
- sa.sa_sigaction = signal_handler;
- r = sigaction(SIGCHLD, &sa, NULL);
- if ( r == -1 ) {
- ERROR("Failed to set signal handler!\n");
- return;
- }
-
+ pthread_mutex_lock(&sb->lock);
sb->filename_pipes = calloc(n_proc, sizeof(int));
sb->result_fhs = calloc(n_proc, sizeof(FILE *));
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
+ sb->fhs = calloc(sb->n_proc, sizeof(FILE *));
if ( sb->filename_pipes == NULL ) {
ERROR("Couldn't allocate memory for pipes.\n");
return;
@@ -759,43 +760,30 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Couldn't allocate memory for last filename list.\n");
return;
}
-
- /* Fork the right number of times */
- for ( i=0; i<n_proc; i++ ) {
- start_worker_process(sb, i);
+ if ( sb->fhs == NULL ) {
+ ERROR("Couldn't allocate memory for file handles!\n");
+ return;
}
+ pthread_mutex_unlock(&sb->lock);
- /* Send first image to all children */
- for ( i=0; i<n_proc; i++ ) {
-
- char *nextImage;
-
- nextImage = get_pattern(fh, &use_this_one_instead,
- config_basename, prefix);
-
- if ( nextImage != NULL ) {
-
- free(sb->last_filename[i]);
- sb->last_filename[i] = strdup(nextImage);
-
- write(sb->filename_pipes[i], nextImage,
- strlen(nextImage));
- write(sb->filename_pipes[i], "\n", 1);
-
- free(nextImage);
-
- } else {
-
- int r;
-
- /* No more files to process.. already? */
- r = write(sb->filename_pipes[i], "\n", 1);
- if ( r < 0 ) {
- ERROR("Write pipe\n");
- }
+ if ( pthread_create(&reader_thread, NULL, run_reader, (void *)sb) ) {
+ ERROR("Failed to create reader thread.\n");
+ return;
+ }
- }
+ /* Set up signal handler to take action if any children die */
+ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_sigaction = signal_handler;
+ r = sigaction(SIGCHLD, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ return;
+ }
+ /* Fork the right number of times */
+ for ( i=0; i<n_proc; i++ ) {
+ start_worker_process(sb, i);
}
allDone = 0;
@@ -812,11 +800,15 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
FD_ZERO(&fds);
fdmax = 0;
+ pthread_mutex_lock(&sb->lock);
for ( i=0; i<n_proc; i++ ) {
int fd;
- if ( !sb->running[i] ) continue;
+ if ( !sb->running[i] ) {
+ pthread_mutex_unlock(&sb->lock);
+ continue;
+ }
fd = fileno(sb->result_fhs[i]);
FD_SET(fd, &fds);
@@ -824,8 +816,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
}
+ pthread_mutex_unlock(&sb->lock);
r = select(fdmax+1, &fds, NULL, NULL, &tv);
-
if ( r == -1 ) {
if ( errno != EINTR ) {
ERROR("select() failed: %s\n", strerror(errno));
@@ -835,6 +827,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( r == 0 ) continue; /* No progress this time. Try again */
+ pthread_mutex_lock(&sb->lock);
for ( i=0; i<n_proc; i++ ) {
char *nextImage;
@@ -844,16 +837,18 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int n;
char *eptr;
- if ( !sb->running[i] ) continue;
+ if ( !sb->running[i] ) {
+ continue;
+ }
fd = fileno(sb->result_fhs[i]);
- if ( !FD_ISSET(fd, &fds) ) continue;
+ if ( !FD_ISSET(fd, &fds) ) {
+ continue;
+ }
rval = fgets(results, 1024, sb->result_fhs[i]);
if ( rval == NULL ) {
- if ( feof(sb->result_fhs[i]) ) {
- ERROR("EOF from process %i.\n", i);
- } else {
+ if ( !feof(sb->result_fhs[i]) ) {
ERROR("fgets() failed: %s\n",
strerror(errno));
}
@@ -883,7 +878,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Write pipe\n");
}
} else {
-
r = write(sb->filename_pipes[i], nextImage,
strlen(nextImage));
r -= write(sb->filename_pipes[i], "\n", 1);
@@ -916,10 +910,16 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
if ( sb->running[i] ) allDone = 0;
}
+ pthread_mutex_unlock(&sb->lock);
+
}
+ STATUS("Done. Waiting..\n");
+
fclose(fh);
+ pthread_mutex_destroy(&sb->lock);
+
for ( i=0; i<n_proc; i++ ) {
int status;
waitpid(sb->pids[i], &status, 0);
@@ -930,6 +930,10 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
fclose(sb->result_fhs[i]);
}
+ for ( i=0; i<sb->n_proc; i++ ) {
+ fclose(sb->fhs[i]);
+ }
+ free(sb->fhs);
free(sb->filename_pipes);
free(sb->result_fhs);
free(sb->pids);