aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cubeit.c3
-rw-r--r--src/indexamajig.c59
-rw-r--r--src/partialator.c2
-rw-r--r--src/sum_stack.c2
-rw-r--r--src/thread-pool.c66
-rw-r--r--src/thread-pool.h6
6 files changed, 103 insertions, 35 deletions
diff --git a/src/cubeit.c b/src/cubeit.c
index f1f6286d..33442ac3 100644
--- a/src/cubeit.c
+++ b/src/cubeit.c
@@ -570,7 +570,8 @@ int main(int argc, char *argv[])
qargs.static_args.bes = &bes;
qargs.static_args.gas = &gas;
- n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0);
+ n_images = run_threads(nthreads, sum_image, get_image, NULL, &qargs, 0,
+ 0, 0, 0);
fclose(fh);
diff --git a/src/indexamajig.c b/src/indexamajig.c
index e7406939..71fa98d1 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -191,6 +191,11 @@ static void show_help(const char *s)
" lattice point.\n"
" --insane Don't check that the reduced cell accounts for at\n"
" least 10%% of the located peaks.\n"
+"\n"
+"You can tune the CPU affinities for enhanced performance on NUMA machines:\n"
+" --cpus=<n> Specify number of CPUs.\n"
+" --cpugroup=<n> Batch threads in groups of this size.\n"
+" --cpuoffset=<n> Start using CPUs at this group number.\n"
);
}
@@ -468,6 +473,10 @@ int main(int argc, char *argv[])
double nominal_photon_energy;
int stream_flags = STREAM_INTEGRATED;
struct timespec tp;
+ int cpu_num = 0;
+ int cpu_groupsize = 1;
+ int cpu_offset = 0;
+ char *endptr;
/* Long options */
const struct option longopts[] = {
@@ -496,6 +505,9 @@ int main(int argc, char *argv[])
{"image", 1, NULL, 'e'},
{"basename", 0, &config_basename, 1},
{"record", 1, NULL, 5},
+ {"cpus", 1, NULL, 6},
+ {"cpugroup", 1, NULL, 7},
+ {"cpuoffset", 1, NULL, 8},
{0, 0, NULL, 0}
};
@@ -570,6 +582,42 @@ int main(int argc, char *argv[])
if ( stream_flags < 0 ) return 1;
break;
+ case 6 :
+ cpu_num = strtol(optarg, &endptr, 10);
+ if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) {
+ ERROR("Invalid number of CPUs ('%s')\n",
+ optarg);
+ return 1;
+ }
+ break;
+
+ case 7 :
+ cpu_groupsize = strtol(optarg, &endptr, 10);
+ if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) {
+ ERROR("Invalid CPU group size ('%s')\n",
+ optarg);
+ return 1;
+ }
+ if ( cpu_groupsize < 1 ) {
+ ERROR("CPU group size cannot be"
+ " less than 1.\n");
+ return 1;
+ }
+ break;
+
+ case 8 :
+ cpu_offset = strtol(optarg, &endptr, 10);
+ if ( !( (optarg[0] != '\0') && (endptr[0] == '\0') ) ) {
+ ERROR("Invalid CPU offset ('%s')\n",
+ optarg);
+ return 1;
+ }
+ if ( cpu_offset < 0 ) {
+ ERROR("CPU offset must be positive.\n");
+ return 1;
+ }
+ break;
+
case 0 :
break;
@@ -579,6 +627,13 @@ int main(int argc, char *argv[])
}
+ if ( (cpu_num > 0) && (cpu_num % cpu_groupsize != 0) ) {
+ ERROR("Number of CPUs must be divisible by"
+ " the CPU group size.\n");
+ return 1;
+ }
+
+
if ( filename == NULL ) {
filename = strdup("-");
}
@@ -792,9 +847,9 @@ int main(int argc, char *argv[])
clock_gettime(CLOCK_REALTIME, &tp);
qargs.t_last_stats = tp.tv_sec;
-
n_images = run_threads(nthreads, process_image, get_image,
- finalise_image, &qargs, 0);
+ finalise_image, &qargs, 0,
+ cpu_num, cpu_groupsize, cpu_offset);
cleanup_indexing(ipriv);
diff --git a/src/partialator.c b/src/partialator.c
index 664afd50..648b8f58 100644
--- a/src/partialator.c
+++ b/src/partialator.c
@@ -102,7 +102,7 @@ static void refine_all(struct image *images, int n_total_patterns,
}
run_thread_range(n_total_patterns, nthreads, "Refining",
- refine_image, tasks);
+ refine_image, tasks, 0, 0, 0);
free(tasks);
}
diff --git a/src/sum_stack.c b/src/sum_stack.c
index 84e3fd64..8d00a87f 100644
--- a/src/sum_stack.c
+++ b/src/sum_stack.c
@@ -457,7 +457,7 @@ int main(int argc, char *argv[])
do {
n_done = run_threads(nthreads, add_image, get_image,
- (void *)&qargs, NULL, chunk_size);
+ (void *)&qargs, NULL, chunk_size, 0, 0, 0);
n_images += n_done;
diff --git a/src/thread-pool.c b/src/thread-pool.c
index d672062c..7b6a8ba1 100644
--- a/src/thread-pool.c
+++ b/src/thread-pool.c
@@ -35,22 +35,33 @@
#ifdef HAVE_CPU_AFFINITY
-static int next_cpu(int cur)
+static void set_affinity(int n, int cpu_num, int cpu_groupsize, int cpu_offset)
{
- cur++;
+ cpu_set_t c;
+ int group;
+ int n_cpu_groups;
+ int i;
- if ( cur == 73 ) cur = 0;
+ if ( cpu_num == 0 ) return;
- return cur;
-}
+ CPU_ZERO(&c);
+ /* Work out which group this thread belongs to */
+ group = (n / cpu_groupsize) + cpu_offset;
-static void set_affinity(int cpu)
-{
- cpu_set_t c;
+ /* Work out which CPUs should be used for this group */
+ n_cpu_groups = cpu_num / cpu_groupsize;
+ group = group % n_cpu_groups;
+
+ /* Set flags */
+ for ( i=0; i<cpu_groupsize; i++ ) {
+
+ int cpu = cpu_groupsize*group + i;
+
+ CPU_SET(cpu, &c);
+
+ }
- CPU_ZERO(&c);
- CPU_SET(cpu, &c);
if ( sched_setaffinity(0, sizeof(cpu_set_t), &c) ) {
/* Cannot use ERROR() just yet */
@@ -58,23 +69,18 @@ static void set_affinity(int cpu)
} else {
- fprintf(stderr, "Successfully set CPU affinity to %i\n", cpu);
+ fprintf(stderr, "Successfully set CPU affinity.\n");
}
}
#else /* HAVE_CPU_AFFINITY */
-static int next_cpu(int cur)
+static void set_affinity(int n, int cpu_num, int cpu_groupsize, int cpu_offset)
{
return 0;
}
-
-static void set_affinity(int cpu)
-{
-}
-
#endif /* HAVE_CPU_AFFINITY */
@@ -89,7 +95,9 @@ struct worker_args
struct task_queue_range *tqr;
struct task_queue *tq;
int id;
- int cpu;
+ int cpu_num;
+ int cpu_groupsize;
+ int cpu_offset;
};
@@ -136,7 +144,7 @@ static void *range_worker(void *pargsv)
struct task_queue_range *q = w->tqr;
int *cookie;
- set_affinity(w->cpu);
+ set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset);
cookie = malloc(sizeof(int));
*cookie = w->id;
@@ -185,11 +193,11 @@ static void *range_worker(void *pargsv)
void run_thread_range(int n_tasks, int n_threads, const char *text,
- void (*work)(int, void *), void *work_args)
+ void (*work)(int, void *), void *work_args,
+ int cpu_num, int cpu_groupsize, int cpu_offset)
{
pthread_t *workers;
int i;
- int cpu = 0;
struct task_queue_range q;
/* The nation of CrystFEL prides itself on having 0% unemployment. */
@@ -224,8 +232,9 @@ void run_thread_range(int n_tasks, int n_threads, const char *text,
w->tqr = &q;
w->tq = NULL;
w->id = i;
- w->cpu = cpu;
- cpu = next_cpu(cpu);
+ w->cpu_num = cpu_num;
+ w->cpu_groupsize = cpu_groupsize;
+ w->cpu_offset = cpu_offset;
if ( pthread_create(&workers[i], NULL, range_worker, w) ) {
/* Not ERROR() here */
@@ -271,7 +280,7 @@ static void *task_worker(void *pargsv)
struct task_queue *q = w->tq;
int *cookie;
- set_affinity(w->cpu);
+ set_affinity(w->id, w->cpu_num, w->cpu_groupsize, w->cpu_offset);
cookie = malloc(sizeof(int));
*cookie = w->id;
@@ -322,12 +331,12 @@ static void *task_worker(void *pargsv)
int run_threads(int n_threads, void (*work)(void *, int),
void *(*get_task)(void *), void (*final)(void *, void *),
- void *queue_args, int max)
+ void *queue_args, int max,
+ int cpu_num, int cpu_groupsize, int cpu_offset)
{
pthread_t *workers;
int i;
struct task_queue q;
- int cpu = 0;
pthread_key_create(&status_label_key, NULL);
@@ -355,8 +364,9 @@ int run_threads(int n_threads, void (*work)(void *, int),
w->tq = &q;
w->tqr = NULL;
w->id = i;
- w->cpu = cpu;
- cpu = next_cpu(cpu);
+ w->cpu_num = cpu_num;
+ w->cpu_groupsize = cpu_groupsize;
+ w->cpu_offset = cpu_offset;
if ( pthread_create(&workers[i], NULL, task_worker, w) ) {
/* Not ERROR() here */
diff --git a/src/thread-pool.h b/src/thread-pool.h
index fefcef4a..04a9e19b 100644
--- a/src/thread-pool.h
+++ b/src/thread-pool.h
@@ -28,7 +28,8 @@ extern signed int get_status_label(void);
* unique and in the range 0..n_tasks. A progress bar will be shown using
* "text" and the progress through the tasks, unless "text" is NULL. */
extern void run_thread_range(int n_tasks, int n_threads, const char *text,
- void (*work)(int, void *), void *work_args);
+ void (*work)(int, void *), void *work_args,
+ int cpu_num, int cpu_groupsize, int cpu_offset);
/* get_task() will be called every time a worker is idle. It returns either
@@ -41,7 +42,8 @@ extern void run_thread_range(int n_tasks, int n_threads, const char *text,
* Returns: the number of tasks processed. */
extern int run_threads(int n_threads, void (*work)(void *, int),
void *(*get_task)(void *), void (*final)(void *, void *),
- void *queue_args, int max);
+ void *queue_args, int max,
+ int cpu_num, int cpu_groupsize, int cpu_offset);
#endif /* THREAD_POOL_H */