diff options
author | Thomas White <taw@bitwiz.org.uk> | 2010-10-10 20:32:41 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:27:02 +0100 |
commit | 33d4fbfbbee784f734632e543a75222f38bc807d (patch) | |
tree | c9281f611a9da26433e4c3770ffce440ef25d3f9 /src/calibrate_detector.c | |
parent | a9adbc73158e8f8b225cd59d62ad5ade648c8241 (diff) |
calibrate_detecotr: Use new thread pool
Diffstat (limited to 'src/calibrate_detector.c')
-rw-r--r-- | src/calibrate_detector.c | 287 |
1 files changed, 72 insertions, 215 deletions
diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c index e5945015..2c68f645 100644 --- a/src/calibrate_detector.c +++ b/src/calibrate_detector.c @@ -27,6 +27,7 @@ #include "hdf5-file.h" #include "filters.h" #include "peaks.h" +#include "thread-pool.h" #define INTEGRATION_RADIUS (10) @@ -39,10 +40,9 @@ typedef enum SUM_PEAKS } SumMethod; -struct process_args +struct sum_args { char *filename; - int id; int config_cmfilter; int config_noisefilter; double *sum; @@ -50,12 +50,20 @@ struct process_args int h; SumMethod sum_method; double threshold; +}; + - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; +struct queue_args +{ + FILE *fh; + char *prefix; + int config_cmfilter; + int config_noisefilter; + double *sum; + int w; + int h; + SumMethod sum_method; + double threshold; }; @@ -151,8 +159,9 @@ static void sum_threshold(struct image *image, double *sum, double threshold) } -static void process_image(struct process_args *pargs) +static void add_image(void *args) { + struct sum_args *pargs = args; struct hdfile *hdfile; struct image image; @@ -160,7 +169,6 @@ static void process_image(struct process_args *pargs) image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = pargs->id; image.filename = pargs->filename; image.hits = NULL; image.n_hits = 0; @@ -215,71 +223,38 @@ out: image_feature_list_free(image.features); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); -} - - -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - process_image(pargs); - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; + free(pargs->filename); + free(pargs); } -static void dump_to_file(struct process_args *worker_args[], int nthreads, - int w, int h, int n, const char *stem) +static void *get_image(void *qp) { - int i; - double *total; - char outfile[256]; - - total = calloc(w*h, sizeof(double)); - - /* Add the individual sums to the 0th sum */ - for ( i=0; i<nthreads; i++ ) { - - int x, y; - - for ( x=0; x<w; x++ ) { - for ( y=0; y<h; y++ ) { - double val = worker_args[i]->sum[x+w*y]; - total[x+w*y] += val; - } - } - - } - - snprintf(outfile, 255, "%s-%i.h5", stem, n); - - hdf5_write(outfile, total, w, h, H5T_NATIVE_DOUBLE); - - free(total); + char line[1024]; + struct sum_args *pargs; + char *rval; + struct queue_args *qargs = qp; + + /* Get the next filename */ + rval = fgets(line, 1023, qargs->fh); + if ( rval == NULL ) return NULL; + + pargs = malloc(sizeof(struct sum_args)); + + pargs->w = qargs->w; + pargs->h = qargs->h; + pargs->sum_method = qargs->sum_method; + pargs->threshold = qargs->threshold; + pargs->config_cmfilter = qargs->config_cmfilter; + pargs->config_noisefilter = qargs->config_noisefilter; + pargs->sum = qargs->sum; + + chomp(line); + pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); + + return pargs; } @@ -289,8 +264,7 @@ int main(int argc, char *argv[]) char *filename = NULL; char *outfile = NULL; FILE *fh; - char *rval = NULL; - int n_images; + int n_images = 0; int config_cmfilter = 0; int config_noisefilter = 0; char *prefix = NULL; @@ -299,12 +273,9 @@ int main(int argc, char *argv[]) double threshold = 400.0; SumMethod sum; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; - int i; - const int w = 1024; /* FIXME! */ - const int h = 1024; /* FIXME! */ + struct queue_args qargs; + int n_done; + const int chunk_size = 1000; /* Long options */ const struct option longopts[] = { @@ -402,157 +373,43 @@ int main(int argc, char *argv[]) outfile = strdup("summed.h5"); } - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } - /* Initialise worker arguments */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_args[i]->filename = malloc(1024); - worker_args[i]->sum = calloc(w*h, sizeof(double)); - worker_active[i] = 0; - - worker_args[i]->w = w; - worker_args[i]->h = h; - worker_args[i]->sum_method = sum; - worker_args[i]->threshold = threshold; - - } - - n_images = 0; - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int r; - - pargs = worker_args[i]; + qargs.w = 1024; /* FIXME! */ + qargs.h = 1024; /* FIXME! */ + qargs.sum_method = sum; + qargs.threshold = threshold; + qargs.config_cmfilter = config_cmfilter; + qargs.config_noisefilter = config_noisefilter; + qargs.sum = calloc(qargs.w*qargs.h, sizeof(double)); + qargs.prefix = prefix; + qargs.fh = fh; - rval = fgets(line, 1023, fh); - if ( rval == NULL ) continue; - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - pthread_mutex_init(&pargs->control_mutex, NULL); - pargs->config_cmfilter = config_cmfilter; - pargs->config_noisefilter = config_noisefilter; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ do { - int i; - - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int done; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Get the next filename */ - rval = fgets(line, 1023, fh); - if ( rval == NULL ) break; - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); + n_done = run_threads(nthreads, add_image, get_image, + (void *)&qargs, chunk_size); - n_images++; - - STATUS("Done %i images\n", n_images); - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - if ( n_images % 1000 == 0 ) { - if ( intermediate != NULL ) { - dump_to_file(worker_args, nthreads, - w, h, n_images, - intermediate); - } - } + n_images += n_done; + /* Write intermediate sum if requested */ + if ( (intermediate != NULL) && (n_done == chunk_size) ) { + char outfile[1024]; + snprintf(outfile, 1023, "%s-%i.h5", + intermediate, n_images); + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, + H5T_NATIVE_DOUBLE); } - } while ( rval != NULL ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) goto free; - - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - - } - - /* Add the individual sums to the 0th sum */ - for ( i=1; i<nthreads; i++ ) { - - int x, y; - - for ( x=0; x<w; x++ ) { - for ( y=0; y<h; y++ ) { - double val = worker_args[i]->sum[x+w*y]; - worker_args[0]->sum[x+w*y] += val; - } - } - free(worker_args[i]->sum); - free(worker_args[i]); - - } + } while ( n_done == chunk_size ); - hdf5_write(outfile, worker_args[0]->sum, w, h, H5T_NATIVE_DOUBLE); + /* Write the final output */ + hdf5_write(outfile, qargs.sum, qargs.w, qargs.h, H5T_NATIVE_DOUBLE); - free(worker_args[0]->sum); - free(worker_args[0]); + free(qargs.sum); free(prefix); free(outfile); if ( intermediate != NULL ) free(intermediate); |