diff options
author | Valerio Mariani <valerio.mariani@desy.de> | 2014-05-09 11:02:17 +0200 |
---|---|---|
committer | Thomas White <taw@physics.org> | 2014-09-05 18:12:38 +0200 |
commit | 45492b842c3af2af542256417a8bab5bbc7bd5f7 (patch) | |
tree | 53fc320ad0734940c5a3fe2d075ae7417787432a /src/im-sandbox.c | |
parent | ae9fa9e6bfd1ed98a2b146d2e228c69a9cd651cc (diff) |
Multi-event mode
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 386 |
1 files changed, 335 insertions, 51 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 42e4a090..48518b82 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -10,6 +10,7 @@ * * Authors: * 2010-2014 Thomas White <taw@physics.org> + * 2014 Valerio Mariani * 2011 Richard Kirian * 2012 Lorenzo Galli * 2012 Chunhong Yoon @@ -46,6 +47,7 @@ #include <fcntl.h> #include <signal.h> #include <sys/stat.h> +#include <assert.h> #ifdef HAVE_CLOCK_GETTIME #include <time.h> @@ -53,6 +55,10 @@ #include <sys/time.h> #endif +#include <events.h> +#include <hdf5-file.h> +#include <detector.h> + #include "im-sandbox.h" #include "process_image.h" @@ -101,7 +107,7 @@ struct sandbox FILE **result_fhs; int *filename_pipes; int *stream_pipe_write; - char **last_filename; + struct filename_plus_event **last_filename; char *tmpdir; @@ -125,48 +131,210 @@ static void unlock_sandbox(struct sandbox *sb) } -static char *get_pattern(FILE *fh, int config_basename, const char *prefix) +static struct filename_plus_event *get_pattern + (FILE *fh, int config_basename, struct detector *det, + const char *prefix) { - char *line; - char *filename; + char *line = NULL; size_t len; + struct filename_plus_event *fne; + struct hdfile *hdfile; + char filename_buf[2014]; + char event_buf[2014]; - do { + static char *filename = NULL; + static struct event_list *ev_list = NULL; + static int event_index = -1; - /* Get the next filename */ - char *rval; + line = malloc(1024*sizeof(char)); - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { + while ( event_index == -1 ) { + + int scan_check; + + do { + + /* Get the next filename */ + char *rval; + + rval = fgets(line, 1023, fh); + if ( rval == NULL ) { + free(line); + return NULL; + } + + chomp(line); + + } while ( strlen(line) == 0 ); + + if ( config_basename ) { + char *tmp; + tmp = safe_basename(line); free(line); - return NULL; + line = tmp; } - chomp(line); + scan_check = sscanf(line, "%s %s", filename_buf, event_buf ); - } while ( strlen(line) == 0 ); + len = strlen(prefix)+strlen(filename_buf)+1; - if ( config_basename ) { - char *tmp; - tmp = safe_basename(line); - free(line); - line = tmp; - } + /* Round the length of the buffer, too keep Valgrind quiet when it gets + * given to write() a bit later on */ + len += 4 - (len % 4); + + if ( filename == NULL ) { + filename = malloc(len); + } else { + char *new_filename; + new_filename = realloc(filename, len*sizeof(char)); + if ( filename == NULL ) { + return NULL; + } + filename = new_filename; + } + + snprintf(filename, 1023, "%s%s", prefix, filename_buf); + + if ( det->path_dim != 0 || det->dim_dim != 0 ) { + + ev_list = initialize_event_list(); - len = strlen(prefix)+strlen(line)+1; + if ( scan_check == 1) { - /* Round the length of the buffer, too keep Valgrind quiet when it gets - * given to write() a bit later on */ - len += 4 - (len % 4); + hdfile = hdfile_open(filename); + if ( hdfile == NULL ) { + ERROR("Failed to open file %s\n", filename); + free(line); + return NULL; + } + + if ( ev_list != NULL ) { + free_event_list(ev_list); + } - filename = malloc(len); + ev_list = fill_event_list(hdfile, det); + + if ( ev_list->num_events == 0 ) { + event_index = -1; + } else { + event_index = 0; + } + + hdfile_close(hdfile); + + } else { + + struct event *ev_to_add; + + ev_to_add = get_event_from_event_string(event_buf); + append_event_to_event_list(ev_list, ev_to_add); + free_event(ev_to_add); + event_index = 0; + + } + } else { - snprintf(filename, 1023, "%s%s", prefix, line); + event_index = 0; + + } + } + + fne = malloc(sizeof(struct filename_plus_event)); + fne->filename = strdup(filename); + + if ( det->path_dim !=0 || det->dim_dim !=0 ) { + fne->ev = copy_event(ev_list->events[event_index]); + if ( event_index != ev_list->num_events-1 ) { + event_index += 1; + } else { + event_index = -1; + } + } else { + fne->ev = NULL; + event_index = -1; + } free(line); + return fne; +} + + +struct buffer_data +{ + char *rbuffer; + char *line; + int fd; + int rbufpos; + int rbuflen; +}; + + +static int read_fpe_data(struct buffer_data *bd) +{ + int rval; + int no_line = 0; + + rval = read(bd->fd, bd->rbuffer+bd->rbufpos, bd->rbuflen-bd->rbufpos); + if ( (rval == -1) || (rval == 0) ) return 1; + bd->rbufpos += rval; + assert(bd->rbufpos <= bd->rbuflen); + + while ( (!no_line) && (bd->rbufpos > 0) ) { + + int i; + int line_ready = 0; + int line_end = 0; + + /* See if there's a full line in the buffer yet */ + for ( i=0; i<bd->rbufpos; i++ ) { + + /* Is there a line in the buffer? */ + if ( strncmp(&bd->rbuffer[i] ,"\n" ,1 ) == 0 ) { + line_end = i; + line_ready = 1; + break; + } + + } + + if ( line_ready ) { + + int new_rbuflen; + + if ( bd->line != NULL ) { + free(bd->line); + } + + bd->line = strdup(bd->rbuffer); + + /* Now the block's been parsed, it should be + * forgotten about */ + memmove(bd->rbuffer, + bd->rbuffer + line_end + 2, + bd->rbuflen - line_end - 2); + + /* Subtract the number of bytes removed */ + bd->rbufpos = bd->rbufpos - line_end - 1; + new_rbuflen = bd->rbuflen - line_end - 2 ; + if ( new_rbuflen == 0 ) new_rbuflen = 256; + bd->rbuffer = realloc(bd->rbuffer, new_rbuflen*sizeof(char)); + bd->rbuflen = new_rbuflen; + + return 1; + + } else { + + if ( bd->rbufpos == bd->rbuflen ) { + bd->rbuffer = realloc(bd->rbuffer, bd->rbuflen + 256); + bd->rbuflen = bd->rbuflen + 256; + } + no_line = 1; + + } + + } - return filename; + return 0; } @@ -174,9 +342,19 @@ static void run_work(const struct index_args *iargs, int filename_pipe, int results_pipe, Stream *st, int cookie, const char *tmpdir) { - int allDone = 0; FILE *fh; + int allDone = 0; int w; + unsigned int opts; + struct buffer_data *bd; + + bd = malloc(sizeof(struct buffer_data)); + bd->rbuffer = malloc(256*sizeof(char)); + bd->rbuflen = 256; + bd->rbufpos = 0; + bd->line = NULL; + bd->fd = 0; + fh = fdopen(filename_pipe, "r"); if ( fh == NULL ) { @@ -189,36 +367,101 @@ static void run_work(const struct index_args *iargs, ERROR("Failed to send request for first filename.\n"); } + bd->fd = fileno(fh); + + /* Set non-blocking */ + opts = fcntl(bd->fd, F_GETFL); + fcntl(bd->fd, F_SETFL, opts | O_NONBLOCK); + while ( !allDone ) { struct pattern_args pargs; int c; - char *line; - char *rval; + int error; + int rval; char buf[1024]; - line = malloc(1024*sizeof(char)); - rval = fgets(line, 1023, fh); - if ( rval == NULL ) { + error = 0; + pargs.filename_p_e = initialize_filename_plus_event(); - ERROR("Read error!\n"); - free(line); + rval =0; + + do { + + fd_set fds; + struct timeval tv; + int sval; + + FD_ZERO(&fds); + FD_SET(bd->fd, &fds); + + tv.tv_sec = 30; + tv.tv_usec = 0; + + sval = select(bd->fd+1, &fds, NULL, NULL, &tv); + + if ( sval == -1 ) { + + const int err = errno; + + switch ( err ) { + + case EINTR: + STATUS("Restarting select()\n"); + break; + + default: + ERROR("select() failed: %s\n", strerror(err)); + rval = 1; + + } + + } else if ( sval != 0 ) { + rval = read_fpe_data(bd); + } else { + ERROR("No data sent from main process..\n"); + rval = 1; + error = 1; + } + + } while ( !rval ); + + if ( error == 1 ) { allDone = 1; continue; - } - chomp(line); + chomp(bd->line); - if ( strlen(line) == 0 ) { + if ( strlen(bd->line) == 0 ) { allDone = 1; } else { - pargs.filename = line; - pargs.n_crystals = 0; + char filename[1024]; + char event_str[1024]; + struct event* ev; + + sscanf(bd->line, "%s %s", filename, event_str); + pargs.filename_p_e->filename = strdup(filename); + + if ( strcmp(event_str, "/") != 0 ) { + + ev = get_event_from_event_string(event_str); + if ( ev == NULL ) { + ERROR("Error in event recovery\n"); + } + pargs.filename_p_e->ev = ev; + + } else { + + pargs.filename_p_e->ev = NULL; + + } + + pargs.n_crystals = 0; process_image(iargs, &pargs, st, cookie, tmpdir, results_pipe); @@ -229,12 +472,16 @@ static void run_work(const struct index_args *iargs, ERROR("write P0\n"); } - } + free_filename_plus_event(pargs.filename_p_e); - free(line); + } } + free(bd->line); + free(bd->rbuffer); + free(bd); + cleanup_indexing(iargs->indm, iargs->ipriv); free(iargs->indm); free(iargs->ipriv); @@ -603,8 +850,9 @@ static void handle_zombie(struct sandbox *sb) if ( WIFSIGNALED(status) ) { STATUS("Worker %i was killed by signal %i\n", i, WTERMSIG(status)); - STATUS("Last filename was: %s\n", - sb->last_filename[i]); + STATUS("Last filename was: %s (%s)\n", + sb->last_filename[i]->filename, + get_event_string(sb->last_filename[i]->ev) ); sb->n_processed++; start_worker_process(sb, i); } @@ -804,7 +1052,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, lock_sandbox(sb); for ( i=0; i<n_proc; i++ ) { - char *nextImage; + struct filename_plus_event *nextImage; char results[1024]; char *rval; int fd; @@ -857,27 +1105,62 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } /* Send next filename */ - nextImage = get_pattern(fh, config_basename, prefix); + nextImage = get_pattern(fh, config_basename, iargs->det, prefix); + + if ( sb->last_filename[i] != NULL ) { + free_filename_plus_event(sb->last_filename[i]); + } - free(sb->last_filename[i]); sb->last_filename[i] = nextImage; if ( nextImage == NULL ) { + /* No more images */ r = write(sb->filename_pipes[i], "\n", 1); if ( r < 0 ) { ERROR("Write pipe\n"); } + } else { - r = write(sb->filename_pipes[i], nextImage, - strlen(nextImage)); - r -= write(sb->filename_pipes[i], "\n", 1); + + r = write(sb->filename_pipes[i], nextImage->filename, + strlen(nextImage->filename)); + if ( r < 0 ) { ERROR("write pipe\n"); } - } + r = write(sb->filename_pipes[i], " ", 1); + if ( r < 0 ) { + ERROR("write pipe\n"); + } + + if ( nextImage->ev != NULL ) { + + r = write(sb->filename_pipes[i], + get_event_string(nextImage->ev), + strlen(get_event_string(nextImage->ev))); + if ( r < 0 ) { + ERROR("write pipe\n"); + } + + } else { + + r = write(sb->filename_pipes[i], "/", 1); + if ( r < 0 ) { + ERROR("write pipe\n"); + } + + } + + r = write(sb->filename_pipes[i], "\n", 1); + if ( r < 0 ) { + ERROR("write pipe\n"); + } + + } } + unlock_sandbox(sb); /* Update progress */ @@ -891,7 +1174,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix, "%4i crystals so far. " "%4i images processed since the last message.\n", sb->n_hadcrystals, sb->n_processed, - 100.0 * sb->n_hadcrystals / sb->n_processed, + (sb->n_processed == 0 ? 0 : + 100.0 * sb->n_hadcrystals / sb->n_processed), sb->n_crystals, sb->n_processed - sb->n_processed_last_stats); |