diff options
author | Thomas White <taw@bitwiz.org.uk> | 2010-10-10 21:23:33 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2012-02-22 15:27:02 +0100 |
commit | e05bf79a5d1e1c1c2f55164485704d7bc3a8ec95 (patch) | |
tree | 1c16586d4928c9571b3ebcb58fe3d3507ad330e9 | |
parent | 33d4fbfbbee784f734632e543a75222f38bc807d (diff) |
cubeit: Use new thread pool
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/Makefile.in | 4 | ||||
-rw-r--r-- | src/cubeit.c | 355 |
3 files changed, 135 insertions, 226 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index ee20d0b3..6517a67d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -61,7 +61,7 @@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ facetron_LDADD = @LIBS@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \ - filters.c image.c symmetry.c stream.c + filters.c image.c symmetry.c stream.c thread-pool.c cubeit_LDADD = @LIBS@ reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \ diff --git a/src/Makefile.in b/src/Makefile.in index c7ed213a..7bf59480 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -67,7 +67,7 @@ compare_hkl_DEPENDENCIES = am_cubeit_OBJECTS = cubeit.$(OBJEXT) cell.$(OBJEXT) \ hdf5-file.$(OBJEXT) utils.$(OBJEXT) detector.$(OBJEXT) \ render.$(OBJEXT) filters.$(OBJEXT) image.$(OBJEXT) \ - symmetry.$(OBJEXT) stream.$(OBJEXT) + symmetry.$(OBJEXT) stream.$(OBJEXT) thread-pool.$(OBJEXT) cubeit_OBJECTS = $(am_cubeit_OBJECTS) cubeit_DEPENDENCIES = am_facetron_OBJECTS = facetron.$(OBJEXT) cell.$(OBJEXT) \ @@ -302,7 +302,7 @@ facetron_SOURCES = facetron.c cell.c hdf5-file.c utils.c detector.c peaks.c \ facetron_LDADD = @LIBS@ cubeit_SOURCES = cubeit.c cell.c hdf5-file.c utils.c detector.c render.c \ - filters.c image.c symmetry.c stream.c + filters.c image.c symmetry.c stream.c thread-pool.c cubeit_LDADD = @LIBS@ reintegrate_SOURCES = reintegrate.c cell.c hdf5-file.c utils.c detector.c \ diff --git a/src/cubeit.c b/src/cubeit.c index 4bc466a6..ef892340 100644 --- a/src/cubeit.c +++ b/src/cubeit.c @@ -20,11 +20,10 @@ #include <string.h> #include <unistd.h> #include <getopt.h> -#include <pthread.h> -#include <sys/time.h> #include <assert.h> #include <png.h> #include <fenv.h> +#include <pthread.h> #include "utils.h" #include "hdf5-file.h" @@ -32,30 +31,28 @@ #include "render.h" #include "symmetry.h" #include "stream.h" +#include "thread-pool.h" -#define MAX_THREADS (256) - -struct process_args +struct static_sum_args { - char *filename; - int id; - - /* Thread control */ - pthread_mutex_t control_mutex; /* Protects the scary stuff below */ - int start; - int finish; - int done; - - UnitCell *cell; - pthread_mutex_t vals_mutex; /* Protects "vals" */ + pthread_mutex_t *vals_mutex; /* Protects "vals" */ double *vals; int xs; int ys; int zs; int config_angles; - pthread_mutex_t angles_mutex; /* Protects "angles" */ + pthread_mutex_t *angles_mutex; /* Protects "angles" */ unsigned int *angles; + + pthread_mutex_t *cell_mutex; /* Protects "angles" */ + double *as; + double *bs; + double *cs; + double *als; + double *bes; + double *gas; + struct detector *det; signed int ht; signed int kt; @@ -64,6 +61,25 @@ struct process_args }; +struct sum_args +{ + char *filename; + UnitCell *cell; + + struct static_sum_args static_args; +}; + + +struct queue_args +{ + FILE *fh; + char *prefix; + int config_basename; + + struct static_sum_args static_args; +}; + + static void show_help(const char *s) { printf("Syntax: %s [options]\n\n", s); @@ -82,6 +98,24 @@ static void show_help(const char *s) } +static void add_to_mean(UnitCell *cell, double *ast, double *bst, double *cst, + double *alst, double *best, double *gast) +{ + double asx, asy, asz; + double bsx, bsy, bsz; + double csx, csy, csz; + + cell_get_reciprocal(cell, &asx, &asy, &asz, &bsx, &bsy, &bsz, + &csx, &csy, &csz); + *ast += modulus(asx, asy, asz); + *bst += modulus(bsx, bsy, bsz); + *cst += modulus(csx, csy, csz); + *alst += angle_between(bsx, bsy, bsz, csx, csy, csz); + *best += angle_between(asx, asy, asz, csx, csy, csz); + *gast += angle_between(asx, asy, asz, bsx, bsy, bsz); +} + + static void interpolate_linear(double *vals, double v, int xs, int ys, int zs, int xv, int yv, double zv) @@ -166,8 +200,10 @@ static void interpolate_onto_grid(double *vals, double v, } -static void process_image(struct process_args *pargs) +static void sum_image(void *pg) { + struct sum_args *apargs = pg; + struct static_sum_args *pargs = &apargs->static_args; struct hdfile *hdfile; struct image image; double ax, ay, az; @@ -179,8 +215,7 @@ static void process_image(struct process_args *pargs) image.data = NULL; image.flags = NULL; image.indexed_cell = NULL; - image.id = pargs->id; - image.filename = pargs->filename; + image.filename = apargs->filename; image.hits = NULL; image.n_hits = 0; image.det = pargs->det; @@ -191,9 +226,9 @@ static void process_image(struct process_args *pargs) image.orientation.y = 0.0; image.orientation.z = 0.0; - STATUS("Processing '%s'\n", pargs->filename); + STATUS("Processing '%s'\n", apargs->filename); - hdfile = hdfile_open(pargs->filename); + hdfile = hdfile_open(apargs->filename); if ( hdfile == NULL ) { return; } else if ( hdfile_set_first_image(hdfile, "/") ) { @@ -204,7 +239,7 @@ static void process_image(struct process_args *pargs) hdf5_read(hdfile, &image, 1); - cell_get_cartesian(pargs->cell, &ax, &ay, &az, &bx, &by, + cell_get_cartesian(apargs->cell, &ax, &ay, &az, &bx, &by, &bz, &cx, &cy, &cz); fesetround(1); /* Round towards nearest */ @@ -243,10 +278,10 @@ static void process_image(struct process_args *pargs) double v = image.data[x+image.width*y]; - pthread_mutex_lock(&pargs->vals_mutex); + pthread_mutex_lock(pargs->vals_mutex); interpolate_onto_grid(pargs->vals, v, pargs->xs, pargs->ys, pargs->zs, dh, dk, dl); - pthread_mutex_unlock(&pargs->vals_mutex); + pthread_mutex_unlock(pargs->vals_mutex); } } @@ -259,56 +294,30 @@ static void process_image(struct process_args *pargs) double ang; int bin; - cell_get_reciprocal(pargs->cell, &asx, &asy, &asz, - &bsx, &bsy, &bsz, - &csx, &csy, &csz); + cell_get_reciprocal(apargs->cell, &asx, &asy, &asz, + &bsx, &bsy, &bsz, + &csx, &csy, &csz); ang = angle_between(csx, csy, csz, 0.0, 0.0, 1.0); ang = rad2deg(ang); /* 0->180 deg */ bin = rint(ang); - pthread_mutex_lock(&pargs->vals_mutex); + pthread_mutex_lock(pargs->angles_mutex); pargs->angles[bin]++; - pthread_mutex_unlock(&pargs->vals_mutex); + pthread_mutex_unlock(pargs->angles_mutex); } + pthread_mutex_lock(pargs->cell_mutex); + add_to_mean(apargs->cell, pargs->as, pargs->bs, pargs->cs, + pargs->als, pargs->bes, pargs->gas); + pthread_mutex_unlock(pargs->cell_mutex); + free(image.data); - cell_free(pargs->cell); + cell_free(apargs->cell); if ( image.flags != NULL ) free(image.flags); hdfile_close(hdfile); -} - - -static void *worker_thread(void *pargsv) -{ - struct process_args *pargs = pargsv; - int finish; - - do { - - int wakeup; - - process_image(pargs); - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 1; - pargs->start = 0; - pthread_mutex_unlock(&pargs->control_mutex); - /* Go to sleep until told to exit or process next image */ - do { - - pthread_mutex_lock(&pargs->control_mutex); - /* Either of these can result in the thread waking up */ - wakeup = pargs->start || pargs->finish; - finish = pargs->finish; - pthread_mutex_unlock(&pargs->control_mutex); - usleep(20000); - - } while ( !wakeup ); - - } while ( !pargs->finish ); - - return NULL; + free(apargs->filename); + free(pargs); } @@ -384,21 +393,35 @@ static void write_slice(const char *filename, double *vals, int z, } -static void add_to_mean(UnitCell *cell, double *ast, double *bst, double *cst, - double *alst, double *best, double *gast) +static void *get_image(void *qp) { - double asx, asy, asz; - double bsx, bsy, bsz; - double csx, csy, csz; + struct sum_args *pargs; + struct queue_args *qargs = qp; + UnitCell *cell; + char *filename; - cell_get_reciprocal(cell, &asx, &asy, &asz, &bsx, &bsy, &bsz, - &csx, &csy, &csz); - *ast += modulus(asx, asy, asz); - *bst += modulus(bsx, bsy, bsz); - *cst += modulus(csx, csy, csz); - *alst += angle_between(bsx, bsy, bsz, csx, csy, csz); - *best += angle_between(asx, asy, asz, csx, csy, csz); - *gast += angle_between(asx, asy, asz, bsx, bsy, bsz); + /* Get the next filename */ + if ( find_chunk(qargs->fh, &cell, &filename) ) { + return NULL; + } + + pargs = malloc(sizeof(struct sum_args)); + + if ( qargs->config_basename ) { + char *tmp; + tmp = strdup(basename(filename)); + free(filename); + filename = tmp; + } + + memcpy(&pargs->static_args, &qargs->static_args, + sizeof(struct static_sum_args)); + + pargs->cell = cell; + pargs->filename = malloc(1024); + snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, filename); + + return pargs; } @@ -408,13 +431,9 @@ int main(int argc, char *argv[]) char *infile = NULL; char *geomfile = NULL; FILE *fh; - int rval; int n_images; char *prefix = NULL; int nthreads = 1; - pthread_t workers[MAX_THREADS]; - struct process_args *worker_args[MAX_THREADS]; - int worker_active[MAX_THREADS]; int config_basename = 0; int config_checkprefix = 1; struct detector *det; @@ -425,7 +444,16 @@ int main(int argc, char *argv[]) int config_angles = 0; signed int ht, kt, lt; char *sym = NULL; - double as, bs, cs, als, bes, gas; + struct queue_args qargs; + pthread_mutex_t vals_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_t angles_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_t cell_mutex = PTHREAD_MUTEX_INITIALIZER; + double as; + double bs; + double cs; + double als; + double bes; + double gas; /* Long options */ const struct option longopts[] = { @@ -509,7 +537,7 @@ int main(int argc, char *argv[]) /* Initialise shape transform array */ vals = calloc(gs*gs*gs, sizeof(double)); - if ( (nthreads == 0) || (nthreads > MAX_THREADS) ) { + if ( nthreads == 0 ) { ERROR("Invalid number of threads.\n"); return 1; } @@ -519,149 +547,30 @@ int main(int argc, char *argv[]) as = 0.0; bs = 0.0; cs = 0.0; als = 0.0; bes = 0.0; gas = 0.0; - /* Initialise worker arguments */ - for ( i=0; i<nthreads; i++ ) { - - worker_args[i] = malloc(sizeof(struct process_args)); - worker_args[i]->filename = malloc(1024); - worker_active[i] = 0; - worker_args[i]->xs = gs; - worker_args[i]->ys = gs; - worker_args[i]->zs = gs; - worker_args[i]->config_angles = config_angles; - worker_args[i]->vals = vals; - worker_args[i]->angles = angles; - worker_args[i]->det = det; - pthread_mutex_init(&worker_args[i]->control_mutex, NULL); - pthread_mutex_init(&worker_args[i]->vals_mutex, NULL); - pthread_mutex_init(&worker_args[i]->angles_mutex, NULL); - worker_args[i]->ht = ht; - worker_args[i]->kt = kt; - worker_args[i]->lt = lt; - worker_args[i]->sym = sym; - - } + qargs.static_args.xs = gs; + qargs.static_args.ys = gs; + qargs.static_args.zs = gs; + qargs.static_args.config_angles = config_angles; + qargs.static_args.vals = vals; + qargs.static_args.angles = angles; + qargs.static_args.det = det; + qargs.static_args.vals_mutex = &vals_mutex; + qargs.static_args.angles_mutex = &angles_mutex; + qargs.static_args.ht = ht; + qargs.static_args.kt = kt; + qargs.static_args.lt = lt; + qargs.static_args.sym = sym; + qargs.static_args.cell_mutex = &cell_mutex; + qargs.static_args.as = &as; + qargs.static_args.bs = &bs; + qargs.static_args.cs = &cs; + qargs.static_args.als = &als; + qargs.static_args.bes = &bes; + qargs.static_args.gas = &gas; n_images = 0; - /* Start threads off */ - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int r; - int rval; - char *filename; - UnitCell *cell; - - pargs = worker_args[i]; - - /* Get the next filename */ - rval = find_chunk(fh, &cell, &filename); - if ( rval == 1 ) break; - add_to_mean(cell, &as, &bs, &cs, &als, &bes, &gas); - if ( config_basename ) { - char *tmp; - tmp = strdup(basename(filename)); - free(filename); - filename = tmp; - } - snprintf(pargs->filename, 1023, "%s%s", - prefix, filename); - pargs->cell = cell; - free(filename); - - n_images++; - - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pargs->finish = 0; - pthread_mutex_unlock(&pargs->control_mutex); - - worker_active[i] = 1; - r = pthread_create(&workers[i], NULL, worker_thread, pargs); - if ( r != 0 ) { - worker_active[i] = 0; - ERROR("Couldn't start thread %i\n", i); - } - - } - - /* Keep threads busy until the end of the data */ - do { - - int i; - rval = 0; - - for ( i=0; i<nthreads; i++ ) { - - struct process_args *pargs; - int done; - char *filename; - UnitCell *cell; - - /* Spend time working, not managing threads */ - usleep(100000); - - /* Are we using this thread record at all? */ - if ( !worker_active[i] ) continue; - - /* Has the thread finished yet? */ - pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - done = pargs->done; - pthread_mutex_unlock(&pargs->control_mutex); - if ( !done ) continue; - - /* Get the next filename */ - rval = find_chunk(fh, &cell, &filename); - if ( rval == 1 ) break; - add_to_mean(cell, &as, &bs, &cs, &als, &bes, &gas); - if ( config_basename ) { - char *tmp; - tmp = strdup(basename(filename)); - free(filename); - filename = tmp; - } - snprintf(pargs->filename, 1023, "%s%s", - prefix, filename); - pargs->cell = cell; - free(filename); - - n_images++; - - STATUS("Done %i images\n", n_images); - - /* Wake the thread up ... */ - pthread_mutex_lock(&pargs->control_mutex); - pargs->done = 0; - pargs->start = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - } - - } while ( rval == 0 ); - - /* Join threads */ - for ( i=0; i<nthreads; i++ ) { - - if ( !worker_active[i] ) goto free; - - /* Tell the thread to exit */ - struct process_args *pargs = worker_args[i]; - pthread_mutex_lock(&pargs->control_mutex); - pargs->finish = 1; - pthread_mutex_unlock(&pargs->control_mutex); - - /* Wait for it to join */ - pthread_join(workers[i], NULL); - - free: - if ( worker_args[i]->filename != NULL ) { - free(worker_args[i]->filename); - } - - } + run_threads(nthreads, sum_image, get_image, &qargs, 0); fclose(fh); |