aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libcrystfel/src/image.c9
-rw-r--r--libcrystfel/src/image.h1
-rw-r--r--src/im-sandbox.c167
-rw-r--r--src/process_image.c1
4 files changed, 94 insertions, 84 deletions
diff --git a/libcrystfel/src/image.c b/libcrystfel/src/image.c
index ad651cae..b40819ef 100644
--- a/libcrystfel/src/image.c
+++ b/libcrystfel/src/image.c
@@ -669,7 +669,7 @@ int image_set_zero_mask(struct image *image,
static int image_read_image_data(struct image *image,
const DataTemplate *dtempl)
{
- if ( (image->filename != NULL)
+ if ( (image->data_block == NULL)
&& (!file_exists(image->filename)) )
{
ERROR("File not found: %s (read data)\n", image->filename);
@@ -1201,10 +1201,12 @@ struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
DataSourceType type,
+ int serial,
int no_image_data,
int no_mask_data)
{
struct image *image;
+ char tmp[64];
if ( dtempl == NULL ) {
ERROR("NULL data template!\n");
@@ -1217,8 +1219,9 @@ struct image *image_read_data_block(const DataTemplate *dtempl,
return NULL;
}
- image->filename = NULL;
- image->ev = NULL;
+ snprintf(tmp, 63, "datablock-%i", serial);
+ image->filename = strdup(tmp);
+ image->ev = strdup("//");
image->data_block = data_block;
image->data_block_size = data_block_size;
diff --git a/libcrystfel/src/image.h b/libcrystfel/src/image.h
index 58228e35..045f3e77 100644
--- a/libcrystfel/src/image.h
+++ b/libcrystfel/src/image.h
@@ -216,6 +216,7 @@ extern struct image *image_read_data_block(const DataTemplate *dtempl,
void *data_block,
size_t data_block_size,
DataSourceType type,
+ int serial,
int no_image_data,
int no_mask_data);
extern void image_free(struct image *image);
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 7760693a..ae1fe92a 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -348,88 +348,87 @@ static int run_work(const struct index_args *iargs, Stream *st,
struct pattern_args pargs;
int ser;
+ char *line;
+ size_t len;
+ int i;
+ char *event_str = NULL;
+ char *ser_str = NULL;
+ int ok = 1;
+
+ /* Wait until an event is ready */
+ time_accounts_set(taccs, TACC_EVENTWAIT);
+ set_last_task(sb->shared->last_task[cookie], "wait_event");
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
- if ( !sb->zmq ) {
-
- char *line;
- size_t len;
- int i;
- char *event_str = NULL;
- char *ser_str = NULL;
- int ok = 1;
-
- /* Wait until an event is ready */
- time_accounts_set(taccs, TACC_EVENTWAIT);
- set_last_task(sb->shared->last_task[cookie], "wait_event");
- if ( sem_wait(sb->queue_sem) != 0 ) {
- ERROR("Failed to wait on queue semaphore: %s\n",
- strerror(errno));
- }
-
- /* Get the event from the queue */
- set_last_task(sb->shared->last_task[cookie], "read_queue");
- pthread_mutex_lock(&sb->shared->queue_lock);
- if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
- || (sb->shared->should_shutdown) )
- {
- /* Queue is empty and no more are coming,
- * or another process has initiated a shutdown.
- * Either way, it's time to get out of here. */
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
- if ( sb->shared->n_events == 0 ) {
- ERROR("Got the semaphore, but no events in queue!\n");
- ERROR("no_more = %i\n", sb->shared->no_more);
- pthread_mutex_unlock(&sb->shared->queue_lock);
- allDone = 1;
- continue;
- }
+ /* Get the event from the queue */
+ set_last_task(sb->shared->last_task[cookie], "read_queue");
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( ((sb->shared->n_events==0) && (sb->shared->no_more))
+ || (sb->shared->should_shutdown) )
+ {
+ /* Queue is empty and no more are coming,
+ * or another process has initiated a shutdown.
+ * Either way, it's time to get out of here. */
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
- line = strdup(sb->shared->queue[0]);
+ line = strdup(sb->shared->queue[0]);
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- ser_str = &line[i+1];
- break;
- }
- }
- len = strlen(line);
- assert(len > 1);
- for ( i=len-1; i>0; i-- ) {
- if ( line[i] == ' ' ) {
- line[i] = '\0';
- event_str = &line[i+1];
- break;
- }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ ser_str = &line[i+1];
+ break;
}
- if ( (ser_str != NULL) && (event_str != NULL) ) {
- if ( sscanf(ser_str, "%i", &ser) != 1 ) {
- STATUS("Invalid serial number '%s'\n",
- ser_str);
- ok = 0;
- }
+ }
+ len = strlen(line);
+ assert(len > 1);
+ for ( i=len-1; i>0; i-- ) {
+ if ( line[i] == ' ' ) {
+ line[i] = '\0';
+ event_str = &line[i+1];
+ break;
}
- if ( !ok ) {
- STATUS("Invalid event string '%s'\n",
- sb->shared->queue[0]);
+ }
+ if ( (ser_str != NULL) && (event_str != NULL) ) {
+ if ( sscanf(ser_str, "%i", &ser) != 1 ) {
+ STATUS("Invalid serial number '%s'\n",
+ ser_str);
ok = 0;
}
- memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
- MAX_EV_LEN);
- shuffle_events(sb->shared);
- pthread_mutex_unlock(&sb->shared->queue_lock);
+ }
+ if ( !ok ) {
+ STATUS("Invalid event string '%s'\n",
+ sb->shared->queue[0]);
+ ok = 0;
+ }
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
+ MAX_EV_LEN);
+ shuffle_events(sb->shared);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
- if ( !ok ) continue;
+ if ( !ok ) continue;
- pargs.filename = strdup(line);
- pargs.event = strdup(event_str);
+ pargs.filename = strdup(line);
+ pargs.event = strdup(event_str);
- free(line);
+ free(line);
+
+ if ( !sb->zmq ) {
pargs.zmq_data = NULL;
pargs.zmq_data_size = 0;
@@ -440,9 +439,11 @@ static int run_work(const struct index_args *iargs, Stream *st,
pargs.zmq_data = im_zmq_fetch(zmqstuff,
&pargs.zmq_data_size);
} while ( pargs.zmq_data_size < 15 );
- pargs.filename = strdup("(from ZMQ)");
- pargs.event = NULL;
- ser = 0; /* FIXME: Serial numbers from ZMQ? */
+
+ /* The filename/event, which will be 'fake' values in
+ * this case, still came via the event queue. More
+ * importantly, the event queue gave us a unique
+ * serial number for this image. */
}
@@ -820,17 +821,21 @@ static int setup_shm(struct sandbox *sb)
/* Assumes the caller is already holding queue_lock! */
static int fill_queue(struct get_pattern_ctx *gpctx, struct sandbox *sb)
{
- if ( sb->zmq ) {
- /* Do nothing */
- return 0;
- }
-
while ( sb->shared->n_events < QUEUE_SIZE ) {
char *filename;
char *evstr;
- if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ if ( sb->zmq ) {
+ /* These values will be passed down to process_image,
+ * but ignored. The 'real' filename, which is still a
+ * 'fake' filename - only for accounting purposes - will
+ * be generated by image_read_data_block(). */
+ filename = "ZMQdata";
+ evstr = strdup("//");
+ } else {
+ if ( !get_pattern(gpctx, &filename, &evstr) ) return 1;
+ }
memset(sb->shared->queue[sb->shared->n_events], 0, MAX_EV_LEN);
snprintf(sb->shared->queue[sb->shared->n_events++], MAX_EV_LEN,
diff --git a/src/process_image.c b/src/process_image.c
index 188a4829..281cbab5 100644
--- a/src/process_image.c
+++ b/src/process_image.c
@@ -199,6 +199,7 @@ void process_image(const struct index_args *iargs, struct pattern_args *pargs,
pargs->zmq_data,
pargs->zmq_data_size,
DST_MSGPACK,
+ serial,
iargs->no_image_data,
iargs->no_mask_data);
if ( image == NULL ) return;