diff options
-rw-r--r-- | libcrystfel/src/image.c | 9 | ||||
-rw-r--r-- | libcrystfel/src/image.h | 1 | ||||
-rw-r--r-- | src/im-sandbox.c | 167 | ||||
-rw-r--r-- | src/process_image.c | 1 |
4 files changed, 94 insertions, 84 deletions
diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c index ad651cae..b40819ef 100644 --- a/libcrystfel/src/image.c +++ b/libcrystfel/src/image.c @@ -669,7 +669,7 @@ int image_set_zero_mask(struct image *image, static int image_read_image_data(struct image *image, const DataTemplate *dtempl) { - if ( (image->filename != NULL) + if ( (image->data_block == NULL) && (!file_exists(image->filename)) ) { ERROR("File not found: %s (read data)\n", image->filename); @@ -1201,10 +1201,12 @@ struct image *image_read_data_block(const DataTemplate *dtempl, void *data_block, size_t data_block_size, DataSourceType type, + int serial, int no_image_data, int no_mask_data) { struct image *image; + char tmp[64]; if ( dtempl == NULL ) { ERROR("NULL data template!\n"); @@ -1217,8 +1219,9 @@ struct image *image_read_data_block(const DataTemplate *dtempl, return NULL; } - image->filename = NULL; - image->ev = NULL; + snprintf(tmp, 63, "datablock-%i", serial); + image->filename = strdup(tmp); + image->ev = strdup("//"); image->data_block = data_block; image->data_block_size = data_block_size; diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h index 58228e35..045f3e77 100644 --- a/libcrystfel/src/image.h +++ b/libcrystfel/src/image.h @@ -216,6 +216,7 @@ extern struct image *image_read_data_block(const DataTemplate *dtempl, void *data_block, size_t data_block_size, DataSourceType type, + int serial, int no_image_data, int no_mask_data); extern void image_free(struct image *image); diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 7760693a..ae1fe92a 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -348,88 +348,87 @@ static int run_work(const struct index_args *iargs, Stream *st, struct pattern_args pargs; int ser; + char *line; + size_t len; + int i; + char *event_str = NULL; + char *ser_str = NULL; + int ok = 1; + + /* Wait until an event is ready */ + time_accounts_set(taccs, TACC_EVENTWAIT); + set_last_task(sb->shared->last_task[cookie], "wait_event"); + if ( sem_wait(sb->queue_sem) != 0 ) { + ERROR("Failed to wait on queue semaphore: %s\n", + strerror(errno)); + } - if ( !sb->zmq ) { - - char *line; - size_t len; - int i; - char *event_str = NULL; - char *ser_str = NULL; - int ok = 1; - - /* Wait until an event is ready */ - time_accounts_set(taccs, TACC_EVENTWAIT); - set_last_task(sb->shared->last_task[cookie], "wait_event"); - if ( sem_wait(sb->queue_sem) != 0 ) { - ERROR("Failed to wait on queue semaphore: %s\n", - strerror(errno)); - } - - /* Get the event from the queue */ - set_last_task(sb->shared->last_task[cookie], "read_queue"); - pthread_mutex_lock(&sb->shared->queue_lock); - if ( ((sb->shared->n_events==0) && (sb->shared->no_more)) - || (sb->shared->should_shutdown) ) - { - /* Queue is empty and no more are coming, - * or another process has initiated a shutdown. - * Either way, it's time to get out of here. */ - pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } - if ( sb->shared->n_events == 0 ) { - ERROR("Got the semaphore, but no events in queue!\n"); - ERROR("no_more = %i\n", sb->shared->no_more); - pthread_mutex_unlock(&sb->shared->queue_lock); - allDone = 1; - continue; - } + /* Get the event from the queue */ + set_last_task(sb->shared->last_task[cookie], "read_queue"); + pthread_mutex_lock(&sb->shared->queue_lock); + if ( ((sb->shared->n_events==0) && (sb->shared->no_more)) + || (sb->shared->should_shutdown) ) + { + /* Queue is empty and no more are coming, + * or another process has initiated a shutdown. + * Either way, it's time to get out of here. */ + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } + if ( sb->shared->n_events == 0 ) { + ERROR("Got the semaphore, but no events in queue!\n"); + ERROR("no_more = %i\n", sb->shared->no_more); + pthread_mutex_unlock(&sb->shared->queue_lock); + allDone = 1; + continue; + } - line = strdup(sb->shared->queue[0]); + line = strdup(sb->shared->queue[0]); - len = strlen(line); - assert(len > 1); - for ( i=len-1; i>0; i-- ) { - if ( line[i] == ' ' ) { - line[i] = '\0'; - ser_str = &line[i+1]; - break; - } - } - len = strlen(line); - assert(len > 1); - for ( i=len-1; i>0; i-- ) { - if ( line[i] == ' ' ) { - line[i] = '\0'; - event_str = &line[i+1]; - break; - } + len = strlen(line); + assert(len > 1); + for ( i=len-1; i>0; i-- ) { + if ( line[i] == ' ' ) { + line[i] = '\0'; + ser_str = &line[i+1]; + break; } - if ( (ser_str != NULL) && (event_str != NULL) ) { - if ( sscanf(ser_str, "%i", &ser) != 1 ) { - STATUS("Invalid serial number '%s'\n", - ser_str); - ok = 0; - } + } + len = strlen(line); + assert(len > 1); + for ( i=len-1; i>0; i-- ) { + if ( line[i] == ' ' ) { + line[i] = '\0'; + event_str = &line[i+1]; + break; } - if ( !ok ) { - STATUS("Invalid event string '%s'\n", - sb->shared->queue[0]); + } + if ( (ser_str != NULL) && (event_str != NULL) ) { + if ( sscanf(ser_str, "%i", &ser) != 1 ) { + STATUS("Invalid serial number '%s'\n", + ser_str); ok = 0; } - memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], - MAX_EV_LEN); - shuffle_events(sb->shared); - pthread_mutex_unlock(&sb->shared->queue_lock); + } + if ( !ok ) { + STATUS("Invalid event string '%s'\n", + sb->shared->queue[0]); + ok = 0; + } + memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0], + MAX_EV_LEN); + shuffle_events(sb->shared); + pthread_mutex_unlock(&sb->shared->queue_lock); - if ( !ok ) continue; + if ( !ok ) continue; - pargs.filename = strdup(line); - pargs.event = strdup(event_str); + pargs.filename = strdup(line); + pargs.event = strdup(event_str); - free(line); + free(line); + + if ( !sb->zmq ) { pargs.zmq_data = NULL; pargs.zmq_data_size = 0; @@ -440,9 +439,11 @@ static int run_work(const struct index_args *iargs, Stream *st, pargs.zmq_data = im_zmq_fetch(zmqstuff, &pargs.zmq_data_size); } while ( pargs.zmq_data_size < 15 ); - pargs.filename = strdup("(from ZMQ)"); - pargs.event = NULL; - ser = 0; /* FIXME: Serial numbers from ZMQ? */ + + /* The filename/event, which will be 'fake' values in + * this case, still came via the event queue. More + * importantly, the event queue gave us a unique + * serial number for this image. */ } @@ -820,17 +821,21 @@ static int setup_shm(struct sandbox *sb) /* Assumes the caller is already holding queue_lock! */ static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb) { - if ( sb->zmq ) { - /* Do nothing */ - return 0; - } - while ( sb->shared->n_events < QUEUE_SIZE ) { char *filename; char *evstr; - if ( !get_pattern(gpctx, &filename, &evstr) ) return 1; + if ( sb->zmq ) { + /* These values will be passed down to process_image, + * but ignored. The 'real' filename, which is still a + * 'fake' filename - only for accounting purposes - will + * be generated by image_read_data_block(). */ + filename = "ZMQdata"; + evstr = strdup("//"); + } else { + if ( !get_pattern(gpctx, &filename, &evstr) ) return 1; + } memset(sb->shared->queue[sb->shared->n_events], 0, MAX_EV_LEN); snprintf(sb->shared->queue[sb->shared->n_events++], MAX_EV_LEN, diff --git a/src/process_image.c b/src/process_image.c index 188a4829..281cbab5 100644 --- a/src/process_image.c +++ b/src/process_image.c @@ -199,6 +199,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs, pargs->zmq_data, pargs->zmq_data_size, DST_MSGPACK, + serial, iargs->no_image_data, iargs->no_mask_data); if ( image == NULL ) return; |