diff options
author | Thomas White <taw@physics.org> | 2010-10-11 10:57:46 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:27:02 +0100 |
commit | a22d0dc84c4411ca1b4583ac7857d5301c690f7c (patch) | |
tree | b7639b8813ef6c8f97160fb8d8a8391a27664eea /src/indexamajig.c | |
parent | a7d2cab127719eeef82584664b1abbbee06656c4 (diff) |
indexamajig: Use new thread pool
Diffstat (limited to 'src/indexamajig.c')
-rw-r--r-- | src/indexamajig.c | 345 |
1 files changed, 122 insertions, 223 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c index a866ac3b..4af459bc 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -35,9 +35,7 @@ #include "sfac.h" #include "filters.h" #include "reflections.h" - - -#define MAX_THREADS (96) +#include "thread-pool.h" enum { @@ -46,11 +44,9 @@ enum { }; -struct process_args +/* Information about the indexing process which is common to all patterns */ +struct static_index_args { - /* Input */ - char *filename; - int id; pthread_mutex_t *gpu_mutex; /* Protects "gctx" */ UnitCell *cell; int config_cmfilter; @@ -76,20 +72,37 @@ struct process_args struct gpu_context *gctx; int peaks; - /* Thread control and output */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - int indexable; - int sane; - /* Output stream */ pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; }; +/* Information about the indexing process for one pattern */ +struct index_args +{ + /* "Input" */ + char *filename; + struct static_index_args static_args; + + /* "Output" */ + int indexable; + int sane; +}; + + +/* Information needed to choose the next task and dispatch it */ +struct queue_args +{ + FILE *fh; + char *prefix; + struct static_index_args static_args; + + int n_indexable; + int n_sane; +}; + + static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -100,6 +113,7 @@ static void show_help(const char *s) "\n" " -i, --input=<filename> Specify file containing list of images to process.\n" " '-' means stdin, which is the default.\n" +" -o, --output=<filename> Write indexed stream to this file. '-' for stdout.\n" "\n" " --indexing=<method> Use 'method' for indexing. Choose from:\n" " none : no indexing (default)\n" @@ -273,38 +287,39 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx, } -static void process_image(struct process_args *pargs) +static void process_image(void *pp, int cookie) { + struct index_args *pargs = pp; struct hdfile *hdfile; struct image image; struct image *simage; float *data_for_measurement; size_t data_size; char *filename = pargs->filename; - UnitCell *cell = pargs->cell; - int config_cmfilter = pargs->config_cmfilter; - int config_noisefilter = pargs->config_noisefilter; - int config_writedrx = pargs->config_writedrx; - int config_dumpfound = pargs->config_dumpfound; - int config_verbose = pargs->config_verbose; - int config_alternate = pargs->config_alternate; - int config_nearbragg = pargs->config_nearbragg; - int config_gpu = pargs->config_gpu; - int config_simulate = pargs->config_simulate; - int config_nomatch = pargs->config_nomatch; - int config_polar = pargs->config_polar; - IndexingMethod indm = pargs->indm; - const double *intensities = pargs->intensities; - struct gpu_context *gctx = pargs->gctx; + UnitCell *cell = pargs->static_args.cell; + int config_cmfilter = pargs->static_args.config_cmfilter; + int config_noisefilter = pargs->static_args.config_noisefilter; + int config_writedrx = pargs->static_args.config_writedrx; + int config_dumpfound = pargs->static_args.config_dumpfound; + int config_verbose = pargs->static_args.config_verbose; + int config_alternate = pargs->static_args.config_alternate; + int config_nearbragg = pargs->static_args.config_nearbragg; + int config_gpu = pargs->static_args.config_gpu; + int config_simulate = pargs->static_args.config_simulate; + int config_nomatch = pargs->static_args.config_nomatch; + int config_polar = pargs->static_args.config_polar; + IndexingMethod indm = pargs->static_args.indm; + const double *intensities = pargs->static_args.intensities; + struct gpu_context *gctx = pargs->static_args.gctx; image.features = NULL; image.data = NULL; image.indexed_cell = NULL; - image.id = pargs->id; + image.id = cookie; image.filename = filename; image.hits = NULL; image.n_hits = 0; - image.det = pargs->det; + image.det = pargs->static_args.det; /* View head-on (unit cell is tilted) */ image.orientation.w = 1.0; @@ -325,7 +340,7 @@ static void process_image(struct process_args *pargs) return; } - hdf5_read(hdfile, &image, pargs->config_satcorr); + hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); if ( config_cmfilter ) { filter_cm(&image); @@ -342,7 +357,7 @@ static void process_image(struct process_args *pargs) memcpy(data_for_measurement, image.data, data_size); } - switch ( pargs->peaks ) + switch ( pargs->static_args.peaks ) { case PEAK_HDF5 : /* Get peaks from HDF5 */ @@ -352,7 +367,7 @@ static void process_image(struct process_args *pargs) } break; case PEAK_ZAEF : - search_peaks(&image, pargs->threshold); + search_peaks(&image, pargs->static_args.threshold); break; } @@ -362,7 +377,8 @@ static void process_image(struct process_args *pargs) image.data = data_for_measurement; if ( config_dumpfound ) { - dump_peaks(&image, pargs->ofh, pargs->output_mutex); + dump_peaks(&image, pargs->static_args.ofh, + pargs->static_args.output_mutex); } /* Not indexing nor writing xfel.drx? @@ -374,7 +390,7 @@ static void process_image(struct process_args *pargs) /* Calculate orientation matrix (by magic) */ if ( config_writedrx || (indm != INDEXING_NONE) ) { index_pattern(&image, cell, indm, config_nomatch, - config_verbose, pargs->ipriv); + config_verbose, pargs->static_args.ipriv); } /* No cell at this point? Then we're done. */ @@ -382,7 +398,7 @@ static void process_image(struct process_args *pargs) pargs->indexable = 1; /* Sanity check */ - if ( pargs->config_sanity + if ( pargs->static_args.config_sanity && !peak_sanity_check(&image, image.indexed_cell, 0, 0.1) ) { STATUS("Failed peak sanity check.\n"); goto done; @@ -393,9 +409,10 @@ static void process_image(struct process_args *pargs) /* Measure intensities if requested */ if ( config_nearbragg ) { output_intensities(&image, image.indexed_cell, - pargs->output_mutex, config_polar, - pargs->config_sa, pargs->config_closer, - pargs->ofh, 0, 0.1); + pargs->static_args.output_mutex, + config_polar, pargs->static_args.config_sa, + pargs->static_args.config_closer, + pargs->static_args.ofh, 0, 0.1); } simage = get_simage(&image, config_alternate); @@ -403,10 +420,10 @@ static void process_image(struct process_args *pargs) /* Simulate if requested */ if ( config_simulate ) { if ( config_gpu ) { - pthread_mutex_lock(pargs->gpu_mutex); + pthread_mutex_lock(pargs->static_args.gpu_mutex); simulate_and_write(simage, &gctx, intensities, image.indexed_cell); - pthread_mutex_unlock(pargs->gpu_mutex); + pthread_mutex_unlock(pargs->static_args.gpu_mutex); } else { simulate_and_write(simage, NULL, intensities, image.indexed_cell); @@ -430,37 +447,40 @@ done: } -static void *worker_thread(void *pargsv) +static void *get_image(void *qp) { - struct process_args *pargs = pargsv; - int finish; + char line[1024]; + struct index_args *pargs; + char *rval; + struct queue_args *qargs = qp; - do { + /* Get the next filename */ + rval = fgets(line, 1023, qargs->fh); + if ( rval == NULL ) return NULL; - int wakeup; + pargs = malloc(sizeof(struct index_args)); - process_image(pargs); + memcpy(&pargs->static_args, &qargs->static_args, + sizeof(struct static_index_args)); - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); + chomp(line); + pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - /* Go to sleep until told to exit or process next image */ - do { + return pargs; +} - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - } while ( !wakeup ); +static void finalise_image(void *qp, void *pp) +{ + struct queue_args *qargs = qp; + struct index_args *pargs = pp; - } while ( !pargs->finish ); + qargs->n_indexable += pargs->indexable; + qargs->n_sane += pargs->sane; - return NULL; + free(pargs->filename); + free(pargs); } @@ -474,8 +494,6 @@ int main(int argc, char *argv[]) FILE *ofh; char *rval = NULL; int n_images; - int n_indexable; - int n_sane; int config_noindex = 0; int config_dumpfound = 0; int config_nearbragg = 0; @@ -506,15 +524,13 @@ int main(int argc, char *argv[]) char *speaks = NULL; int peaks; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; int i; pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t gpu_mutex = PTHREAD_MUTEX_INITIALIZER; char prepare_line[1024]; char prepare_filename[1024]; IndexingPrivate *ipriv; + struct queue_args qargs; /* Long options */ const struct option longopts[] = { @@ -670,7 +686,7 @@ int main(int argc, char *argv[]) } } - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } @@ -740,158 +756,41 @@ int main(int argc, char *argv[]) } gsl_set_error_handler_off(); - n_images = 0; - n_indexable = 0; - n_sane = 0; - - for ( i=0; i<nthreads; i++ ) { - worker_args[i] = malloc(sizeof(struct process_args)); - worker_args[i]->filename = malloc(1024); - worker_args[i]->ofh = ofh; - worker_args[i]->peaks = peaks; - worker_active[i] = 0; - } - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int r; - - pargs = worker_args[i]; - if ( strlen(prepare_line) > 0 ) { - strcpy(line, prepare_line); - prepare_line[0] = '\0'; - } else { - rval = fgets(line, 1023, fh); - if ( rval == NULL ) continue; - } - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - pargs->output_mutex = &output_mutex; - pargs->gpu_mutex = &gpu_mutex; - pthread_mutex_init(&pargs->control_mutex, NULL); - pargs->config_cmfilter = config_cmfilter; - pargs->config_noisefilter = config_noisefilter; - pargs->config_writedrx = config_writedrx; - pargs->config_dumpfound = config_dumpfound; - pargs->config_verbose = config_verbose; - pargs->config_alternate = config_alternate; - pargs->config_nearbragg = config_nearbragg; - pargs->config_gpu = config_gpu; - pargs->config_simulate = config_simulate; - pargs->config_nomatch = config_nomatch; - pargs->config_polar = config_polar; - pargs->config_sanity = config_sanity; - pargs->config_satcorr = config_satcorr; - pargs->config_sa = config_sa; - pargs->config_closer = config_closer; - pargs->cell = cell; - pargs->det = det; - pargs->ipriv = ipriv; - pargs->indm = indm; - pargs->intensities = intensities; - pargs->gctx = gctx; - pargs->threshold = threshold; - pargs->id = i; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - - for ( i=0; i<nthreads; i++ ) { - - char line[1024]; - struct process_args *pargs; - int done; - - /* Spend CPU time indexing, not checking results */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Results will be processed after checking if - * there are any more images to process. */ - - /* Get next filename */ - rval = fgets(line, 1023, fh); - /* In this case, the result of the last file - * file will be processed when the thread is - * joined. */ - if ( rval == NULL ) break; - - /* Record the result */ - n_indexable += pargs->indexable; - n_sane += pargs->sane; - - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( rval != NULL ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) goto free; - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - worker_active[i] = 0; - - n_indexable += pargs->indexable; - n_sane += pargs->sane; - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - free(worker_args[i]); - - } + qargs.static_args.gpu_mutex = &gpu_mutex; + qargs.static_args.cell = cell; + qargs.static_args.config_cmfilter = config_cmfilter; + qargs.static_args.config_noisefilter = config_noisefilter; + qargs.static_args.config_writedrx = config_writedrx; + qargs.static_args.config_dumpfound = config_dumpfound; + qargs.static_args.config_verbose = config_verbose; + qargs.static_args.config_alternate = config_alternate; + qargs.static_args.config_nearbragg = config_nearbragg; + qargs.static_args.config_gpu = config_gpu; + qargs.static_args.config_simulate = config_simulate; + qargs.static_args.config_nomatch = config_nomatch; + qargs.static_args.config_polar = config_polar; + qargs.static_args.config_sanity = config_sanity; + qargs.static_args.config_satcorr = config_satcorr; + qargs.static_args.config_sa = config_sa; + qargs.static_args.config_closer = config_closer; + qargs.static_args.threshold = threshold; + qargs.static_args.det = det; + qargs.static_args.indm = indm; + qargs.static_args.ipriv = ipriv; + qargs.static_args.intensities = intensities; + qargs.static_args.gctx = gctx; + qargs.static_args.peaks = peaks; + qargs.static_args.output_mutex = &output_mutex; + qargs.static_args.ofh = ofh; + + qargs.fh = fh; + qargs.prefix = prefix; + qargs.n_indexable = 0; + qargs.n_sane = 0; + + n_images = run_threads(nthreads, process_image, get_image, + finalise_image, &qargs, 0); cleanup_indexing(ipriv); @@ -902,7 +801,7 @@ int main(int argc, char *argv[]) fclose(fh); STATUS("There were %i images. %i could be indexed, of which %i" - " looked sane.\n", n_images, n_indexable, n_sane); + " looked sane.\n", n_images, qargs.n_indexable, qargs.n_sane); if ( gctx != NULL ) { cleanup_gpu(gctx); |