diff options
-rw-r--r-- | src/cubeit.c | 3 | ||||
-rw-r--r-- | src/indexamajig.c | 59 | ||||
-rw-r--r-- | src/partialator.c | 2 | ||||
-rw-r--r-- | src/sum_stack.c | 2 | ||||
-rw-r--r-- | src/thread-pool.c | 66 | ||||
-rw-r--r-- | src/thread-pool.h | 6 |
6 files changed, 103 insertions, 35 deletions
diff --git a/src/cubeit.c b/src/cubeit.c index f1f6286d..33442ac3 100644 --- a/src/cubeit.c +++ b/src/cubeit.c @@ -570,7 +570,8 @@ int main(int argc, char *argv[]) qargs.static_args.bes = &bes; qargs.static_args.gas = &gas; - n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0); + n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0, + 0, 0, 0); fclose(fh); diff --git a/src/indexamajig.c b/src/indexamajig.c index e7406939..71fa98d1 100644 --- a/src/indexamajig.c +++ b/src/indexamajig.c @@ -191,6 +191,11 @@ static void show_help(const char *s) " lattice point.\n" " --insane Don't check that the reduced cell accounts for at\n" " least 10%% of the located peaks.\n" +"\n" +"You can tune the CPU affinities for enhanced performance on NUMA machines:\n" +" --cpus=<n> Specify number of CPUs.\n" +" --cpugroup=<n> Batch threads in groups of this size.\n" +" --cpuoffset=<n> Start using CPUs at this group number.\n" ); } @@ -468,6 +473,10 @@ int main(int argc, char *argv[]) double nominal_photon_energy; int stream_flags = STREAM_INTEGRATED; struct timespec tp; + int cpu_num = 0; + int cpu_groupsize = 1; + int cpu_offset = 0; + char *endptr; /* Long options */ const struct option longopts[] = { @@ -496,6 +505,9 @@ int main(int argc, char *argv[]) {"image", 1, NULL, 'e'}, {"basename", 0, &config_basename, 1}, {"record", 1, NULL, 5}, + {"cpus", 1, NULL, 6}, + {"cpugroup", 1, NULL, 7}, + {"cpuoffset", 1, NULL, 8}, {0, 0, NULL, 0} }; @@ -570,6 +582,42 @@ int main(int argc, char *argv[]) if ( stream_flags < 0 ) return 1; break; + case 6 : + cpu_num = strtol(optarg, &endptr, 10); + if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { + ERROR("Invalid number of CPUs ('%s')\n", + optarg); + return 1; + } + break; + + case 7 : + cpu_groupsize = strtol(optarg, &endptr, 10); + if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { + ERROR("Invalid CPU group size ('%s')\n", + optarg); + return 1; + } + if ( cpu_groupsize < 1 ) { + ERROR("CPU group size cannot be" + " less than 1.\n"); + return 1; + } + break; + + case 8 : + cpu_offset = strtol(optarg, &endptr, 10); + if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) { + ERROR("Invalid CPU offset ('%s')\n", + optarg); + return 1; + } + if ( cpu_offset < 0 ) { + ERROR("CPU offset must be positive.\n"); + return 1; + } + break; + case 0 : break; @@ -579,6 +627,13 @@ int main(int argc, char *argv[]) } + if ( (cpu_num > 0) && (cpu_num % cpu_groupsize != 0) ) { + ERROR("Number of CPUs must be divisible by" + " the CPU group size.\n"); + return 1; + } + + if ( filename == NULL ) { filename = strdup("-"); } @@ -792,9 +847,9 @@ int main(int argc, char *argv[]) clock_gettime(CLOCK_REALTIME, &tp); qargs.t_last_stats = tp.tv_sec; - n_images = run_threads(nthreads, process_image, get_image, - finalise_image, &qargs, 0); + finalise_image, &qargs, 0, + cpu_num, cpu_groupsize, cpu_offset); cleanup_indexing(ipriv); diff --git a/src/partialator.c b/src/partialator.c index 664afd50..648b8f58 100644 --- a/src/partialator.c +++ b/src/partialator.c @@ -102,7 +102,7 @@ static void refine_all(struct image *images, int n_total_patterns, } run_thread_range(n_total_patterns, nthreads, "Refining", - refine_image, tasks); + refine_image, tasks, 0, 0, 0); free(tasks); } diff --git a/src/sum_stack.c b/src/sum_stack.c index 84e3fd64..8d00a87f 100644 --- a/src/sum_stack.c +++ b/src/sum_stack.c @@ -457,7 +457,7 @@ int main(int argc, char *argv[]) do { n_done = run_threads(nthreads, add_image, get_image, - (void *)&qargs, NULL, chunk_size); + (void *)&qargs, NULL, chunk_size, 0, 0, 0); n_images += n_done; diff --git a/src/thread-pool.c b/src/thread-pool.c index d672062c..7b6a8ba1 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -35,22 +35,33 @@ #ifdef HAVE_CPU_AFFINITY -static int next_cpu(int cur) +static void set_affinity(int n, int cpu_num, int cpu_groupsize, int cpu_offset) { - cur++; + cpu_set_t c; + int group; + int n_cpu_groups; + int i; - if ( cur == 73 ) cur = 0; + if ( cpu_num == 0 ) return; - return cur; -} + CPU_ZERO(&c); + /* Work out which group this thread belongs to */ + group = (n / cpu_groupsize) + cpu_offset; -static void set_affinity(int cpu) -{ - cpu_set_t c; + /* Work out which CPUs should be used for this group */ + n_cpu_groups = cpu_num / cpu_groupsize; + group = group % n_cpu_groups; + + /* Set flags */ + for ( i=0; i<cpu_groupsize; i++ ) { + + int cpu = cpu_groupsize*group + i; + + CPU_SET(cpu, &c); + + } - CPU_ZERO(&c); - CPU_SET(cpu, &c); if ( sched_setaffinity(0, sizeof(cpu_set_t), &c) ) { /* Cannot use ERROR() just yet */ @@ -58,23 +69,18 @@ static void set_affinity(int cpu) } else { - fprintf(stderr, "Successfully set CPU affinity to %i\n", cpu); + fprintf(stderr, "Successfully set CPU affinity.\n"); } } #else /* HAVE_CPU_AFFINITY */ -static int next_cpu(int cur) +static void set_affinity(int n, int cpu_num, int cpu_groupsize, int cpu_offset) { return 0; } - -static void set_affinity(int cpu) -{ -} - #endif /* HAVE_CPU_AFFINITY */ @@ -89,7 +95,9 @@ struct worker_args struct task_queue_range *tqr; struct task_queue *tq; int id; - int cpu; + int cpu_num; + int cpu_groupsize; + int cpu_offset; }; @@ -136,7 +144,7 @@ static void *range_worker(void *pargsv) struct task_queue_range *q = w->tqr; int *cookie; - set_affinity(w->cpu); + set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset); cookie = malloc(sizeof(int)); *cookie = w->id; @@ -185,11 +193,11 @@ static void *range_worker(void *pargsv) void run_thread_range(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args) + void (*work)(int, void *), void *work_args, + int cpu_num, int cpu_groupsize, int cpu_offset) { pthread_t *workers; int i; - int cpu = 0; struct task_queue_range q; /* The nation of CrystFEL prides itself on having 0% unemployment. */ @@ -224,8 +232,9 @@ void run_thread_range(int n_tasks, int n_threads, const char *text, w->tqr = &q; w->tq = NULL; w->id = i; - w->cpu = cpu; - cpu = next_cpu(cpu); + w->cpu_num = cpu_num; + w->cpu_groupsize = cpu_groupsize; + w->cpu_offset = cpu_offset; if ( pthread_create(&workers[i], NULL, range_worker, w) ) { /* Not ERROR() here */ @@ -271,7 +280,7 @@ static void *task_worker(void *pargsv) struct task_queue *q = w->tq; int *cookie; - set_affinity(w->cpu); + set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset); cookie = malloc(sizeof(int)); *cookie = w->id; @@ -322,12 +331,12 @@ static void *task_worker(void *pargsv) int run_threads(int n_threads, void (*work)(void *, int), void *(*get_task)(void *), void (*final)(void *, void *), - void *queue_args, int max) + void *queue_args, int max, + int cpu_num, int cpu_groupsize, int cpu_offset) { pthread_t *workers; int i; struct task_queue q; - int cpu = 0; pthread_key_create(&status_label_key, NULL); @@ -355,8 +364,9 @@ int run_threads(int n_threads, void (*work)(void *, int), w->tq = &q; w->tqr = NULL; w->id = i; - w->cpu = cpu; - cpu = next_cpu(cpu); + w->cpu_num = cpu_num; + w->cpu_groupsize = cpu_groupsize; + w->cpu_offset = cpu_offset; if ( pthread_create(&workers[i], NULL, task_worker, w) ) { /* Not ERROR() here */ diff --git a/src/thread-pool.h b/src/thread-pool.h index fefcef4a..04a9e19b 100644 --- a/src/thread-pool.h +++ b/src/thread-pool.h @@ -28,7 +28,8 @@ extern signed int get_status_label(void); * unique and in the range 0..n_tasks. A progress bar will be shown using * "text" and the progress through the tasks, unless "text" is NULL. */ extern void run_thread_range(int n_tasks, int n_threads, const char *text, - void (*work)(int, void *), void *work_args); + void (*work)(int, void *), void *work_args, + int cpu_num, int cpu_groupsize, int cpu_offset); /* get_task() will be called every time a worker is idle. It returns either @@ -41,7 +42,8 @@ extern void run_thread_range(int n_tasks, int n_threads, const char *text, * Returns: the number of tasks processed. */ extern int run_threads(int n_threads, void (*work)(void *, int), void *(*get_task)(void *), void (*final)(void *, void *), - void *queue_args, int max); + void *queue_args, int max, + int cpu_num, int cpu_groupsize, int cpu_offset); #endif /* THREAD_POOL_H */ |