From a22d0dc84c4411ca1b4583ac7857d5301c690f7c Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 11 Oct 2010 10:57:46 +0200 Subject: indexamajig: Use new thread pool --- src/Makefile.am | 2 +- src/Makefile.in | 6 +- src/calibrate_detector.c | 2 +- src/cubeit.c | 2 +- src/indexamajig.c | 345 +++++++++++++++++------------------------------ src/reintegrate.c | 2 +- src/thread-pool.c | 8 +- src/thread-pool.h | 10 +- 8 files changed, 143 insertions(+), 234 deletions(-) (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 46fb77b9..2b5c11d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -24,7 +24,7 @@ process_hkl_LDADD = @LIBS@ indexamajig_SOURCES = indexamajig.c hdf5-file.c utils.c cell.c image.c \ peaks.c index.c filters.c diffraction.c detector.c \ sfac.c dirax.c reflections.c templates.c symmetry.c \ - geometry.c + geometry.c thread-pool.c indexamajig_LDADD = @LIBS@ if HAVE_OPENCL indexamajig_SOURCES += diffraction-gpu.c cl-utils.c diff --git a/src/Makefile.in b/src/Makefile.in index 4ffa7701..6c47670f 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -91,7 +91,7 @@ hdfsee_DEPENDENCIES = am__indexamajig_SOURCES_DIST = indexamajig.c hdf5-file.c utils.c \ cell.c image.c peaks.c index.c filters.c diffraction.c \ detector.c sfac.c dirax.c reflections.c templates.c symmetry.c \ - geometry.c diffraction-gpu.c cl-utils.c + geometry.c thread-pool.c diffraction-gpu.c cl-utils.c @HAVE_OPENCL_TRUE@am__objects_1 = diffraction-gpu.$(OBJEXT) \ @HAVE_OPENCL_TRUE@ cl-utils.$(OBJEXT) am_indexamajig_OBJECTS = indexamajig.$(OBJEXT) hdf5-file.$(OBJEXT) \ @@ -99,7 +99,7 @@ am_indexamajig_OBJECTS = indexamajig.$(OBJEXT) hdf5-file.$(OBJEXT) \ index.$(OBJEXT) filters.$(OBJEXT) diffraction.$(OBJEXT) \ detector.$(OBJEXT) sfac.$(OBJEXT) dirax.$(OBJEXT) \ reflections.$(OBJEXT) templates.$(OBJEXT) symmetry.$(OBJEXT) \ - geometry.$(OBJEXT) $(am__objects_1) + geometry.$(OBJEXT) thread-pool.$(OBJEXT) $(am__objects_1) indexamajig_OBJECTS = $(am_indexamajig_OBJECTS) indexamajig_DEPENDENCIES = am__pattern_sim_SOURCES_DIST = pattern_sim.c diffraction.c utils.c \ @@ -273,7 +273,7 @@ process_hkl_LDADD = @LIBS@ indexamajig_SOURCES = indexamajig.c hdf5-file.c utils.c cell.c image.c \ peaks.c index.c filters.c diffraction.c detector.c sfac.c \ dirax.c reflections.c templates.c symmetry.c geometry.c \ - $(am__append_3) + thread-pool.c $(am__append_3) indexamajig_LDADD = @LIBS@ @HAVE_GTK_TRUE@hdfsee_SOURCES = hdfsee.c displaywindow.c render.c hdf5-file.c utils.c image.c \ @HAVE_GTK_TRUE@ filters.c diff --git a/src/calibrate_detector.c b/src/calibrate_detector.c index ccf577dd..0d613849 100644 --- a/src/calibrate_detector.c +++ b/src/calibrate_detector.c @@ -389,7 +389,7 @@ int main(int argc, char *argv[]) do { n_done = run_threads(nthreads, add_image, get_image, - (void *)&qargs, chunk_size); + (void *)&qargs, NULL, chunk_size); n_images += n_done; diff --git a/src/cubeit.c b/src/cubeit.c index a8e969ff..ebb21151 100644 --- a/src/cubeit.c +++ b/src/cubeit.c @@ -568,7 +568,7 @@ int main(int argc, char *argv[]) qargs.static_args.bes = &bes; qargs.static_args.gas = &gas; - n_images = run_threads(nthreads, sum_image, get_image, &qargs, 0); + n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0); fclose(fh); diff --git a/src/indexamajig.c b/src/indexamajig.c index a866ac3b..4af459bc 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -35,9 +35,7 @@ #include "sfac.h" #include "filters.h" #include "reflections.h" - - -#define MAX_THREADS (96) +#include "thread-pool.h" enum { @@ -46,11 +44,9 @@ enum { }; -struct process_args +/* Information about the indexing process which is common to all patterns */ +struct static_index_args { - /* Input */ - char *filename; - int id; pthread_mutex_t *gpu_mutex; /* Protects "gctx" */ UnitCell *cell; int config_cmfilter; @@ -76,20 +72,37 @@ struct process_args struct gpu_context *gctx; int peaks; - /* Thread control and output */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - int indexable; - int sane; - /* Output stream */ pthread_mutex_t *output_mutex; /* Protects the output stream */ FILE *ofh; }; +/* Information about the indexing process for one pattern */ +struct index_args +{ + /* "Input" */ + char *filename; + struct static_index_args static_args; + + /* "Output" */ + int indexable; + int sane; +}; + + +/* Information needed to choose the next task and dispatch it */ +struct queue_args +{ + FILE *fh; + char *prefix; + struct static_index_args static_args; + + int n_indexable; + int n_sane; +}; + + static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -100,6 +113,7 @@ static void show_help(const char *s) "\n" " -i, --input= Specify file containing list of images to process.\n" " '-' means stdin, which is the default.\n" +" -o, --output= Write indexed stream to this file. '-' for stdout.\n" "\n" " --indexing= Use 'method' for indexing. Choose from:\n" " none : no indexing (default)\n" @@ -273,38 +287,39 @@ static void simulate_and_write(struct image *simage, struct gpu_context **gctx, } -static void process_image(struct process_args *pargs) +static void process_image(void *pp, int cookie) { + struct index_args *pargs = pp; struct hdfile *hdfile; struct image image; struct image *simage; float *data_for_measurement; size_t data_size; char *filename = pargs->filename; - UnitCell *cell = pargs->cell; - int config_cmfilter = pargs->config_cmfilter; - int config_noisefilter = pargs->config_noisefilter; - int config_writedrx = pargs->config_writedrx; - int config_dumpfound = pargs->config_dumpfound; - int config_verbose = pargs->config_verbose; - int config_alternate = pargs->config_alternate; - int config_nearbragg = pargs->config_nearbragg; - int config_gpu = pargs->config_gpu; - int config_simulate = pargs->config_simulate; - int config_nomatch = pargs->config_nomatch; - int config_polar = pargs->config_polar; - IndexingMethod indm = pargs->indm; - const double *intensities = pargs->intensities; - struct gpu_context *gctx = pargs->gctx; + UnitCell *cell = pargs->static_args.cell; + int config_cmfilter = pargs->static_args.config_cmfilter; + int config_noisefilter = pargs->static_args.config_noisefilter; + int config_writedrx = pargs->static_args.config_writedrx; + int config_dumpfound = pargs->static_args.config_dumpfound; + int config_verbose = pargs->static_args.config_verbose; + int config_alternate = pargs->static_args.config_alternate; + int config_nearbragg = pargs->static_args.config_nearbragg; + int config_gpu = pargs->static_args.config_gpu; + int config_simulate = pargs->static_args.config_simulate; + int config_nomatch = pargs->static_args.config_nomatch; + int config_polar = pargs->static_args.config_polar; + IndexingMethod indm = pargs->static_args.indm; + const double *intensities = pargs->static_args.intensities; + struct gpu_context *gctx = pargs->static_args.gctx; image.features = NULL; image.data = NULL; image.indexed_cell = NULL; - image.id = pargs->id; + image.id = cookie; image.filename = filename; image.hits = NULL; image.n_hits = 0; - image.det = pargs->det; + image.det = pargs->static_args.det; /* View head-on (unit cell is tilted) */ image.orientation.w = 1.0; @@ -325,7 +340,7 @@ static void process_image(struct process_args *pargs) return; } - hdf5_read(hdfile, &image, pargs->config_satcorr); + hdf5_read(hdfile, &image, pargs->static_args.config_satcorr); if ( config_cmfilter ) { filter_cm(&image); @@ -342,7 +357,7 @@ static void process_image(struct process_args *pargs) memcpy(data_for_measurement, image.data, data_size); } - switch ( pargs->peaks ) + switch ( pargs->static_args.peaks ) { case PEAK_HDF5 : /* Get peaks from HDF5 */ @@ -352,7 +367,7 @@ static void process_image(struct process_args *pargs) } break; case PEAK_ZAEF : - search_peaks(&image, pargs->threshold); + search_peaks(&image, pargs->static_args.threshold); break; } @@ -362,7 +377,8 @@ static void process_image(struct process_args *pargs) image.data = data_for_measurement; if ( config_dumpfound ) { - dump_peaks(&image, pargs->ofh, pargs->output_mutex); + dump_peaks(&image, pargs->static_args.ofh, + pargs->static_args.output_mutex); } /* Not indexing nor writing xfel.drx? @@ -374,7 +390,7 @@ static void process_image(struct process_args *pargs) /* Calculate orientation matrix (by magic) */ if ( config_writedrx || (indm != INDEXING_NONE) ) { index_pattern(&image, cell, indm, config_nomatch, - config_verbose, pargs->ipriv); + config_verbose, pargs->static_args.ipriv); } /* No cell at this point? Then we're done. */ @@ -382,7 +398,7 @@ static void process_image(struct process_args *pargs) pargs->indexable = 1; /* Sanity check */ - if ( pargs->config_sanity + if ( pargs->static_args.config_sanity && !peak_sanity_check(&image, image.indexed_cell, 0, 0.1) ) { STATUS("Failed peak sanity check.\n"); goto done; @@ -393,9 +409,10 @@ static void process_image(struct process_args *pargs) /* Measure intensities if requested */ if ( config_nearbragg ) { output_intensities(&image, image.indexed_cell, - pargs->output_mutex, config_polar, - pargs->config_sa, pargs->config_closer, - pargs->ofh, 0, 0.1); + pargs->static_args.output_mutex, + config_polar, pargs->static_args.config_sa, + pargs->static_args.config_closer, + pargs->static_args.ofh, 0, 0.1); } simage = get_simage(&image, config_alternate); @@ -403,10 +420,10 @@ static void process_image(struct process_args *pargs) /* Simulate if requested */ if ( config_simulate ) { if ( config_gpu ) { - pthread_mutex_lock(pargs->gpu_mutex); + pthread_mutex_lock(pargs->static_args.gpu_mutex); simulate_and_write(simage, &gctx, intensities, image.indexed_cell); - pthread_mutex_unlock(pargs->gpu_mutex); + pthread_mutex_unlock(pargs->static_args.gpu_mutex); } else { simulate_and_write(simage, NULL, intensities, image.indexed_cell); @@ -430,37 +447,40 @@ done: } -static void *worker_thread(void *pargsv) +static void *get_image(void *qp) { - struct process_args *pargs = pargsv; - int finish; + char line[1024]; + struct index_args *pargs; + char *rval; + struct queue_args *qargs = qp; - do { + /* Get the next filename */ + rval = fgets(line, 1023, qargs->fh); + if ( rval == NULL ) return NULL; - int wakeup; + pargs = malloc(sizeof(struct index_args)); - process_image(pargs); + memcpy(&pargs->static_args, &qargs->static_args, + sizeof(struct static_index_args)); - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); + chomp(line); + pargs->filename = malloc(strlen(qargs->prefix) + strlen(line) + 1); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line); - /* Go to sleep until told to exit or process next image */ - do { + return pargs; +} - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - } while ( !wakeup ); +static void finalise_image(void *qp, void *pp) +{ + struct queue_args *qargs = qp; + struct index_args *pargs = pp; - } while ( !pargs->finish ); + qargs->n_indexable += pargs->indexable; + qargs->n_sane += pargs->sane; - return NULL; + free(pargs->filename); + free(pargs); } @@ -474,8 +494,6 @@ int main(int argc, char *argv[]) FILE *ofh; char *rval = NULL; int n_images; - int n_indexable; - int n_sane; int config_noindex = 0; int config_dumpfound = 0; int config_nearbragg = 0; @@ -506,15 +524,13 @@ int main(int argc, char *argv[]) char *speaks = NULL; int peaks; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; int i; pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t gpu_mutex = PTHREAD_MUTEX_INITIALIZER; char prepare_line[1024]; char prepare_filename[1024]; IndexingPrivate *ipriv; + struct queue_args qargs; /* Long options */ const struct option longopts[] = { @@ -670,7 +686,7 @@ int main(int argc, char *argv[]) } } - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } @@ -740,158 +756,41 @@ int main(int argc, char *argv[]) } gsl_set_error_handler_off(); - n_images = 0; - n_indexable = 0; - n_sane = 0; - - for ( i=0; ifilename = malloc(1024); - worker_args[i]->ofh = ofh; - worker_args[i]->peaks = peaks; - worker_active[i] = 0; - } - - /* Start threads off */ - for ( i=0; i 0 ) { - strcpy(line, prepare_line); - prepare_line[0] = '\0'; - } else { - rval = fgets(line, 1023, fh); - if ( rval == NULL ) continue; - } - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - pargs->output_mutex = &output_mutex; - pargs->gpu_mutex = &gpu_mutex; - pthread_mutex_init(&pargs->control_mutex, NULL); - pargs->config_cmfilter = config_cmfilter; - pargs->config_noisefilter = config_noisefilter; - pargs->config_writedrx = config_writedrx; - pargs->config_dumpfound = config_dumpfound; - pargs->config_verbose = config_verbose; - pargs->config_alternate = config_alternate; - pargs->config_nearbragg = config_nearbragg; - pargs->config_gpu = config_gpu; - pargs->config_simulate = config_simulate; - pargs->config_nomatch = config_nomatch; - pargs->config_polar = config_polar; - pargs->config_sanity = config_sanity; - pargs->config_satcorr = config_satcorr; - pargs->config_sa = config_sa; - pargs->config_closer = config_closer; - pargs->cell = cell; - pargs->det = det; - pargs->ipriv = ipriv; - pargs->indm = indm; - pargs->intensities = intensities; - pargs->gctx = gctx; - pargs->threshold = threshold; - pargs->id = i; - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - - for ( i=0; icontrol_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Results will be processed after checking if - * there are any more images to process. */ - - /* Get next filename */ - rval = fgets(line, 1023, fh); - /* In this case, the result of the last file - * file will be processed when the thread is - * joined. */ - if ( rval == NULL ) break; - - /* Record the result */ - n_indexable += pargs->indexable; - n_sane += pargs->sane; - - chomp(line); - snprintf(pargs->filename, 1023, "%s%s", prefix, line); - - n_images++; - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( rval != NULL ); - - /* Join threads */ - for ( i=0; icontrol_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - worker_active[i] = 0; - - n_indexable += pargs->indexable; - n_sane += pargs->sane; - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - free(worker_args[i]); - - } + qargs.static_args.gpu_mutex = &gpu_mutex; + qargs.static_args.cell = cell; + qargs.static_args.config_cmfilter = config_cmfilter; + qargs.static_args.config_noisefilter = config_noisefilter; + qargs.static_args.config_writedrx = config_writedrx; + qargs.static_args.config_dumpfound = config_dumpfound; + qargs.static_args.config_verbose = config_verbose; + qargs.static_args.config_alternate = config_alternate; + qargs.static_args.config_nearbragg = config_nearbragg; + qargs.static_args.config_gpu = config_gpu; + qargs.static_args.config_simulate = config_simulate; + qargs.static_args.config_nomatch = config_nomatch; + qargs.static_args.config_polar = config_polar; + qargs.static_args.config_sanity = config_sanity; + qargs.static_args.config_satcorr = config_satcorr; + qargs.static_args.config_sa = config_sa; + qargs.static_args.config_closer = config_closer; + qargs.static_args.threshold = threshold; + qargs.static_args.det = det; + qargs.static_args.indm = indm; + qargs.static_args.ipriv = ipriv; + qargs.static_args.intensities = intensities; + qargs.static_args.gctx = gctx; + qargs.static_args.peaks = peaks; + qargs.static_args.output_mutex = &output_mutex; + qargs.static_args.ofh = ofh; + + qargs.fh = fh; + qargs.prefix = prefix; + qargs.n_indexable = 0; + qargs.n_sane = 0; + + n_images = run_threads(nthreads, process_image, get_image, + finalise_image, &qargs, 0); cleanup_indexing(ipriv); @@ -902,7 +801,7 @@ int main(int argc, char *argv[]) fclose(fh); STATUS("There were %i images. %i could be indexed, of which %i" - " looked sane.\n", n_images, n_indexable, n_sane); + " looked sane.\n", n_images, qargs.n_indexable, qargs.n_sane); if ( gctx != NULL ) { cleanup_gpu(gctx); diff --git a/src/reintegrate.c b/src/reintegrate.c index 50e73877..b45ef9c5 100644 --- a/src/reintegrate.c +++ b/src/reintegrate.c @@ -214,7 +214,7 @@ static void integrate_all(int nthreads, struct detector *det, FILE *fh, qargs.static_args.config_sanity = qargs.static_args.config_sanity; qargs.static_args.output_mutex = &output_mutex; - run_threads(nthreads, process_image, get_image, &qargs, 0); + run_threads(nthreads, process_image, get_image, NULL, &qargs, 0); } diff --git a/src/thread-pool.c b/src/thread-pool.c index 6705aa18..6228004b 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -150,6 +150,7 @@ struct task_queue int *cookies; void *(*get_task)(void *); + void (*finalise)(void *, void *); void *queue_args; void (*work)(void *, int); }; @@ -200,6 +201,9 @@ static void *task_worker(void *pargsv) pthread_mutex_lock(&q->lock); q->n_completed++; q->cookies[mycookie] = 0; + if ( q->finalise ) { + q->finalise(q, task); + } pthread_mutex_unlock(&q->lock); } while ( 1 ); @@ -209,7 +213,8 @@ static void *task_worker(void *pargsv) int run_threads(int n_threads, void (*work)(void *, int), - void *(*get_task)(void *), void *queue_args, int max) + void *(*get_task)(void *), void (*final)(void *, void *), + void *queue_args, int max) { pthread_t *workers; int i; @@ -220,6 +225,7 @@ int run_threads(int n_threads, void (*work)(void *, int), pthread_mutex_init(&q.lock, NULL); q.work = work; q.get_task = get_task; + q.finalise = final; q.queue_args = queue_args; q.n_started = 0; q.n_completed = 0; diff --git a/src/thread-pool.h b/src/thread-pool.h index 3376b8fe..05e2f55a 100644 --- a/src/thread-pool.h +++ b/src/thread-pool.h @@ -20,7 +20,7 @@ /* work() will be called with a number and work_args. The number will be * unique and in the range 0..n_tasks. A progress bar will be shown using - * "text" and the progress through the tasks. */ + * "text" and the progress through the tasks, unless "text" is NULL. */ extern void run_thread_range(int n_tasks, int n_threads, const char *text, void (*work)(int, void *), void *work_args); @@ -28,10 +28,14 @@ extern void run_thread_range(int n_tasks, int n_threads, const char *text, /* get_task() will be called every time a worker is idle. It returns either * NULL, indicating that no further work is available, or a pointer which will * be passed to work(). Work will stop after 'max' tasks have been processed. - * get_task() does not need to be re-entrant. + * final() will be called once per image, and will be given both queue_args + * and the last task pointer. + * get_task() and final() do NOT need to be re-entrant. + * If "max" is zero, all tasks will be processed. * Returns: the number of tasks processed. */ extern int run_threads(int n_threads, void (*work)(void *, int), - void *(*get_task)(void *), void *queue_args, int max); + void *(*get_task)(void *), void (*final)(void *, void *), + void *queue_args, int max); #endif /* THREAD_POOL_H */ -- cgit v1.2.3