From c7abdfb3e404a8a238e8352ea5e195f864efefd2 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Mon, 5 Dec 2016 10:10:24 +0100 Subject: indexamajig: Add ping mechanism to avoid timing out when trying lots of indexers --- src/im-sandbox.c | 12 ++++++++++++ src/im-sandbox.h | 5 +++-- src/process_image.c | 15 ++++++++++++--- 3 files changed, 27 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 979fc136..df8ed1e2 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -78,6 +78,7 @@ struct sandbox pid_t *pids; int *running; time_t *last_response; + int last_ping[MAX_NUM_WORKERS]; /* Streams to read from (NB not the same indices as the above) */ int n_read; @@ -123,6 +124,7 @@ static time_t get_monotonic_seconds() static void stamp_response(struct sandbox *sb, int n) { sb->last_response[n] = get_monotonic_seconds(); + sb->last_ping[n] = sb->shared->pings[n]; } @@ -131,13 +133,20 @@ static void check_hung_workers(struct sandbox *sb) int i; time_t tnow = get_monotonic_seconds(); for ( i=0; i<sb->n_proc; i++ ) { + if ( !sb->running[i] ) continue; + + if ( sb->shared->pings[i] != sb->last_ping[i] ) { + stamp_response(sb, i); + } + if ( tnow - sb->last_response[i] > 240 ) { STATUS("Worker %i did not respond for 240 seconds - " "sending it SIGKILL.\n", i); kill(sb->pids[i], SIGKILL); stamp_response(sb, i); } + } } @@ -534,6 +543,9 @@ static void start_worker_process(struct sandbox *sb, int slot) return; } + sb->shared->pings[slot] = 0; + sb->last_ping[slot] = 0; + p = fork(); if ( p == -1 ) { ERROR("fork() failed!\n"); diff --git a/src/im-sandbox.h b/src/im-sandbox.h index b39f9566..b58e4b9e 100644 --- a/src/im-sandbox.h +++ b/src/im-sandbox.h @@ -3,13 +3,13 @@ * * Sandbox for indexing * - * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2017 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. 
* Copyright © 2012 Richard Kirian * Copyright © 2012 Lorenzo Galli * * Authors: - * 2010-2015 Thomas White + * 2010-2017 Thomas White * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -61,6 +61,7 @@ struct sb_shm char queue[QUEUE_SIZE][MAX_EV_LEN]; int no_more; char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN]; + int pings[MAX_NUM_WORKERS]; pthread_mutex_t totals_lock; int n_processed; diff --git a/src/process_image.c b/src/process_image.c index 62ef732c..bcaee543 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -3,11 +3,11 @@ * * The processing pipeline for one image * - * Copyright © 2012-2015 Deutsches Elektronen-Synchrotron DESY, + * Copyright © 2012-2017 Deutsches Elektronen-Synchrotron DESY, * a research centre of the Helmholtz Association. * * Authors: - * 2010-2015 Thomas White + * 2010-2017 Thomas White * 2014 Valerio Mariani * * This file is part of CrystFEL. @@ -124,6 +124,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, image.indexed_by = INDEXING_NONE; time_accounts_set(taccs, TACC_HDF5OPEN); + sb_shared->pings[cookie]++; hdfile = hdfile_open(image.filename); if ( hdfile == NULL ) { ERROR("Couldn't open file: %s\n", image.filename); @@ -131,6 +132,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } time_accounts_set(taccs, TACC_HDF5READ); + sb_shared->pings[cookie]++; check = hdf5_read2(hdfile, &image, image.event, 0); if ( check ) { return; @@ -138,6 +140,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, /* Take snapshot of image before applying horrible noise filters */ time_accounts_set(taccs, TACC_FILTER); + sb_shared->pings[cookie]++; prefilter = backup_image_data(image.dp, image.det); if ( iargs->median_filter > 0 ) { @@ -149,9 +152,11 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, } time_accounts_set(taccs, TACC_RESRANGE); + sb_shared->pings[cookie]++; mark_resolution_range_as_bad(&image, 
iargs->highres, +INFINITY); time_accounts_set(taccs, TACC_PEAKSEARCH); + sb_shared->pings[cookie]++; switch ( iargs->peaks ) { case PEAK_HDF5: @@ -214,7 +219,8 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, /* Index the pattern */ time_accounts_set(taccs, TACC_INDEXING); - index_pattern(&image, iargs->indm, iargs->ipriv); + index_pattern_2(&image, iargs->indm, iargs->ipriv, + &sb_shared->pings[cookie]); r = chdir(rn); if ( r ) { @@ -247,6 +253,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, /* Integrate! */ time_accounts_set(taccs, TACC_INTEGRATION); + sb_shared->pings[cookie]++; integrate_all_4(&image, iargs->int_meth, PMODEL_SCSPHERE, iargs->push_res, iargs->ir_inn, iargs->ir_mid, iargs->ir_out, @@ -255,6 +262,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, &sb_shared->term_lock); time_accounts_set(taccs, TACC_WRITESTREAM); + sb_shared->pings[cookie]++; ret = write_chunk(st, &image, hdfile, iargs->stream_peaks, iargs->stream_refls, pargs->filename_p_e->ev); @@ -274,6 +282,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, /* Count crystals which are still good */ time_accounts_set(taccs, TACC_TOTALS); + sb_shared->pings[cookie]++; pthread_mutex_lock(&sb_shared->totals_lock); any_crystals = 0; for ( i=0; i