diff options
author | Thomas White <taw@physics.org> | 2010-07-05 18:39:07 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:26:52 +0100 |
commit | 8cb69e84552e0c2f144e145881a78f0db674201f (patch) | |
tree | dcda1ef1ff4cf54d4106e40bf97d71b25a9b7ba1 | |
parent | 6aee185fbdaa5c5bb1216066e53ed343f468b4ae (diff) |
indexamajig: Rework threading not to use pthread_timedjoin_np()
-rw-r--r-- | src/indexamajig.c | 140 |
1 files changed, 92 insertions, 48 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index 87dfb6dc..548d60dc 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -42,6 +42,7 @@ struct process_args { + /* Input */ char *filename; int id; pthread_mutex_t *output_mutex; /* Protects stdout */ @@ -66,8 +67,17 @@ struct process_args const double *intensities; const unsigned int *counts; struct gpu_context *gctx; + + /* Thread control and output */ + pthread_mutex_t control_mutex; /* Protects the scary stuff below */ + int start; + int finish; + int done; + int hit; + int peaks_sane; }; + struct process_result { int hit; @@ -237,13 +247,13 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx, } -static void *process_image(void *pargsv) +static struct process_result process_image(struct process_args *pargs) { - struct process_args *pargs = pargsv; struct hdfile *hdfile; struct image image; struct image *simage; float *data_for_measurement; + struct process_result result; size_t data_size; const char *filename = pargs->filename; UnitCell *cell = pargs->cell; @@ -262,7 +272,6 @@ static void *process_image(void *pargsv) const double *intensities = pargs->intensities; const unsigned int *counts = pargs->counts; struct gpu_context *gctx = pargs->gctx; - struct process_result *result; image.features = NULL; image.data = NULL; @@ -281,10 +290,8 @@ static void *process_image(void *pargsv) STATUS("Processing '%s'\n", image.filename); - result = malloc(sizeof(*result)); - if ( result == NULL ) return NULL; - result->peaks_sane = 0; - result->hit = 0; + result.peaks_sane = 0; + result.hit = 0; hdfile = hdfile_open(filename); if ( hdfile == NULL ) { @@ -344,7 +351,7 @@ static void *process_image(void *pargsv) STATUS("Failed peak sanity check.\n"); goto done; } else { - result->peaks_sane = 1; + result.peaks_sane = 1; } /* Measure intensities if requested */ @@ -385,14 +392,51 @@ done: hdfile_close(hdfile); if ( image.indexed_cell == NULL ) { - result->hit = 0; + result.hit = 0; } else { - result->hit = 1; + result.hit = 1; } return result; } +static void *worker_thread(void *pargsv) +{ + struct process_args *pargs = pargsv; + int finish; + + do { + + struct process_result result; + int wakeup; + + result = process_image(pargs); + + pthread_mutex_lock(&pargs->control_mutex); + pargs->hit = result.hit; + pargs->peaks_sane = result.peaks_sane; + pargs->done = 1; + pargs->start = 0; + pthread_mutex_unlock(&pargs->control_mutex); + + /* Go to sleep until told to exit or process next image */ + do { + + pthread_mutex_lock(&pargs->control_mutex); + /* Either of these can result in the thread waking up */ + wakeup = pargs->start || pargs->finish; + finish = pargs->finish; + pthread_mutex_unlock(&pargs->control_mutex); + usleep(20000); + + } while ( !wakeup ); + + } while ( !pargs->finish ); + + return NULL; +} + + int main(int argc, char *argv[]) { int c; @@ -590,7 +634,7 @@ int main(int argc, char *argv[]) worker_active[i] = 0; } - /* Initially, fire off the full number of threads */ + /* Start threads off */ for ( i=0; i<nthreads; i++ ) { char line[1024]; @@ -608,6 +652,7 @@ int main(int argc, char *argv[]) pargs->output_mutex = &output_mutex; pargs->gpu_mutex = &gpu_mutex; + pthread_mutex_init(&pargs->control_mutex, NULL); pargs->config_cmfilter = config_cmfilter; pargs->config_noisefilter = config_noisefilter; pargs->config_writedrx = config_writedrx; @@ -629,9 +674,14 @@ int main(int argc, char *argv[]) pargs->counts = counts; pargs->gctx = gctx; pargs->id = i; + pthread_mutex_lock(&pargs->control_mutex); + pargs->done = 0; + pargs->start = 1; + pargs->finish = 0; + pthread_mutex_unlock(&pargs->control_mutex); worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, process_image, pargs); + r = pthread_create(&workers[i], NULL, worker_thread, pargs); if ( r != 0 ) { worker_active[i] = 0; ERROR("Couldn't start thread %i\n", i); @@ -639,7 +689,7 @@ int main(int argc, char *argv[]) } - /* Start new threads as old ones finish */ + /* Keep threads busy until the end of the data */ do { int i; @@ -647,65 +697,59 @@ int main(int argc, char *argv[]) for ( i=0; i<nthreads; i++ ) { char line[1024]; - int r; - struct process_result *result = NULL; - struct timespec t; - struct timeval tv; struct process_args *pargs; + int done; + + /* Spend CPU time indexing, not checking results */ + usleep(100000); + /* Are we using this thread record at all? */ if ( !worker_active[i] ) continue; + /* Has the thread finished yet? */ pargs = worker_args[i]; + pthread_mutex_lock(&pargs->control_mutex); + done = pargs->done; + pthread_mutex_unlock(&pargs->control_mutex); + if ( !done ) continue; - gettimeofday(&tv, NULL); - t.tv_sec = tv.tv_sec; - t.tv_nsec = tv.tv_usec * 1000 + 20000; - - r = pthread_timedjoin_np(workers[i], (void *)&result, - &t); - if ( r != 0 ) continue; /* Not ready yet */ - - worker_active[i] = 0; - - if ( result != NULL ) { - n_hits += result->hit; - n_sane += result->peaks_sane; - free(result); - } + /* Record the result */ + n_hits += pargs->hit; + n_sane += pargs->peaks_sane; + /* Get next filename */ rval = fgets(line, 1023, fh); if ( rval == NULL ) break; chomp(line); snprintf(pargs->filename, 1023, "%s%s", prefix, line); - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, process_image, - pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - n_images++; + + /* Wake the thread up ... */ + pthread_mutex_lock(&pargs->control_mutex); + pargs->done = 0; + pargs->start = 1; + pthread_mutex_unlock(&pargs->control_mutex); + } } while ( rval != NULL ); - /* Catch all remaining threads */ + /* Join threads */ for ( i=0; i<nthreads; i++ ) { - struct process_result *result = NULL; - if ( !worker_active[i] ) goto free; - pthread_join(workers[i], (void *)&result); + /* Tell the thread to exit */ + struct process_args *pargs = worker_args[i]; + pargs->finish = 1; + /* Wait for it to join */ + pthread_join(workers[i], NULL); worker_active[i] = 0; - if ( result != NULL ) { - n_hits += result->hit; - free(result); - } + n_hits += pargs->hit; + n_sane += pargs->peaks_sane; free: if ( worker_args[i]->filename != NULL ) { |