about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author Thomas White <taw@physics.org> 2016-12-05 10:10:24 +0100
committer Thomas White <taw@physics.org> 2017-02-02 11:45:16 +0100
commit c7abdfb3e404a8a238e8352ea5e195f864efefd2 (patch)
tree c0c1dd6e68062b1561cb2d97e3a6dd2475d002ae /src
parent 526e5ab3d6fd21b8fe3c435f3984866c7e145a84 (diff)
indexamajig: Add ping mechanism to avoid timing out when trying lots of indexers
Diffstat (limited to 'src')
-rw-r--r-- src/im-sandbox.c 12
-rw-r--r-- src/im-sandbox.h 5
-rw-r--r-- src/process_image.c 15
3 files changed, 27 insertions, 5 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 979fc136..df8ed1e2 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -78,6 +78,7 @@ struct sandbox
pid_t *pids;
int *running;
time_t *last_response;
+ int last_ping[MAX_NUM_WORKERS];
/* Streams to read from (NB not the same indices as the above) */
int n_read;
@@ -123,6 +124,7 @@ static time_t get_monotonic_seconds()
static void stamp_response(struct sandbox *sb, int n)
{
sb->last_response[n] = get_monotonic_seconds();
+ sb->last_ping[n] = sb->shared->pings[n];
}
@@ -131,13 +133,20 @@ static void check_hung_workers(struct sandbox *sb)
int i;
time_t tnow = get_monotonic_seconds();
for ( i=0; i<sb->n_proc; i++ ) {
+
if ( !sb->running[i] ) continue;
+
+ if ( sb->shared->pings[i] != sb->last_ping[i] ) {
+ stamp_response(sb, i);
+ }
+
if ( tnow - sb->last_response[i] > 240 ) {
STATUS("Worker %i did not respond for 240 seconds - "
"sending it SIGKILL.\n", i);
kill(sb->pids[i], SIGKILL);
stamp_response(sb, i);
}
+
}
}
@@ -534,6 +543,9 @@ static void start_worker_process(struct sandbox *sb, int slot)
return;
}
+ sb->shared->pings[slot] = 0;
+ sb->last_ping[slot] = 0;
+
p = fork();
if ( p == -1 ) {
ERROR("fork() failed!\n");
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index b39f9566..b58e4b9e 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2017 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2015 Thomas White <taw@physics.org>
+ * 2010-2017 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -61,6 +61,7 @@ struct sb_shm
char queue[QUEUE_SIZE][MAX_EV_LEN];
int no_more;
char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN];
+ int pings[MAX_NUM_WORKERS];
pthread_mutex_t totals_lock;
int n_processed;
diff --git a/src/process_image.c b/src/process_image.c
index 62ef732c..bcaee543 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -3,11 +3,11 @@
*
* The processing pipeline for one image
*
- * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2017 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
*
* Authors:
- * 2010-2015 Thomas White <taw@physics.org>
+ * 2010-2017 Thomas White <taw@physics.org>
* 2014 Valerio Mariani
*
* This file is part of CrystFEL.
@@ -124,6 +124,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
image.indexed_by = INDEXING_NONE;
time_accounts_set(taccs, TACC_HDF5OPEN);
+ sb_shared->pings[cookie]++;
hdfile = hdfile_open(image.filename);
if ( hdfile == NULL ) {
ERROR("Couldn't open file: %s\n", image.filename);
@@ -131,6 +132,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
time_accounts_set(taccs, TACC_HDF5READ);
+ sb_shared->pings[cookie]++;
check = hdf5_read2(hdfile, &image, image.event, 0);
if ( check ) {
return;
@@ -138,6 +140,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
/* Take snapshot of image before applying horrible noise filters */
time_accounts_set(taccs, TACC_FILTER);
+ sb_shared->pings[cookie]++;
prefilter = backup_image_data(image.dp, image.det);
if ( iargs->median_filter > 0 ) {
@@ -149,9 +152,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
}
time_accounts_set(taccs, TACC_RESRANGE);
+ sb_shared->pings[cookie]++;
mark_resolution_range_as_bad(&image, iargs->highres, +INFINITY);
time_accounts_set(taccs, TACC_PEAKSEARCH);
+ sb_shared->pings[cookie]++;
switch ( iargs->peaks ) {
case PEAK_HDF5:
@@ -214,7 +219,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
/* Index the pattern */
time_accounts_set(taccs, TACC_INDEXING);
- index_pattern(&image, iargs->indm, iargs->ipriv);
+ index_pattern_2(&image, iargs->indm, iargs->ipriv,
+ &sb_shared->pings[cookie]);
r = chdir(rn);
if ( r ) {
@@ -247,6 +253,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
/* Integrate! */
time_accounts_set(taccs, TACC_INTEGRATION);
+ sb_shared->pings[cookie]++;
integrate_all_4(&image, iargs->int_meth, PMODEL_SCSPHERE,
iargs->push_res,
iargs->ir_inn, iargs->ir_mid, iargs->ir_out,
@@ -255,6 +262,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
&sb_shared->term_lock);
time_accounts_set(taccs, TACC_WRITESTREAM);
+ sb_shared->pings[cookie]++;
ret = write_chunk(st, &image, hdfile,
iargs->stream_peaks, iargs->stream_refls,
pargs->filename_p_e->ev);
@@ -274,6 +282,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
/* Count crystals which are still good */
time_accounts_set(taccs, TACC_TOTALS);
+ sb_shared->pings[cookie]++;
pthread_mutex_lock(&sb_shared->totals_lock);
any_crystals = 0;
for ( i=0; i<image.n_crystals; i++ ) {