From b3892119c0879181fe74b108e1f9e089192cfa21 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 3 Dec 2020 18:07:19 +0100 Subject: SLURM BE: Submit merge job --- src/gui_backend_slurm.c | 154 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 128 insertions(+), 26 deletions(-) diff --git a/src/gui_backend_slurm.c b/src/gui_backend_slurm.c index 7c19f426..f6f2e8e2 100644 --- a/src/gui_backend_slurm.c +++ b/src/gui_backend_slurm.c @@ -38,6 +38,7 @@ #include "gui_project.h" #include "gui_index.h" +#include "gui_merge.h" #include "crystfel_gui.h" @@ -56,14 +57,25 @@ struct slurm_merging_opts char *email_address; }; +enum slurm_job_type +{ + SLURM_JOB_INDEXING, + SLURM_JOB_MERGING, + SLURM_JOB_AMBIGATOR, +}; struct slurm_job { - double frac_complete; + enum slurm_job_type type; + + /* For indexing job (don't worry - will be replaced soon!) */ int n_frames; int n_blocks; uint32_t *job_ids; char **stderr_filenames; + + /* For merging/ambigator job */ + uint32_t job_id; }; @@ -135,21 +147,28 @@ static int get_task_status(void *job_priv, int n_proc = 0; int all_complete = 1; - for ( i=0; in_blocks; i++ ) { + if ( job->type == SLURM_JOB_INDEXING ) { + for ( i=0; in_blocks; i++ ) { - n_proc += read_number_processed(job->stderr_filenames[i]); + n_proc += read_number_processed(job->stderr_filenames[i]); - if ( job->job_ids[i] == 0 ) continue; + if ( job->job_ids[i] == 0 ) continue; - if ( !job_running(job->job_ids[i]) ) { - job->job_ids[i] = 0; - } else { - all_complete = 0; + if ( !job_running(job->job_ids[i]) ) { + job->job_ids[i] = 0; + } else { + all_complete = 0; + } } + + *frac_complete = (double)n_proc / job->n_frames; + *running = 1 - all_complete; + + } else { + *frac_complete = 0.5; + *running = 1; } - *frac_complete = (double)n_proc / job->n_frames; - *running = 1 - all_complete; return 0; } @@ -324,13 +343,9 @@ static void write_partial_file_list(GFile *workdir, } -static void *run_indexing(const char *job_title, - const char *job_notes, - struct crystfelproject *proj, - void *opts_priv) +static char *make_workdir(const char *job_title, + const char *job_notes) { - struct slurm_indexing_opts *opts = opts_priv; - struct slurm_job *job; char *workdir; struct stat s; GFile *cwd_file; @@ -338,11 +353,6 @@ static void *run_indexing(const char *job_title, GFile *workdir_file; char *notes_path; FILE *fh; - char **env; - int n_env; - int i; - int fail = 0; - char **streams; workdir = strdup(job_title); if ( workdir == NULL ) return NULL; @@ -372,12 +382,35 @@ static void *run_indexing(const char *job_title, g_object_unref(notes_file); workdir = g_file_get_path(workdir_file); + return workdir; +} + + +static void *run_indexing(const char *job_title, + const char *job_notes, + struct crystfelproject *proj, + void *opts_priv) +{ + struct slurm_indexing_opts *opts = opts_priv; + struct slurm_job *job; + char **env; + int n_env; + int i; + int fail = 0; + char **streams; + char *workdir; + GFile *workdir_gfile; + + workdir = make_workdir(job_title, job_notes); + if ( workdir == NULL ) return NULL; + workdir_gfile = g_file_new_for_path(workdir); env = create_env(&n_env, opts->path_add); job = malloc(sizeof(struct slurm_job)); if ( job == NULL ) return 0; + job->type = SLURM_JOB_INDEXING; job->n_frames = proj->n_frames; job->n_blocks = proj->n_frames / opts->block_size; if ( proj->n_frames % opts->block_size ) job->n_blocks++; @@ -411,7 +444,7 @@ static void *run_indexing(const char *job_title, snprintf(stderr_file, 127, "stderr-%i.log", i); snprintf(stdout_file, 127, "stdout-%i.log", i); - write_partial_file_list(workdir_file, file_list, + write_partial_file_list(workdir_gfile, file_list, i, opts->block_size, proj->filenames, proj->events, @@ -438,12 +471,12 @@ static void *run_indexing(const char *job_title, job->job_ids[i] = job_id; - stderr_gfile = g_file_get_child(workdir_file, + stderr_gfile = g_file_get_child(workdir_gfile, stderr_file); job->stderr_filenames[i] = g_file_get_path(stderr_gfile); g_object_unref(stderr_gfile); - stream_gfile = g_file_get_child(workdir_file, + stream_gfile = g_file_get_child(workdir_gfile, stream_filename); streams[i] = g_file_get_path(stream_gfile); g_object_unref(stream_gfile); @@ -454,7 +487,7 @@ static void *run_indexing(const char *job_title, for ( i=0; ijob_ids); @@ -697,7 +730,76 @@ static void *run_merging(const char *job_title, struct gui_result *input, void *opts_priv) { - return NULL; + struct slurm_job *job; + job_desc_msg_t job_desc_msg; + submit_response_msg_t *resp; + char **cmdline; + char *cmdline_all; + char *script; + char **env; + int n_env; + struct slurm_merging_opts *opts = opts_priv; + char *workdir; + int r; + + workdir = make_workdir(job_title, job_notes); + if ( workdir == NULL ) return NULL; + + job = malloc(sizeof(struct slurm_job)); + if ( job == NULL ) return NULL; + + cmdline = merging_command_line("`nproc`", + input, + &proj->merging_params); + + cmdline_all = g_strjoinv(" ", cmdline); + + script = malloc(strlen(cmdline_all)+16); + if ( script == NULL ) return 0; + + strcpy(script, "#!/bin/sh\n"); + strcat(script, cmdline_all); + g_free(cmdline_all); + + env = create_env(&n_env, NULL); + + slurm_init_job_desc_msg(&job_desc_msg); + job_desc_msg.user_id = getuid(); + job_desc_msg.group_id = getgid(); + job_desc_msg.mail_user = safe_strdup(opts->email_address); + job_desc_msg.mail_type = MAIL_JOB_FAIL; + job_desc_msg.comment = "Submitted via CrystFEL GUI"; + job_desc_msg.shared = 0; + job_desc_msg.time_limit = 60; + job_desc_msg.partition = safe_strdup(opts->partition); + job_desc_msg.min_nodes = 1; + job_desc_msg.max_nodes = 1; + job_desc_msg.name = safe_strdup(job_title); + job_desc_msg.std_err = strdup("stderr.log"); + job_desc_msg.std_out = strdup("stdout.log"); + job_desc_msg.work_dir = strdup(workdir); + job_desc_msg.script = script; + job_desc_msg.environment = env; + job_desc_msg.env_size = n_env; + + r = slurm_submit_batch_job(&job_desc_msg, &resp); + if ( r ) { + ERROR("Couldn't submit job: %i\n", errno); + return 0; + } + + free(job_desc_msg.mail_user); + free(job_desc_msg.partition); + free(job_desc_msg.name); + free(job_desc_msg.work_dir); + free(job_desc_msg.std_err); + free(job_desc_msg.std_out); + + job->job_id = resp->job_id; + job->type = SLURM_JOB_MERGING; + slurm_free_submit_response_response_msg(resp); + + return job; } -- cgit v1.2.3