aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2021-03-03 17:12:07 +0100
committerThomas White <taw@physics.org>2021-03-03 18:50:28 +0100
commit65a1afdce2da5cd92f6907f6e517b9ec4280cdd5 (patch)
tree39ec35043db5cebd86f06ccca1110e524d2a68dc
parentdefd159f19585aad359bf5dfbd9f1ff9dc2831fc (diff)
GUI: Submit indexing jobs as job arrays, not individual jobs
-rw-r--r--src/gui_backend_local.c1
-rw-r--r--src/gui_backend_slurm.c254
-rw-r--r--src/gui_index.c17
-rw-r--r--src/gui_index.h8
4 files changed, 83 insertions, 197 deletions
diff --git a/src/gui_backend_local.c b/src/gui_backend_local.c
index f1778c43..bc5d3ad0 100644
--- a/src/gui_backend_local.c
+++ b/src/gui_backend_local.c
@@ -527,6 +527,7 @@ static void *run_indexing(const char *job_title,
n_thread_str,
"files.lst",
"crystfel.stream",
+ 1,
&proj->peak_search_params,
&proj->indexing_params) )
{
diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c
index ca4cada5..175a3688 100644
--- a/src/gui_backend_slurm.c
+++ b/src/gui_backend_slurm.c
@@ -74,15 +74,13 @@ struct slurm_job
{
enum gui_job_type type;
GFile *workdir;
+ uint32_t job_id;
- /* For indexing job (don't worry - will be replaced soon!) */
+ /* For indexing job */
int n_frames;
int n_blocks;
- uint32_t *job_ids;
- char **stderr_filenames;
/* For merging/ambigator job */
- uint32_t job_id;
char *stderr_filename;
int niter;
};
@@ -123,6 +121,7 @@ static int job_running(uint32_t job_id)
static double indexing_progress(struct slurm_job *job, int *running)
{
+#if 0
if ( job->n_blocks > 15 ) {
/* Fast path for larger number of sub-jobs */
@@ -167,6 +166,8 @@ static double indexing_progress(struct slurm_job *job, int *running)
return (double)n_proc / job->n_frames;
}
+#endif
+ return 0.5;
}
@@ -204,22 +205,10 @@ static int get_task_status(void *job_priv,
static void cancel_task(void *job_priv)
{
- int i;
struct slurm_job *job = job_priv;
- if ( job->type == GUI_JOB_INDEXING ) {
- for ( i=0; i<job->n_blocks; i++) {
- if ( job->job_ids[i] == 0 ) continue;
- STATUS("Stopping SLURM job %i\n", job->job_ids[i]);
- if ( slurm_kill_job(job->job_ids[i], SIGINT, 0) ) {
- ERROR("Couldn't stop job: %s\n",
- slurm_strerror(slurm_get_errno()));
- }
- }
- } else {
- if ( slurm_kill_job(job->job_id, SIGINT, 0) ) {
- ERROR("Couldn't stop job: %s\n",
- slurm_strerror(slurm_get_errno()));
- }
+ if ( slurm_kill_job(job->job_id, SIGINT, 0) ) {
+ ERROR("Couldn't stop job: %s\n",
+ slurm_strerror(slurm_get_errno()));
}
}
@@ -428,94 +417,13 @@ static void write_common_opts(FILE *fh,
}
-static uint32_t submit_batch_job(const char *geom_filename,
- const char *file_list,
- const char *stream_filename,
- struct slurm_common_opts *opts,
- char **env,
- int n_env,
- const char *job_name,
- const char *workdir,
- const char *stderr_file,
- const char *stdout_file,
- struct peak_params *peak_search_params,
- struct index_params *indexing_params)
-
-{
- job_desc_msg_t job_desc_msg;
- submit_response_msg_t *resp;
- char **cmdline;
- char *cmdline_all;
- char *script;
- int job_id;
- int r;
-
- cmdline = indexamajig_command_line(geom_filename,
- "`nproc`",
- file_list,
- stream_filename,
- peak_search_params,
- indexing_params);
-
- cmdline_all = g_strjoinv(" ", cmdline);
-
- script = malloc(strlen(cmdline_all)+16);
- if ( script == NULL ) return 0;
-
- strcpy(script, "#!/bin/sh\n");
- strcat(script, cmdline_all);
- g_free(cmdline_all);
-
- slurm_init_job_desc_msg(&job_desc_msg);
- job_desc_msg.user_id = getuid();
- job_desc_msg.group_id = getgid();
- job_desc_msg.mail_user = safe_strdup(opts->email_address);
- job_desc_msg.mail_type = MAIL_JOB_FAIL;
- job_desc_msg.comment = "Submitted via CrystFEL GUI";
- job_desc_msg.shared = 0;
- job_desc_msg.time_limit = 60;
- job_desc_msg.partition = safe_strdup(opts->partition);
- job_desc_msg.min_nodes = 1;
- job_desc_msg.max_nodes = 1;
- job_desc_msg.name = safe_strdup(job_name);
- job_desc_msg.std_err = strdup(stderr_file);
- job_desc_msg.std_out = strdup(stdout_file);
- job_desc_msg.work_dir = strdup(workdir);
- job_desc_msg.script = script;
- job_desc_msg.environment = env;
- job_desc_msg.env_size = n_env;
- job_desc_msg.features = safe_strdup(opts->constraint);
- job_desc_msg.account = safe_strdup(opts->account);
-
- r = slurm_submit_batch_job(&job_desc_msg, &resp);
- if ( r ) {
- ERROR("Couldn't submit job: %i\n", errno);
- return 0;
- }
-
- free(job_desc_msg.mail_user);
- free(job_desc_msg.partition);
- free(job_desc_msg.name);
- free(job_desc_msg.work_dir);
- free(job_desc_msg.std_err);
- free(job_desc_msg.std_out);
- free(job_desc_msg.features);
- free(job_desc_msg.account);
-
- job_id = resp->job_id;
- slurm_free_submit_response_response_msg(resp);
-
- return job_id;
-}
-
-
-/* For submitting a single script to the SLURM cluster.
- * Used for merging and ambigator, but not for indexing - that needs something
- * more sophisticated. */
static struct slurm_job *start_slurm_job(enum gui_job_type type,
const char *script_filename,
const char *jobname,
+ const char *array_inx,
GFile *workdir,
+ const char *stdout_filename,
+ const char *stderr_filename,
struct slurm_common_opts *opts)
{
char **env;
@@ -549,14 +457,15 @@ static struct slurm_job *start_slurm_job(enum gui_job_type type,
job_desc_msg.min_nodes = 1;
job_desc_msg.max_nodes = 1;
job_desc_msg.name = safe_strdup(jobname);
- job_desc_msg.std_err = strdup("stderr.log");
- job_desc_msg.std_out = strdup("stdout.log");
+ job_desc_msg.std_err = strdup(stderr_filename);
+ job_desc_msg.std_out = strdup(stdout_filename);
job_desc_msg.work_dir = g_file_get_path(workdir);
job_desc_msg.script = script;
job_desc_msg.environment = env;
job_desc_msg.env_size = n_env;
job_desc_msg.features = safe_strdup(opts->constraint);
job_desc_msg.account = safe_strdup(opts->account);
+ job_desc_msg.array_inx = safe_strdup(array_inx);
r = slurm_submit_batch_job(&job_desc_msg, &resp);
if ( r ) {
@@ -628,107 +537,85 @@ static void *run_indexing(const char *job_title,
{
struct slurm_indexing_opts *opts = opts_priv;
struct slurm_job *job;
- char **env;
- int n_env;
int i;
- int fail = 0;
char **streams;
- GFile *workdir_gfile;
-
- workdir_gfile = make_job_folder(job_title, job_notes);
- if ( workdir_gfile == NULL ) return NULL;
-
- env = create_env(&n_env, opts->path_add);
+ GFile *workdir;
+ GFile *sc_gfile;
+ char *sc_filename;
+ int n_blocks;
+ char array_inx[128];
- job = malloc(sizeof(struct slurm_job));
- if ( job == NULL ) return 0;
+ workdir = make_job_folder(job_title, job_notes);
+ if ( workdir == NULL ) return NULL;
- job->type = GUI_JOB_INDEXING;
- job->n_frames = proj->n_frames;
- job->n_blocks = proj->n_frames / opts->block_size;
- if ( proj->n_frames % opts->block_size ) job->n_blocks++;
+ n_blocks = proj->n_frames / opts->block_size;
+ if ( proj->n_frames % opts->block_size ) n_blocks++;
STATUS("Splitting job into %i blocks of max %i frames\n",
- job->n_blocks, opts->block_size);
-
- job->job_ids = malloc(job->n_blocks * sizeof(uint32_t));
- if ( job->job_ids == NULL ) return NULL;
+ n_blocks, opts->block_size);
- job->stderr_filenames = malloc(job->n_blocks * sizeof(char *));
- if ( job->stderr_filenames == NULL ) return NULL;
-
- streams = malloc(job->n_blocks*sizeof(char *));
+ streams = malloc(n_blocks*sizeof(char *));
if ( streams == NULL ) return NULL;
- for ( i=0; i<job->n_blocks; i++ ) {
+ for ( i=0; i<n_blocks; i++ ) {
- char job_name[128];
char file_list[128];
char stream_filename[128];
- char stderr_file[128];
- char stdout_file[128];
- int job_id;
- GFile *stderr_gfile;
GFile *stream_gfile;
- snprintf(job_name, 127, "%s-%i", job_title, i);
+ /* Create (sub-)list of files */
snprintf(file_list, 127, "files-%i.lst", i);
- snprintf(stream_filename, 127,
- "crystfel-%i.stream", i);
- snprintf(stderr_file, 127, "stderr-%i.log", i);
- snprintf(stdout_file, 127, "stdout-%i.log", i);
-
- write_partial_file_list(workdir_gfile, file_list,
- i, opts->block_size,
+ write_partial_file_list(workdir,
+ file_list,
+ i,
+ opts->block_size,
proj->filenames,
proj->events,
proj->n_frames);
- job_id = submit_batch_job(proj->geom_filename,
- file_list,
- stream_filename,
- &opts->common,
- env,
- n_env,
- job_name,
- g_file_get_path(workdir_gfile),
- stderr_file,
- stdout_file,
- &proj->peak_search_params,
- &proj->indexing_params);
-
- if ( job_id == 0 ) {
- fail = 1;
- break;
- }
-
- job->job_ids[i] = job_id;
-
- stderr_gfile = g_file_get_child(workdir_gfile,
- stderr_file);
- job->stderr_filenames[i] = g_file_get_path(stderr_gfile);
- g_object_unref(stderr_gfile);
-
- stream_gfile = g_file_get_child(workdir_gfile,
+ /* Work out the stream filename */
+ snprintf(stream_filename, 127, "crystfel-%i.stream", i);
+ stream_gfile = g_file_get_child(workdir,
stream_filename);
streams[i] = g_file_get_path(stream_gfile);
g_object_unref(stream_gfile);
-
- STATUS("Submitted SLURM job ID %i\n", job_id);
}
- for ( i=0; i<n_env; i++ ) free(env[i]);
- free(env);
- g_object_unref(workdir_gfile);
+ sc_gfile = g_file_get_child(workdir, "run_indexamajig.sh");
+ sc_filename = g_file_get_path(sc_gfile);
+ g_object_unref(sc_gfile);
+ if ( sc_filename == NULL ) return NULL;
- if ( fail ) {
- free(job->job_ids);
- free(job->stderr_filenames);
- free(job);
- return NULL;
+ snprintf(array_inx, 127, "0-%i", n_blocks-1);
+
+ if ( !write_indexamajig_script(sc_filename,
+ proj->geom_filename,
+ "`nproc`",
+ "files-${SLURM_ARRAY_TASK_ID}.lst",
+ "crystfel-${SLURM_ARRAY_TASK_ID}.stream",
+ 0,
+ &proj->peak_search_params,
+ &proj->indexing_params) )
+ {
+ job = start_slurm_job(GUI_JOB_INDEXING,
+ sc_filename,
+ job_title,
+ array_inx,
+ workdir,
+ "stdout-%a.log",
+ "stderr-%a.log",
+ &opts->common);
+ job->n_frames = proj->n_frames;
+ job->n_blocks = n_blocks;
} else {
- add_indexing_result(proj, strdup(job_title),
- streams, job->n_blocks);
+ job = NULL;
}
+ g_free(sc_filename);
+
+ if ( job != NULL ) {
+ add_indexing_result(proj, strdup(job_title), streams, n_blocks);
+ }
+
+ g_object_unref(workdir);
return job;
}
@@ -924,8 +811,8 @@ static void *run_ambi(const char *job_title,
&proj->ambi_params, stream_str) )
{
job = start_slurm_job(GUI_JOB_AMBIGATOR,
- sc_filename, job_title, workdir,
- &opts->common);
+ sc_filename, job_title, NULL, workdir,
+ "stdout.log", "stderr.log", &opts->common);
job->niter = proj->ambi_params.niter;
} else {
job = NULL;
@@ -978,7 +865,8 @@ static void *run_merging(const char *job_title,
} else {
type = GUI_JOB_PARTIALATOR;
}
- job = start_slurm_job(type, sc_filename, job_title, workdir,
+ job = start_slurm_job(type, sc_filename, job_title, NULL,
+ workdir, "stdout.log", "stderr.log",
&opts->common);
} else {
job = NULL;
diff --git a/src/gui_index.c b/src/gui_index.c
index 450030d2..589e0281 100644
--- a/src/gui_index.c
+++ b/src/gui_index.c
@@ -667,12 +667,12 @@ static void add_arg_string(char **args, int pos, const char *label,
}
-char **indexamajig_command_line(const char *geom_filename,
- const char *n_thread_str,
- const char *files_list,
- const char *stream_filename,
- struct peak_params *peak_search_params,
- struct index_params *indexing_params)
+static char **indexamajig_command_line(const char *geom_filename,
+ const char *n_thread_str,
+ const char *files_list,
+ const char *stream_filename,
+ struct peak_params *peak_search_params,
+ struct index_params *indexing_params)
{
char **args;
char tols[2048];
@@ -828,6 +828,7 @@ int write_indexamajig_script(const char *script_filename,
const char *n_thread_str,
const char *files_list,
const char *stream_filename,
+ int redirect_output,
struct peak_params *peak_search_params,
struct index_params *indexing_params)
{
@@ -855,7 +856,9 @@ int write_indexamajig_script(const char *script_filename,
i++;
};
free(cmdline);
- fprintf(fh, ">stdout.log 2>stderr.log\n");
+ if ( redirect_output ) {
+ fprintf(fh, ">stdout.log 2>stderr.log\n");
+ }
fclose(fh);
return 0;
diff --git a/src/gui_index.h b/src/gui_index.h
index 4a1ef1e9..e0f42aad 100644
--- a/src/gui_index.h
+++ b/src/gui_index.h
@@ -41,13 +41,6 @@ extern gint index_all_sig(GtkWidget *widget,
extern void cell_explorer_sig(struct crystfelproject *proj);
-extern char **indexamajig_command_line(const char *geom_filename,
- const char *n_thread_str,
- const char *files_list,
- const char *stream_filename,
- struct peak_params *peak_search_params,
- struct index_params *indexing_params);
-
extern int read_number_processed(const char *filename);
extern int write_indexamajig_script(const char *script_filename,
@@ -55,6 +48,7 @@ extern int write_indexamajig_script(const char *script_filename,
const char *n_thread_str,
const char *files_list,
const char *stream_filename,
+ int redirect_output,
struct peak_params *peak_search_params,
struct index_params *indexing_params);