aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2014-05-22 16:42:33 +0200
committerThomas White <taw@physics.org>2014-05-22 16:42:33 +0200
commit30ddb045dfaaf867bda7122d76ceb12d24d248e7 (patch)
tree2e956fc678aee387d152a337749a53b4ca9b29b2
parent8c76ffc087dc1f900125b8669d3460b7ea52f14c (diff)
Rationalise and document stream "open for write" functions
A nice side-effect is that streams now only have one set of headers
-rw-r--r--libcrystfel/src/stream.c87
-rw-r--r--libcrystfel/src/stream.h2
-rw-r--r--src/im-sandbox.c20
-rw-r--r--src/im-sandbox.h8
-rw-r--r--src/indexamajig.c11
5 files changed, 92 insertions, 36 deletions
diff --git a/libcrystfel/src/stream.c b/libcrystfel/src/stream.c
index fa893a30..13b9065d 100644
--- a/libcrystfel/src/stream.c
+++ b/libcrystfel/src/stream.c
@@ -840,6 +840,20 @@ Stream *open_stream_for_read(const char *filename)
}
+/**
+ * open_stream_fd_for_write
+ * @fd: File descriptor (e.g. from open()) to use for stream data.
+ *
+ * Creates a new %Stream from @fd, so that stream data can be written to @fd
+ * using write_chunk().
+ *
+ * In contrast to open_stream_for_write(), this function does not write any of
+ * the usual headers. This function is mostly for use when multiple substreams
+ * need to be multiplexed into a single master stream. The master would be
+ * opened using open_stream_for_write(), and the substreams using this function.
+ *
+ * Returns: a %Stream, or NULL on failure.
+ */
Stream *open_stream_fd_for_write(int fd)
{
Stream *st;
@@ -856,25 +870,64 @@ Stream *open_stream_fd_for_write(int fd)
st->major_version = LATEST_MAJOR_VERSION;
st->minor_version = LATEST_MINOR_VERSION;
- fprintf(st->fh, "CrystFEL stream format %i.%i\n",
- st->major_version, st->minor_version);
- fprintf(st->fh, "Generated by CrystFEL "CRYSTFEL_VERSIONSTRING"\n");
return st;
}
+/**
+ * open_stream_for_write
+ * @filename: Filename of new stream
+ *
+ * Creates a new stream with name @filename, and adds the stream format
+ * and version headers.
+ *
+ * You may want to follow this with a call to write_command() to record the
+ * command line.
+ *
+ * Returns: a %Stream, or NULL on failure.
+ */
Stream *open_stream_for_write(const char *filename)
{
- int fd;
+ Stream *st;
+
+ st = malloc(sizeof(struct _stream));
+ if ( st == NULL ) return NULL;
- fd = open(filename, O_CREAT | O_TRUNC | O_WRONLY,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
- if ( fd == -1 ) {
+ st->fh = fopen(filename, "w");
+ if ( st->fh == NULL ) {
ERROR("Failed to open stream.\n");
+ free(st);
return NULL;
}
- return open_stream_fd_for_write(fd);
+ st->major_version = LATEST_MAJOR_VERSION;
+ st->minor_version = LATEST_MINOR_VERSION;
+
+ fprintf(st->fh, "CrystFEL stream format %i.%i\n",
+ st->major_version, st->minor_version);
+ fprintf(st->fh, "Generated by CrystFEL "CRYSTFEL_VERSIONSTRING"\n");
+ fflush(st->fh);
+
+ return st;
+}
+
+
+/**
+ * get_stream_fd
+ * @st: A %Stream
+ *
+ * This function gets the integer file descriptor for @st, a bit like fileno().
+ *
+ * This is useful in conjunction with open_stream_fd_for_write(), to get the
+ * underlying file descriptor to which the multiplexed stream data should be
+ * written. In this case, the only other operations you should ever do (or have
+ * done) on @st are open_stream_for_write() and close_stream().
+ *
+ * Returns: an integer file descriptor
+ */
+int get_stream_fd(Stream *st)
+{
+ return fileno(st->fh);
}
@@ -906,13 +959,16 @@ int is_stream(const char *filename)
}
-void write_line(Stream *st, const char *line)
-{
- fprintf(st->fh, "%s\n", line);
- fflush(st->fh);
-}
-
-
+/**
+ * write_command
+ * @st: A %Stream
+ * @argc: number of arguments
+ * @argv: command-line arguments
+ *
+ * Writes the command line to @st. @argc and @argv should be exactly as were
+ * given to main(). This should usually be called immediately after
+ * open_stream_for_write().
+ */
void write_command(Stream *st, int argc, char *argv[])
{
int i;
@@ -924,6 +980,7 @@ void write_command(Stream *st, int argc, char *argv[])
fprintf(st->fh, "%s", argv[i]);
}
fprintf(st->fh, "\n");
+ fflush(st->fh);
}
diff --git a/libcrystfel/src/stream.h b/libcrystfel/src/stream.h
index 9f852c60..78c27b44 100644
--- a/libcrystfel/src/stream.h
+++ b/libcrystfel/src/stream.h
@@ -77,6 +77,7 @@ typedef enum {
extern Stream *open_stream_for_read(const char *filename);
extern Stream *open_stream_for_write(const char *filename);
extern Stream *open_stream_fd_for_write(int fd);
+extern int get_stream_fd(Stream *st);
extern void close_stream(Stream *st);
extern int read_chunk(Stream *st, struct image *image);
@@ -84,7 +85,6 @@ extern int read_chunk_2(Stream *st, struct image *image, StreamReadFlags srf);
extern void write_chunk(Stream *st, struct image *image, struct hdfile *hdfile,
int include_peaks, int include_reflections);
-extern void write_line(Stream *st, const char *line);
extern void write_command(Stream *st, int argc, char *argv[]);
extern int rewind_stream(Stream *st);
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 3558c8ea..3e2ab2b3 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -74,8 +74,8 @@ struct sb_reader
FILE **fhs;
int *fds;
- /* Final output fd */
- int ofd;
+ /* Final output */
+ Stream *stream;
};
@@ -376,6 +376,7 @@ static void remove_pipe(struct sb_reader *rd, int d)
static void *run_reader(void *rdv)
{
struct sb_reader *rd = rdv;
+ const int ofd = get_stream_fd(rd->stream);
while ( 1 ) {
@@ -425,7 +426,7 @@ static void *run_reader(void *rdv)
/* If the chunk cannot be read, assume the connection
* is broken and that the process will die soon. */
- if ( pump_chunk(rd->fhs[i], rd->ofd) ) {
+ if ( pump_chunk(rd->fhs[i], ofd) ) {
/* remove_pipe() assumes that the caller is
* holding rd->lock ! */
remove_pipe(rd, i);
@@ -440,8 +441,7 @@ static void *run_reader(void *rdv)
}
-static void start_worker_process(struct sandbox *sb, int slot,
- int argc, char *argv[])
+static void start_worker_process(struct sandbox *sb, int slot)
{
pid_t p;
int filename_pipe[2];
@@ -537,8 +537,6 @@ static void start_worker_process(struct sandbox *sb, int slot,
close(result_pipe[0]);
st = open_stream_fd_for_write(stream_pipe[1]);
- write_command(st, argc, argv);
- write_line(st, "FLUSH");
run_work(sb->iargs, filename_pipe[0], result_pipe[1],
st, slot, tmp);
close_stream(st);
@@ -606,7 +604,7 @@ static void handle_zombie(struct sandbox *sb)
STATUS("Last filename was: %s\n",
sb->last_filename[i]);
sb->n_processed++;
- start_worker_process(sb, i, 0, NULL);
+ start_worker_process(sb, i);
}
}
@@ -618,7 +616,7 @@ static void handle_zombie(struct sandbox *sb)
void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
- int ofd, int argc, char *argv[], const char *tempdir)
+ Stream *stream, const char *tempdir)
{
int i;
int allDone;
@@ -657,7 +655,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->reader->fds = NULL;
sb->reader->fhs = NULL;
- sb->reader->ofd = ofd;
+ sb->reader->stream = stream;
sb->stream_pipe_write = calloc(n_proc, sizeof(int));
if ( sb->stream_pipe_write == NULL ) {
@@ -742,7 +740,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
/* Fork the right number of times */
lock_sandbox(sb);
for ( i=0; i<n_proc; i++ ) {
- start_worker_process(sb, i, argc, argv);
+ start_worker_process(sb, i);
}
unlock_sandbox(sb);
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 71af5052..235825ce 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -3,13 +3,13 @@
*
* Sandbox for indexing
*
- * Copyright © 2012-2013 Deutsches Elektronen-Synchrotron DESY,
+ * Copyright © 2012-2014 Deutsches Elektronen-Synchrotron DESY,
* a research centre of the Helmholtz Association.
* Copyright © 2012 Richard Kirian
* Copyright © 2012 Lorenzo Galli
*
* Authors:
- * 2010-2013 Thomas White <taw@physics.org>
+ * 2010-2014 Thomas White <taw@physics.org>
* 2011 Richard Kirian
* 2012 Lorenzo Galli
* 2012 Chunhong Yoon
@@ -37,5 +37,5 @@
#include "process_image.h"
extern void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
- int config_basename, FILE *fh, int streamfd,
- int argc, char *argv[], const char *tempdir);
+ int config_basename, FILE *fh, Stream *stream,
+ const char *tempdir);
diff --git a/src/indexamajig.c b/src/indexamajig.c
index 9102ccc5..607e8b0c 100644
--- a/src/indexamajig.c
+++ b/src/indexamajig.c
@@ -178,7 +178,7 @@ int main(int argc, char *argv[])
char *filename = NULL;
char *outfile = NULL;
FILE *fh;
- int ofd;
+ Stream *st;
int config_checkprefix = 1;
int config_basename = 0;
int integrate_saturated = 0;
@@ -627,14 +627,15 @@ int main(int argc, char *argv[])
}
- ofd = open(outfile, O_CREAT | O_TRUNC | O_WRONLY,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
- if ( ofd == -1 ) {
+ st = open_stream_for_write(outfile);
+ if ( st == NULL ) {
ERROR("Failed to open stream '%s'\n", outfile);
return 1;
}
free(outfile);
+ write_command(st, argc, argv);
+
/* Prepare the indexer */
if ( indm != NULL ) {
ipriv = prepare_indexing(indm, iargs.cell, iargs.det,
@@ -653,7 +654,7 @@ int main(int argc, char *argv[])
iargs.ipriv = ipriv;
create_sandbox(&iargs, n_proc, prefix, config_basename, fh,
- ofd, argc, argv, tempdir);
+ st, tempdir);
free(prefix);
free(tempdir);