diff options
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | Makefile.in | 2 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/Makefile.in | 5 | ||||
-rw-r--r-- | src/facetron.c | 251 | ||||
-rw-r--r-- | src/thread-pool.c | 123 | ||||
-rw-r--r-- | src/thread-pool.h | 25 |
7 files changed, 212 insertions, 198 deletions
diff --git a/Makefile.am b/Makefile.am index d57c60f5..28617e06 100644 --- a/Makefile.am +++ b/Makefile.am @@ -10,5 +10,5 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \ data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \ data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \ data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \ - src/stream.h + src/stream.h src/thread-pool.h SUBDIRS = src data doc doc/examples scripts diff --git a/Makefile.in b/Makefile.in index 5a2466e5..cb24fdaa 100644 --- a/Makefile.in +++ b/Makefile.in @@ -208,7 +208,7 @@ EXTRA_DIST = configure src/cell.h src/hdf5-file.h src/image.h \ data/sfac/Ca.nff data/sfac/C.nff data/sfac/Fe.nff data/sfac/H.nff \ data/sfac/Mg.nff data/sfac/N.nff data/sfac/O.nff data/sfac/P.nff \ data/sfac/S.nff data/sfac/f0_WaasKirf.dat src/render_hkl.h \ - src/stream.h + src/stream.h src/thread-pool.h SUBDIRS = src data doc doc/examples scripts all: config.h diff --git a/src/Makefile.am b/src/Makefile.am index 721c80f1..6733dc3a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -57,7 +57,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \ calibrate_detector_LDADD = @LIBS@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ - image.c geometry.c reflections.c stream.c + image.c geometry.c reflections.c stream.c thread-pool.c facetron_LDADD = @LIBS@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \ diff --git a/src/Makefile.in b/src/Makefile.in index 7cfc3494..c27c0ec9 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -73,7 +73,7 @@ cubeit_DEPENDENCIES = am_facetron_OBJECTS = facetron.$(OBJEXT) cell.$(OBJEXT) \ hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \ peaks.$(OBJEXT) image.$(OBJEXT) geometry.$(OBJEXT) \ - reflections.$(OBJEXT) stream.$(OBJEXT) + reflections.$(OBJEXT) stream.$(OBJEXT) thread-pool.$(OBJEXT) facetron_OBJECTS = $(am_facetron_OBJECTS) facetron_DEPENDENCIES = am_get_hkl_OBJECTS = get_hkl.$(OBJEXT) sfac.$(OBJEXT) cell.$(OBJEXT) \ @@ -298,7 +298,7 @@ calibrate_detector_SOURCES = calibrate_detector.c utils.c hdf5-file.c image.c \ calibrate_detector_LDADD = @LIBS@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ - image.c geometry.c reflections.c stream.c + image.c geometry.c reflections.c stream.c thread-pool.c facetron_LDADD = @LIBS@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \ @@ -458,6 +458,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/symmetry.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/templates.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/thread-pool.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/utils.Po@am__quote@ .c.o: diff --git a/src/facetron.c b/src/facetron.c index 5d760b8a..3dc7d486 100644 --- a/src/facetron.c +++ b/src/facetron.c @@ -20,8 +20,6 @@ #include <string.h> #include <unistd.h> #include <getopt.h> -#include <pthread.h> -#include <sys/time.h> #include <assert.h> #include "utils.h" @@ -31,30 +29,7 @@ #include "stream.h" #include "geometry.h" #include "peaks.h" - - -#define MAX_THREADS (256) - -struct process_args -{ - struct image *image; - - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - - /* Analysis routine */ - void (*func)(struct process_args *); - - /* Analysis parameters */ - const char *sym; - pthread_mutex_t *list_lock; /* Protects 'obs', 'i_full' and 'cts' */ - ReflItemList *obs; - double *i_full; - unsigned int *cts; -}; +#include "thread-pool.h" static void show_help(const char *s) @@ -79,14 +54,38 @@ static void show_help(const char *s) } -static void refine_image(struct process_args *pargs) +struct refine_args { + const char *sym; + ReflItemList *obs; + double *i_full; + struct image *image; +}; + + +static void refine_image(int mytask, void *tasks) +{ + struct refine_args *all_args = tasks; + struct refine_args *pargs = &all_args[mytask]; /* Do, er, something. */ } -static void integrate_image(struct process_args *pargs) +struct integrate_args +{ + const char *sym; + ReflItemList *obs; + double *i_full; + unsigned int *cts; + pthread_mutex_t *list_lock; + struct image *image; +}; + + +static void integrate_image(int mytask, void *tasks) { + struct integrate_args *all_args = tasks; + struct integrate_args *pargs = &all_args[mytask]; struct reflhit *spots; int j, n; struct hdfile *hdfile; @@ -158,177 +157,27 @@ static void integrate_image(struct process_args *pargs) } -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - /* Acknowledge start */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - pargs->func(pargs); - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; -} - - -static void munch_threads(struct image *images, int n_total_patterns, - struct detector *det, const char *sym, - ReflItemList *obs, double *i_full, unsigned int *cts, - int nthreads, void (*func)(struct process_args *), - const char *text) +static void refine_all(struct image *images, int n_total_patterns, + struct detector *det, const char *sym, + ReflItemList *obs, double *i_full, int nthreads) { - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER; - int worker_active[MAX_THREADS]; + struct refine_args *tasks; int i; - int n_done = 0; - int n_started = 0; - - /* Initialise worker arguments with the unchanging data */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_active[i] = 0; - pthread_mutex_init(&worker_args[i]->control_mutex, NULL); - worker_args[i]->sym = sym; - worker_args[i]->obs = obs; - worker_args[i]->i_full = i_full; - worker_args[i]->cts = cts; - worker_args[i]->list_lock = &list_lock; - worker_args[i]->func = func; - - } - - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int r; - - if ( n_started == n_total_patterns ) break; - - pargs = worker_args[i]; - pargs->image = &images[n_started++]; - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int done; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Reset "done" flag */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - n_done++; - progress_bar(n_done, n_total_patterns, text); - /* If there are no more patterns, "done" will remain - * zero, so the last pattern will not be re-counted. */ - if ( n_started == n_total_patterns ) break; - - /* Start work on the next pattern */ - pargs->image = &images[n_started++]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( n_started < n_total_patterns ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) continue; - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - if ( pargs->done ) { - n_done++; - progress_bar(n_done, n_total_patterns, text); - } /* else this thread was not busy */ + tasks = malloc(n_total_patterns * sizeof(struct refine_args)); + for ( i=0; i<n_total_patterns; i++ ) { - } + tasks[i].sym = sym; + tasks[i].obs = obs; + tasks[i].i_full = i_full; + tasks[i].image = &images[i]; - for ( i=0; i<nthreads; i++ ) { - free(worker_args[i]); } -} + munch_threads(n_total_patterns, nthreads, "Refining", + refine_image, tasks); -static void refine_all(struct image *images, int n_total_patterns, - struct detector *det, const char *sym, - ReflItemList *obs, double *i_full, int nthreads) -{ - munch_threads(images, n_total_patterns, det, sym, obs, i_full, NULL, - nthreads, refine_image, "Refining"); + free(tasks); } @@ -338,12 +187,28 @@ static void estimate_full(struct image *images, int n_total_patterns, { int i; unsigned int *cts; + struct integrate_args *tasks; + pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER; cts = new_list_count(); clear_items(obs); - munch_threads(images, n_total_patterns, det, sym, obs, i_full, cts, - nthreads, integrate_image, "Integrating"); + tasks = malloc(n_total_patterns * sizeof(struct integrate_args)); + for ( i=0; i<n_total_patterns; i++ ) { + + tasks[i].sym = sym; + tasks[i].obs = obs; + tasks[i].i_full = i_full; + tasks[i].cts = cts; + tasks[i].list_lock = &list_lock; + tasks[i].image = &images[i]; + + } + + munch_threads(n_total_patterns, nthreads, "Integrating", + integrate_image, tasks); + + free(tasks); /* Divide the totals to get the means */ for ( i=0; i<num_items(obs); i++ ) { diff --git a/src/thread-pool.c b/src/thread-pool.c new file mode 100644 index 00000000..b0d3d8c2 --- /dev/null +++ b/src/thread-pool.c @@ -0,0 +1,123 @@ +/* + * thread-pool.c + * + * A thread pool implementation + * + * (c) 2006-2010 Thomas White <taw@physics.org> + * + * Part of CrystFEL - crystallography with a FEL + * + */ + + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + + +#include <stdarg.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <pthread.h> + +#include "utils.h" + + +struct task_queue +{ + pthread_mutex_t lock; + + int n_tasks; + int *done; + int n_done; + + void (*work)(int, void *); + void *work_args; + + const char *text; +}; + + +static void *worker_thread(void *pargsv) +{ + struct task_queue *q = pargsv; + + do { + + int i; + int found = 0; + int mytask = -1; + + /* Get a task */ + pthread_mutex_lock(&q->lock); + for ( i=0; i<q->n_tasks; i++ ) { + if ( q->done[i] == 0 ) { + mytask = i; + found = 1; + } + } + pthread_mutex_unlock(&q->lock); + + /* No more tasks? */ + if ( !found ) break; + + q->work(mytask, q->work_args); + + /* Mark this task as done, update totals etc */ + pthread_mutex_lock(&q->lock); + q->done[mytask] = 1; + q->n_done++; + progress_bar(q->n_done, q->n_tasks, q->text); + pthread_mutex_unlock(&q->lock); + + } while ( 1 ); + + return NULL; +} + + +void munch_threads(int n_tasks, int n_threads, const char *text, + void (*work)(int, void *), void *work_args) +{ + pthread_t *workers; + int i; + struct task_queue q; + + /* The nation of CrystFEL prides itself on having 0% unemployment. */ + if ( n_threads > n_tasks ) n_threads = n_tasks; + + workers = malloc(n_threads * sizeof(pthread_t)); + + q.done = malloc(n_tasks * sizeof(int)); + pthread_mutex_init(&q.lock, NULL); + q.n_tasks = n_tasks; + q.work = work; + q.work_args = work_args; + q.n_done = 0; + q.text = text; + + for ( i=0; i<n_tasks; i++ ) { + q.done[i] = 0; + } + + /* Start threads */ + for ( i=0; i<n_threads; i++ ) { + + if ( pthread_create(&workers[i], NULL, worker_thread, &q) ) { + ERROR("Couldn't start thread %i\n", i); + n_threads = i; + break; + } + + } + + /* Join threads */ + for ( i=0; i<n_threads; i++ ) { + pthread_join(workers[i], NULL); + } + + free(q.done); + free(workers); +} diff --git a/src/thread-pool.h b/src/thread-pool.h new file mode 100644 index 00000000..4439f970 --- /dev/null +++ b/src/thread-pool.h @@ -0,0 +1,25 @@ +/* + * thread-pool.h + * + * A thread pool implementation + * + * (c) 2006-2010 Thomas White <taw@physics.org> + * + * Part of CrystFEL - crystallography with a FEL + * + */ + + +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + + +extern void munch_threads(int n_tasks, int n_threads, const char *text, + void (*work)(int, void *), void *work_args); + + +#endif /* THREAD_POOL_H */ |