diff options
author | Thomas White <taw@physics.org> | 2010-07-06 18:31:20 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:26:52 +0100 |
commit | ae5c7478b4e5e0fe0ef421c3be7cd7f452913739 (patch) | |
tree | a9509b257b13270f2b88d2488f1eee07bc2e6ce0 /src/calibrate_detector.c | |
parent | 68b8e4272dc7cb808323e7b37bd02170403332bd (diff) |
calibrate_detector: Rework multithreading not to use pthread_timedjoin_np()
Diffstat (limited to 'src/calibrate_detector.c')
-rw-r--r-- | src/calibrate_detector.c | 108 |
1 files changed, 79 insertions, 29 deletions
diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c index 70b70c71..68c9782b 100644 --- a/src/calibrate_detector.c +++ b/src/calibrate_detector.c @@ -51,6 +51,12 @@ struct process_args int h; SumMethod sum_method; double threshold; + + /* Thread control */ + pthread_mutex_t control_mutex; /* Protects the scary stuff below */ + int start; + int finish; + int done; }; @@ -142,9 +148,8 @@ static void sum_threshold(struct image *image, double *sum, double threshold) } -static void *process_image(void *pargsv) +static void process_image(struct process_args *pargs) { - struct process_args *pargs = pargsv; struct hdfile *hdfile; struct image image; @@ -168,11 +173,11 @@ static void *process_image(void *pargsv) hdfile = hdfile_open(pargs->filename); if ( hdfile == NULL ) { - return NULL; + return; } else if ( hdfile_set_first_image(hdfile, "/") ) { ERROR("Couldn't select path\n"); hdfile_close(hdfile); - return NULL; + return; } hdf5_read(hdfile, &image, 1); @@ -206,6 +211,38 @@ out: free(image.data); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); +} + + +static void *worker_thread(void *pargsv) +{ + struct process_args *pargs = pargsv; + int finish; + + do { + + int wakeup; + + process_image(pargs); + + pthread_mutex_lock(&pargs->control_mutex); + pargs->done = 1; + pargs->start = 0; + pthread_mutex_unlock(&pargs->control_mutex); + + /* Go to sleep until told to exit or process next image */ + do { + + pthread_mutex_lock(&pargs->control_mutex); + /* Either of these can result in the thread waking up */ + wakeup = pargs->start || pargs->finish; + finish = pargs->finish; + pthread_mutex_unlock(&pargs->control_mutex); + usleep(20000); + + } while ( !wakeup ); + + } while ( !pargs->finish ); return NULL; } @@ -383,7 +420,7 @@ int main(int argc, char *argv[]) n_images = 0; - /* Initially, fire off the full number of threads */ + /* Start threads off */ for ( i=0; i<nthreads; i++ ) { char line[1024]; @@ -399,11 +436,18 @@ int main(int argc, char *argv[]) n_images++; + pthread_mutex_init(&pargs->control_mutex, NULL); pargs->config_cmfilter = config_cmfilter; pargs->config_noisefilter = config_noisefilter; + pthread_mutex_lock(&pargs->control_mutex); + pargs->done = 0; + pargs->start = 1; + pargs->finish = 0; + pthread_mutex_unlock(&pargs->control_mutex); + worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, process_image, pargs); + r = pthread_create(&workers[i], NULL, worker_thread, pargs); if ( r != 0 ) { worker_active[i] = 0; ERROR("Couldn't start thread %i\n", i); @@ -411,7 +455,7 @@ int main(int argc, char *argv[]) } - /* Start new threads as old ones finish */ + /* Keep threads busy until the end of the data */ do { int i; @@ -419,39 +463,38 @@ int main(int argc, char *argv[]) for ( i=0; i<nthreads; i++ ) { char line[1024]; - int r; - struct timespec t; - struct timeval tv; struct process_args *pargs; + int done; + + /* Spend time working, not managing threads */ + usleep(100000); + /* Are we using this thread record at all? */ if ( !worker_active[i] ) continue; + /* Has the thread finished yet? */ pargs = worker_args[i]; + pthread_mutex_lock(&pargs->control_mutex); + done = pargs->done; + pthread_mutex_unlock(&pargs->control_mutex); + if ( !done ) continue; - gettimeofday(&tv, NULL); - t.tv_sec = tv.tv_sec; - t.tv_nsec = tv.tv_usec * 1000 + 20000; - - r = pthread_timedjoin_np(workers[i], NULL, &t); - if ( r != 0 ) continue; /* Not ready yet */ - - worker_active[i] = 0; - + /* Get the next filename */ rval = fgets(line, 1023, fh); if ( rval == NULL ) break; chomp(line); snprintf(pargs->filename, 1023, "%s%s", prefix, line); - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, process_image, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - n_images++; + STATUS("Done %i images\n", n_images); + /* Wake the thread up ... */ + pthread_mutex_lock(&pargs->control_mutex); + pargs->done = 0; + pargs->start = 1; + pthread_mutex_unlock(&pargs->control_mutex); + if ( n_images % 1000 == 0 ) { if ( intermediate != NULL ) { dump_to_file(worker_args, nthreads, @@ -459,18 +502,25 @@ int main(int argc, char *argv[]) intermediate); } } + } } while ( rval != NULL ); - /* Catch all remaining threads */ + /* Join threads */ for ( i=0; i<nthreads; i++ ) { if ( !worker_active[i] ) goto free; - pthread_join(workers[i], NULL); - worker_active[i] = 0; + /* Tell the thread to exit */ + struct process_args *pargs = worker_args[i]; + pthread_mutex_lock(&pargs->control_mutex); + pargs->finish = 1; + pthread_mutex_unlock(&pargs->control_mutex); + + /* Wait for it to join */ + pthread_join(workers[i], NULL); free: if ( worker_args[i]->filename != NULL ) { |