aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChuck <chuck@cfel-chuck.desy.de>2012-05-09 16:19:29 +0200
committerChuck <chuck@cfel-chuck.desy.de>2012-05-09 16:19:29 +0200
commit8aae37896eab7414ab5f384e4152f72e57d0c788 (patch)
tree5428c40b1db305399c88a3f417b8488dadc028af /src
parent982508966ca8817616b54b720565103446c736d5 (diff)
Multiprocess indexamajig
Diffstat (limited to 'src')
-rw-r--r--src/indexamajig.c550
1 files changed, 344 insertions, 206 deletions
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 281cac43..9372c447 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -12,6 +12,7 @@
* 2010-2012 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
+ * 2012 Chunhong Yoon
*
* This file is part of CrystFEL.
*
@@ -44,6 +45,8 @@
#include <hdf5.h>
#include <gsl/gsl_errno.h>
#include <pthread.h>
+#include <sys/wait.h>
+#include <fcntl.h>
#ifdef HAVE_CLOCK_GETTIME
#include <time.h>
@@ -51,22 +54,25 @@
#include <sys/time.h>
#endif
-#include "utils.h"
-#include "hdf5-file.h"
-#include "index.h"
-#include "peaks.h"
-#include "detector.h"
-#include "filters.h"
-#include "thread-pool.h"
-#include "beam-parameters.h"
-#include "geometry.h"
-#include "stream.h"
-#include "reflist-utils.h"
+#include <crystfel/utils.h>
+#include <crystfel/hdf5-file.h>
+#include <crystfel/index.h>
+#include <crystfel/peaks.h>
+#include <crystfel/detector.h>
+#include <crystfel/filters.h>
+#include <crystfel/thread-pool.h>
+#include <crystfel/beam-parameters.h>
+#include <crystfel/geometry.h>
+#include <crystfel/stream.h>
+#include <crystfel/reflist-utils.h>
/* Write statistics at APPROXIMATELY this interval */
#define STATS_EVERY_N_SECONDS (5)
+#define LINE_LENGTH 1024
+
+#define BUFFER PIPE_BUF
enum {
PEAK_ZAEF,
@@ -107,6 +113,7 @@ struct static_index_args
pthread_mutex_t *output_mutex; /* Protects the output stream */
FILE *ofh;
const struct copy_hdf5_field *copyme;
+ char *outfile;
};
@@ -137,6 +144,7 @@ struct queue_args
int n_indexable_last_stats;
int n_processed_last_stats;
int t_last_stats;
+ int updateReader;
};
@@ -228,9 +236,8 @@ static void show_help(const char *s)
"\n"
"\nOptions you probably won't need:\n\n"
" --no-check-prefix Don't attempt to correct the --prefix.\n"
-" --no-closer-peak Don't integrate from the location of a nearby peak\n"
-" instead of the position closest to the reciprocal\n"
-" lattice point.\n"
+" --closer-peak Don't integrate from the location of a nearby peak\n"
+" instead of the predicted spot. Don't use.\n"
" --insane Don't check that the reduced cell accounts for at\n"
" least 10%% of the located peaks.\n"
" --no-bg-sub Don't subtract local background estimates from\n"
@@ -246,83 +253,94 @@ static void show_help(const char *s)
}
-static void process_image(void *pp, int cookie)
+// Get next pattern in .lst
+char* get_pattern(FILE *fh) {
+ char *rval;
+ char line[LINE_LENGTH];
+ rval = fgets(line, LINE_LENGTH - 1, fh);
+ if (ferror(fh)) {
+ printf("Read error\n");
+ rval = NULL;
+ }
+ return rval;
+}
+
+
+static void process_image(void *qp, void *pp, int cookie)
{
struct index_args *pargs = pp;
- struct hdfile *hdfile;
- struct image image;
+ struct queue_args *qargs = qp;
float *data_for_measurement;
size_t data_size;
- char *filename = pargs->filename;
- UnitCell *cell = pargs->static_args.cell;
- int config_cmfilter = pargs->static_args.config_cmfilter;
- int config_noisefilter = pargs->static_args.config_noisefilter;
- int config_verbose = pargs->static_args.config_verbose;
- IndexingMethod *indm = pargs->static_args.indm;
- struct beam_params *beam = pargs->static_args.beam;
+ UnitCell *cell = qargs->static_args.cell;
+ int config_cmfilter = qargs->static_args.config_cmfilter;
+ int config_noisefilter = qargs->static_args.config_noisefilter;
+ int config_verbose = qargs->static_args.config_verbose;
+ IndexingMethod *indm = qargs->static_args.indm;
+ struct beam_params *beam = qargs->static_args.beam;
+ int r, check;
+ struct hdfile *hdfile;
+ char *outfile = qargs->static_args.outfile;
+ struct image image;
image.features = NULL;
image.data = NULL;
image.flags = NULL;
image.indexed_cell = NULL;
- image.id = cookie;
- image.filename = filename;
- image.det = copy_geom(pargs->static_args.det);
- image.copyme = pargs->static_args.copyme;
+ image.det = copy_geom(qargs->static_args.det);
+ image.copyme = qargs->static_args.copyme;
image.beam = beam;
+ image.id = cookie; // MUST SET ID FOR MOSFLM TO WORK PROPERLY
- pargs->indexable = 0;
+ if (beam == NULL) {
+ ERROR("Warning: no beam parameters file.\n");
+ ERROR("I'm going to assume 1 ADU per photon, which is almost");
+ ERROR(" certainly wrong. Peak sigmas will be incorrect.\n");
+ }
+ char *filename = NULL;
+ char *imagename = pargs->filename;
+ chomp(imagename);
+ filename = malloc(strlen(qargs->prefix) + strlen(imagename) + 1);
+ snprintf(filename, LINE_LENGTH - 1, "%s%s", qargs->prefix, imagename);
+ image.filename = filename;
hdfile = hdfile_open(filename);
- if ( hdfile == NULL ) return;
-
- if ( pargs->static_args.element != NULL ) {
-
- int r;
- r = hdfile_set_image(hdfile, pargs->static_args.element);
- if ( r ) {
- ERROR("Couldn't select path '%s'\n",
- pargs->static_args.element);
- hdfile_close(hdfile);
- return;
- }
-
- } else {
-
- int r;
- r = hdfile_set_first_image(hdfile, "/");
- if ( r ) {
- ERROR("Couldn't select first path\n");
- hdfile_close(hdfile);
- return;
- }
+ if (hdfile == NULL) return;
+ r = hdfile_set_first_image(hdfile, "/"); // Need this to read hdf5 files
+ if (r) {
+ ERROR("Couldn't select first path\n");
+ hdfile_close(hdfile);
+ return;
}
- hdf5_read(hdfile, &image, pargs->static_args.config_satcorr);
+ check = hdf5_read(hdfile, &image, 1);
+ if (check == 1) {
+ hdfile_close(hdfile);
+ return;
+ }
- if ( (image.width != image.det->max_fs+1)
- || (image.height != image.det->max_ss+1) )
- {
+ if ((image.width != image.det->max_fs + 1)
+ || (image.height != image.det->max_ss + 1)) {
ERROR("Image size doesn't match geometry size"
- " - rejecting image.\n");
+ " - rejecting image.\n");
ERROR("Image size: %i,%i. Geometry size: %i,%i\n",
- image.width, image.height,
- image.det->max_fs+1, image.det->max_ss+1);
+ image.width, image.height,
+ image.det->max_fs + 1, image.det->max_ss + 1);
hdfile_close(hdfile);
free_detector_geometry(image.det);
return;
}
- if ( image.lambda < 0.0 ) {
- if ( beam != NULL ) {
+ if (image.lambda < 0.0) {
+ if (beam != NULL) {
ERROR("Using nominal photon energy of %.2f eV\n",
- beam->photon_energy);
+ beam->photon_energy);
image.lambda = ph_en_to_lambda(
- eV_to_J(beam->photon_energy));
+ eV_to_J(beam->photon_energy));
} else {
ERROR("No wavelength in file, so you need to give "
- "a beam parameters file with -b.\n");
+ "a beam parameters file with -b.\n");
hdfile_close(hdfile);
free_detector_geometry(image.det);
return;
@@ -330,40 +348,37 @@ static void process_image(void *pp, int cookie)
}
fill_in_values(image.det, hdfile);
- if ( config_cmfilter ) {
+ if (config_cmfilter) {
filter_cm(&image);
}
- /* Take snapshot of image after CM subtraction but before
- * the aggressive noise filter. */
- data_size = image.width*image.height*sizeof(float);
+ // Take snapshot of image after CM subtraction but before
+ // the aggressive noise filter.
+ data_size = image.width * image.height * sizeof (float);
data_for_measurement = malloc(data_size);
- if ( config_noisefilter ) {
+ if (config_noisefilter) {
filter_noise(&image, data_for_measurement);
} else {
memcpy(data_for_measurement, image.data, data_size);
}
- switch ( pargs->static_args.peaks )
- {
- case PEAK_HDF5 :
- /* Get peaks from HDF5 */
- if ( get_peaks(&image, hdfile,
- pargs->static_args.hdf5_peak_path) )
- {
- ERROR("Failed to get peaks from HDF5 file.\n");
- }
- break;
-
- case PEAK_ZAEF :
- search_peaks(&image, pargs->static_args.threshold,
- pargs->static_args.min_gradient,
- pargs->static_args.min_snr,
- pargs->static_args.ir_inn,
- pargs->static_args.ir_mid,
- pargs->static_args.ir_out);
- break;
+ switch (qargs->static_args.peaks) {
+ case PEAK_HDF5:
+ // Get peaks from HDF5
+ if (get_peaks(&image, hdfile,
+ qargs->static_args.hdf5_peak_path)) {
+ ERROR("Failed to get peaks from HDF5 file.\n");
+ }
+ break;
+ case PEAK_ZAEF:
+ search_peaks(&image, qargs->static_args.threshold,
+ qargs->static_args.min_gradient,
+ qargs->static_args.min_snr,
+ qargs->static_args.ir_inn,
+ qargs->static_args.ir_mid,
+ qargs->static_args.ir_out);
+ break;
}
/* Get rid of noise-filtered version at this point
@@ -375,40 +390,66 @@ static void process_image(void *pp, int cookie)
image.div = beam->divergence;
image.bw = beam->bandwidth;
image.profile_radius = 0.0001e9;
- index_pattern(&image, cell, indm, pargs->static_args.cellr,
- config_verbose, pargs->static_args.ipriv,
- pargs->static_args.config_insane,
- pargs->static_args.tols);
- if ( image.indexed_cell != NULL ) {
+ /* RUN INDEXING HERE */
+ index_pattern(&image, cell, indm, qargs->static_args.cellr,
+ config_verbose, qargs->static_args.ipriv,
+ qargs->static_args.config_insane, qargs->static_args.tols);
+ if (image.indexed_cell != NULL) {
pargs->indexable = 1;
-
image.reflections = find_intersections(&image,
- image.indexed_cell);
-
- if ( image.reflections != NULL ) {
-
+ image.indexed_cell);
+ if (image.reflections != NULL) {
integrate_reflections(&image,
- pargs->static_args.config_closer,
- pargs->static_args.config_bgsub,
- pargs->static_args.min_int_snr,
- pargs->static_args.ir_inn,
- pargs->static_args.ir_mid,
- pargs->static_args.ir_out);
-
+ qargs->static_args.config_closer,
+ qargs->static_args.config_bgsub,
+ qargs->static_args.min_int_snr,
+ qargs->static_args.ir_inn,
+ qargs->static_args.ir_mid,
+ qargs->static_args.ir_out);
}
-
} else {
-
image.reflections = NULL;
+ }
+ /* Write Lock */
+ struct flock fl = {F_WRLCK, SEEK_SET, 0, 0, 0};
+ int fd;
+ fl.l_pid = getpid();
+
+ char *outfilename = NULL;
+ chomp(outfile);
+ outfilename = malloc(strlen(outfile) + 1);
+ snprintf(outfilename, LINE_LENGTH - 1, "%s", outfile);
+ if ((fd = open(outfilename, O_WRONLY)) == -1) {
+ perror("Error on opening\n");
+ exit(1);
+ }
+ if (fcntl(fd, F_SETLKW, &fl) == -1) {
+ perror("Error on setting lock wait\n");
+ exit(1);
+ }
+
+ /* LOCKED! Write chunk */
+ FILE *fh;
+ fh = fopen(outfilename, "a");
+ if (fh == NULL) {
+ perror("Error inside lock\n");
+ }
+ write_chunk(fh, &image, hdfile, qargs->static_args.stream_flags);
+ fclose(fh);
+
+ /* Unlock stream for other processes */
+ fl.l_type = F_UNLCK; /* set to unlock same region */
+ if (fcntl(fd, F_SETLK, &fl) == -1) {
+ perror("fcntl");
+ exit(1);
}
+ close(fd);
- pthread_mutex_lock(pargs->static_args.output_mutex);
- write_chunk(pargs->static_args.ofh, &image, hdfile,
- pargs->static_args.stream_flags);
- pthread_mutex_unlock(pargs->static_args.output_mutex);
+ qargs->n_indexable += pargs->indexable;
+ qargs->n_processed++;
/* Only free cell if found */
cell_free(image.indexed_cell);
@@ -422,54 +463,6 @@ static void process_image(void *pp, int cookie)
}
-static void *get_image(void *qp)
-{
- char *line;
- struct index_args *pargs;
- char *rval;
- struct queue_args *qargs = qp;
-
- /* Initialise new task arguments */
- pargs = malloc(sizeof(struct index_args));
- memcpy(&pargs->static_args, &qargs->static_args,
- sizeof(struct static_index_args));
-
- /* Get the next filename */
- if ( qargs->use_this_one_instead != NULL ) {
-
- line = qargs->use_this_one_instead;
- qargs->use_this_one_instead = NULL;
-
- } else {
-
- line = malloc(1024*sizeof(char));
- rval = fgets(line, 1023, qargs->fh);
- if ( rval == NULL ) {
- free(pargs);
- free(line);
- return NULL;
- }
- chomp(line);
-
- }
-
- if ( qargs->config_basename ) {
- char *tmp;
- tmp = safe_basename(line);
- free(line);
- line = tmp;
- }
-
- pargs->filename = malloc(strlen(qargs->prefix)+strlen(line)+1);
-
- snprintf(pargs->filename, 1023, "%s%s", qargs->prefix, line);
-
- free(line);
-
- return pargs;
-}
-
-
#ifdef HAVE_CLOCK_GETTIME
static time_t get_monotonic_seconds()
@@ -493,34 +486,6 @@ static time_t get_monotonic_seconds()
#endif
-static void finalise_image(void *qp, void *pp)
-{
- struct queue_args *qargs = qp;
- struct index_args *pargs = pp;
- time_t monotonic_seconds;
-
- qargs->n_indexable += pargs->indexable;
- qargs->n_processed++;
-
- monotonic_seconds = get_monotonic_seconds();
- if ( monotonic_seconds >= qargs->t_last_stats+STATS_EVERY_N_SECONDS ) {
-
- STATUS("%i out of %i indexed so far,"
- " %i out of %i since the last message.\n",
- qargs->n_indexable, qargs->n_processed,
- qargs->n_indexable - qargs->n_indexable_last_stats,
- qargs->n_processed - qargs->n_processed_last_stats);
-
- qargs->n_processed_last_stats = qargs->n_processed;
- qargs->n_indexable_last_stats = qargs->n_indexable;
- qargs->t_last_stats = monotonic_seconds;
-
- }
-
- free(pargs->filename);
- free(pargs);
-}
-
static int parse_cell_reduction(const char *scellr, int *err,
int *reduction_needs_cell)
@@ -554,14 +519,13 @@ int main(int argc, char *argv[])
FILE *fh;
FILE *ofh;
char *rval = NULL;
- int n_images;
int config_noindex = 0;
int config_cmfilter = 0;
int config_noisefilter = 0;
int config_verbose = 0;
int config_satcorr = 1;
int config_checkprefix = 1;
- int config_closer = 1;
+ int config_closer = 0;
int config_insane = 0;
int config_bgsub = 1;
int config_basename = 0;
@@ -585,11 +549,11 @@ int main(int argc, char *argv[])
float tols[4] = {5.0, 5.0, 5.0, 1.5}; /* a,b,c,angles (%,%,%,deg) */
int cellr;
int peaks;
- int nthreads = 1;
- pthread_mutex_t output_mutex = PTHREAD_MUTEX_INITIALIZER;
+ int nProcesses = 1;
char *prepare_line;
- char prepare_filename[1024];
+ char prepare_filename[LINE_LENGTH];
struct queue_args qargs;
+ struct index_args pargs;
struct beam_params *beam = NULL;
char *element = NULL;
double nominal_photon_energy;
@@ -630,6 +594,7 @@ int main(int argc, char *argv[])
{"threshold", 1, NULL, 't'},
{"no-check-prefix", 0, &config_checkprefix, 0},
{"no-closer-peak", 0, &config_closer, 0},
+ {"closer-peak", 0, &config_closer, 1},
{"insane", 0, &config_insane, 1},
{"image", 1, NULL, 'e'},
{"basename", 0, &config_basename, 1},
@@ -653,7 +618,7 @@ int main(int argc, char *argv[])
};
/* Short options */
- while ((c = getopt_long(argc, argv, "hi:wp:j:x:g:t:o:b:e:",
+ while ((c = getopt_long(argc, argv, "hi:o:z:p:x:j:g:t:b:e:",
longopts, NULL)) != -1)
{
switch (c) {
@@ -683,7 +648,7 @@ int main(int argc, char *argv[])
break;
case 'j' :
- nthreads = atoi(optarg);
+ nProcesses = atoi(optarg);
break;
case 'g' :
@@ -826,7 +791,6 @@ int main(int argc, char *argv[])
ERROR("Failed to open output file '%s'\n", outfile);
return 1;
}
- free(outfile);
if ( hdf5_peak_path == NULL ) {
hdf5_peak_path = strdup("/processing/hitfinder/peakinfo");
@@ -859,8 +823,8 @@ int main(int argc, char *argv[])
}
}
- if ( nthreads == 0 ) {
- ERROR("Invalid number of threads.\n");
+ if ( nProcesses == 0 ) {
+ ERROR("Invalid number of processes.\n");
return 1;
}
@@ -976,8 +940,8 @@ int main(int argc, char *argv[])
}
/* Get first filename and use it to set up the indexing */
- prepare_line = malloc(1024*sizeof(char));
- rval = fgets(prepare_line, 1023, fh);
+ prepare_line = malloc(LINE_LENGTH*sizeof(char));
+ rval = fgets(prepare_line, LINE_LENGTH-1, fh);
if ( rval == NULL ) {
ERROR("Failed to get filename to prepare indexing.\n");
return 1;
@@ -989,7 +953,7 @@ int main(int argc, char *argv[])
free(prepare_line);
prepare_line = tmp;
}
- snprintf(prepare_filename, 1023, "%s%s", prefix, prepare_line);
+ snprintf(prepare_filename, LINE_LENGTH-1, "%s%s", prefix, prepare_line);
qargs.use_this_one_instead = prepare_line;
/* Prepare the indexer */
@@ -1027,7 +991,6 @@ int main(int argc, char *argv[])
qargs.static_args.indm = indm;
qargs.static_args.ipriv = ipriv;
qargs.static_args.peaks = peaks;
- qargs.static_args.output_mutex = &output_mutex;
qargs.static_args.ofh = ofh;
qargs.static_args.beam = beam;
qargs.static_args.element = element;
@@ -1045,11 +1008,185 @@ int main(int argc, char *argv[])
qargs.n_processed = 0;
qargs.n_indexable_last_stats = 0;
qargs.n_processed_last_stats = 0;
+ qargs.updateReader = 0; /* first process updates */
qargs.t_last_stats = get_monotonic_seconds();
- n_images = run_threads(nthreads, process_image, get_image,
- finalise_image, &qargs, 0,
- cpu_num, cpu_groupsize, cpu_offset);
+ /* Read .lst file */
+ register int i;
+ rewind(fh); /* make sure to read from start */
+
+ /* Clear output file content */
+ char *myOutfilename = NULL;
+ chomp(prefix);
+ chomp(outfile);
+ myOutfilename = malloc(strlen(outfile) + 1);
+ snprintf(myOutfilename, LINE_LENGTH - 1, "%s", outfile);
+ FILE *tfh;
+ tfh = fopen(myOutfilename, "a+");
+ if (tfh == NULL) {
+ ERROR("No output filename\n");
+ }
+ fclose(tfh);
+ qargs.static_args.outfile = outfile;
+ int ready_fd;
+ int buff_count;
+ fd_set fdset,tmpset;
+ char buffR[BUFFER], buffW[BUFFER];
+ int fd_pipeIn[nProcesses][2]; /* Process0 In */
+ int fd_pipeOut[nProcesses][2]; /* Process0 Out */
+ unsigned int opts;
+
+ FD_ZERO(&fdset); /* clear the fd_set */
+ /* set pipeIn as non-blocking */
+ for ( i=0; i<nProcesses; i++ ) {
+ opts = fcntl(fd_pipeIn[i][0], F_GETFL);
+ fcntl(fd_pipeIn[i][0], F_SETFL, opts | O_NONBLOCK);
+ }
+
+ /**** PIPING ****/
+ for ( i=0; i<nProcesses; i++ ) {
+ pipe(fd_pipeIn[i]);
+ pipe(fd_pipeOut[i]);
+ }
+
+ int max_fd = 0;
+ for ( i=0; i<nProcesses; i++ ) {
+ FD_SET(fd_pipeIn[i][0], &fdset);
+ if (fd_pipeIn[i][0] > max_fd) { /* find max_fd */
+ max_fd = fd_pipeIn[i][0];
+ }
+ }
+ max_fd = max_fd+1;
+ /* copy file set to tmpset */
+ memcpy((void *) &tmpset,(void *) &fdset, sizeof(fd_set));
+
+ /**** FORKING ****/
+ int power = 10; /* 2^power must be larger than nProcesses */
+ int pid[power];
+ double num = 0;
+ int batchNum = 0;
+ /* Fork 2^power times */
+ for ( i=0; i<power; i++ ) {
+ pid[i] = fork();
+ }
+ /* Assign id */
+ for ( i=0; i<power; i++ ) {
+ if (pid[i] == 0) { /* keep parents and kill off children */
+ num += pow(2, i);
+ }
+ }
+ /* Kill if batchNum too high */
+ if (num >= nProcesses + 1) {
+ exit(0); /* kill */
+ }
+ batchNum = (int) num;
+
+ /**** PLUMBING ****/
+ if (batchNum == qargs.updateReader) {
+ for ( i=0; i<nProcesses; i++ ) {
+ close(fd_pipeIn[i][1]); /* close all write pipes In */
+ close(fd_pipeOut[i][0]); /* close all read pipes Out */
+ }
+ } else {
+ for ( i=0; i<nProcesses; i++ ) {
+ if (i == batchNum - 1) { /* batchNum = 1,2,3 ... */
+ close(fd_pipeIn[i][0]); /* close read pipe In */
+ close(fd_pipeOut[i][1]); /* close write pipe Out */
+ } else {
+ close(fd_pipeIn[i][0]); // close remaining pipes In
+ close(fd_pipeIn[i][1]);
+ close(fd_pipeOut[i][0]); // close remaining pipes Out
+ close(fd_pipeOut[i][1]);
+ }
+ }
+ }
+ /**** INDEXING ****/
+ double tStart, tEnd;
+ tStart = get_monotonic_seconds();
+ int allDone = 0;
+ if (batchNum == qargs.updateReader){
+ char *nextImage = NULL;
+ for ( i=0; i<nProcesses; i++ ) { /* Send out image to all processes*/
+ nextImage = get_pattern(fh);
+ buff_count = sprintf(buffW, "%s",nextImage);
+ write (fd_pipeOut[i][1], buffW, buff_count);
+ }
+ int nFinished = 0;
+ while (!allDone) {
+ /* select from file set for reading */
+ if ((ready_fd = select(max_fd,&fdset,NULL,NULL,NULL)) < 0)
+ perror("select");
+ if (ready_fd > 0) {
+ for ( i=0; i<nProcesses; i++ ) {
+ /* is in file set that raised flag? */
+ if (FD_ISSET(fd_pipeIn[i][0],&fdset)) {
+ /* read from pipe and return number of bytes read */
+ if ((buff_count=read(fd_pipeIn[i][0],&buffR,BUFFER))<0) {
+ perror("read");
+ } else {
+ qargs.n_indexable += atoi(buffR);
+ qargs.n_processed++;
+ /* write to pipe */
+ if ((nextImage = get_pattern(fh)) == NULL){
+ nFinished++; /* no more images */
+ if ( nFinished == nProcesses )
+ allDone = 1; /* EXIT */
+ } else {
+ /* send out image */
+ buff_count = sprintf(buffW, "%s",nextImage);
+ if (write (fd_pipeOut[i][1], buffW, buff_count)<0)
+ perror("write pipe");
+ }
+ }
+ }
+ }
+ }
+ /* file set is modified, so copy original from tmpset */
+ memcpy((void *) &fdset,(void *) &tmpset, sizeof(fd_set));
+
+ /* Update to screen */
+ double tNow = get_monotonic_seconds();
+ if ( tNow >= qargs.t_last_stats+STATS_EVERY_N_SECONDS ) {
+ STATUS("%i out of %i indexed so far,"
+ " %i out of %i since the last message.\n\n",
+ qargs.n_indexable, qargs.n_processed,
+ qargs.n_indexable - qargs.n_indexable_last_stats,
+ qargs.n_processed - qargs.n_processed_last_stats);
+
+ qargs.n_indexable_last_stats = qargs.n_indexable;
+ qargs.n_processed_last_stats = qargs.n_processed;
+ qargs.t_last_stats = tNow;
+ }
+ }
+ /* close my pipes */
+ for ( i=0; i<nProcesses; i++ ) {
+ close(fd_pipeIn[i][0]);
+ close(fd_pipeOut[i][1]);
+ }
+ tEnd = get_monotonic_seconds();
+ printf("Compute Time: %.2fs\n", tEnd - tStart);
+ } else {
+ while(!allDone){
+ /* read from pipe and return number of bytes read */
+ if ((buff_count=read(fd_pipeOut[batchNum-1][0],&buffR,BUFFER))<0) {
+ perror("read1");
+ } else if (buff_count > 0) {
+ /* process image */
+ pargs.filename = buffR;
+ pargs.indexable = 0;
+ process_image(&qargs, &pargs, batchNum);
+ /* request another image */
+ buff_count = sprintf(buffW, "%d\n", pargs.indexable);
+ if(write (fd_pipeIn[batchNum-1][1], buffW, buff_count)<0)
+ perror("write P0");
+ } else if (buff_count == 0) {
+ allDone = 1; /* EXIT */
+ }
+ }
+ /* close my pipes */
+ close(fd_pipeIn[batchNum-1][1]);
+ close(fd_pipeOut[batchNum-1][0]);
+ }
cleanup_indexing(ipriv);
@@ -1065,8 +1202,9 @@ int main(int argc, char *argv[])
if ( fh != stdin ) fclose(fh);
if ( ofh != stdout ) fclose(ofh);
- STATUS("There were %i images, of which %i could be indexed.\n",
- n_images, qargs.n_indexable);
-
+ if (batchNum == qargs.updateReader) {
+ STATUS("There were %i images, of which %i could be indexed.\n",
+ qargs.n_processed, qargs.n_indexable);
+ }
return 0;
}