aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Thomas White <taw@physics.org> 2015-07-11 18:26:51 +0200
committer Thomas White <taw@physics.org> 2015-07-13 16:00:46 +0200
commit 59b0708b8b89e921ee6914e072ff71eb121d7c41 (patch)
tree 2930fd7d44756b441cc19f6923a36cfd19aacde1
parent cca5d6e35b0ab653b333424abf819b4a874cf911 (diff)
Clean up semaphore on interrupt
Necessary since we encourage users to interrupt indexamajig with Ctrl+C. Also, generally improve the signal handling.
-rw-r--r-- src/im-sandbox.c 143
1 file changed, 92 insertions, 51 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 6464202c..5e9c4838 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -96,10 +96,6 @@ struct sandbox
};
-/* Horrible global variable for signal handler */
-sem_t *zombie_sem;
-
-
static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
struct detector *det,
const char *prefix)
@@ -456,8 +452,8 @@ static void try_read(struct sandbox *sb)
int fdmax;
const int ofd = get_stream_fd(sb->stream);
- tv.tv_sec = 5;
- tv.tv_usec = 0;
+ tv.tv_sec = 0;
+ tv.tv_usec = 500000;
FD_ZERO(&fds);
fdmax = 0;
@@ -523,21 +519,31 @@ static void start_worker_process(struct sandbox *sb, int slot)
size_t ll;
int i;
- /* First, disconnect the signal handler */
- sa.sa_flags = 0;
- sigemptyset(&sa.sa_mask);
- sa.sa_handler = SIG_DFL;
- r = sigaction(SIGCHLD, &sa, NULL);
- if ( r == -1 ) {
+ /* First, disconnect the signal handlers */
+ sa.sa_flags = 0;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_handler = SIG_DFL;
+ r = sigaction(SIGCHLD, &sa, NULL);
+ if ( r == -1 ) {
ERROR("Failed to set signal handler!\n");
- return;
- }
+ exit(1);
+ }
+ r = sigaction(SIGINT, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ exit(1);
+ }
+ r = sigaction(SIGQUIT, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ exit(1);
+ }
ll = 64 + strlen(sb->tmpdir);
tmp = malloc(ll);
if ( tmp == NULL ) {
ERROR("Failed to allocate temporary dir\n");
- return;
+ exit(1);
}
snprintf(tmp, 63, "%s/worker.%i", sb->tmpdir, slot);
@@ -545,14 +551,14 @@ static void start_worker_process(struct sandbox *sb, int slot)
if ( stat(tmp, &s) == -1 ) {
if ( errno != ENOENT ) {
ERROR("Failed to stat temporary folder.\n");
- return;
+ exit(1);
}
r = mkdir(tmp, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
if ( r ) {
ERROR("Failed to create temporary folder: %s\n",
strerror(errno));
- return;
+ exit(1);
}
}
@@ -596,13 +602,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
}
-static void signal_handler(int sig, siginfo_t *si, void *uc_v)
-{
- sem_post(zombie_sem);
-}
-
-
-static void handle_zombie(struct sandbox *sb)
+static void handle_zombie(struct sandbox *sb, int respawn)
{
int i;
@@ -628,11 +628,15 @@ static void handle_zombie(struct sandbox *sb)
}
if ( WIFSIGNALED(status) ) {
+
+ if ( (WTERMSIG(status) == SIGINT)
+ || (WTERMSIG(status) == SIGQUIT) ) continue;
+
STATUS("Worker %i was killed by signal %i\n",
i, WTERMSIG(status));
STATUS("Event ID was: %s\n",
sb->shared->last_ev[i]);
- start_worker_process(sb, i);
+ if ( respawn ) start_worker_process(sb, i);
}
}
@@ -715,19 +719,47 @@ static int fill_queue(FILE *fh, int config_basename, struct detector *det,
return 0;
}
+volatile sig_atomic_t at_zombies = 0;
+volatile sig_atomic_t at_interrupt = 0;
+
+static void sigchld_handler(int sig, siginfo_t *si, void *uc_v)
+{
+ at_zombies = 1;
+}
+
+
+static void sigint_handler(int sig, siginfo_t *si, void *uc_v)
+{
+ at_interrupt = 1;
+}
+
+
+static void check_signals(struct sandbox *sb, const char *semname_q,
+ int respawn)
+{
+ if ( at_zombies ) {
+ at_zombies = 0;
+ handle_zombie(sb, respawn);
+ }
+
+ if ( at_interrupt ) {
+ sem_unlink(semname_q);
+ exit(0);
+ }
+}
+
void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int config_basename, FILE *fh,
Stream *stream, const char *tempdir)
{
int i;
- struct sigaction sa;
- int r;
struct sandbox *sb;
size_t ll;
struct stat s;
char semname_q[64];
- char semname_z[64];
+ struct sigaction sa;
+ int r;
int allDone = 0;
if ( n_proc > MAX_NUM_WORKERS ) {
@@ -772,14 +804,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
ERROR("Failed to create semaphore: %s\n", strerror(errno));
return;
}
- snprintf(semname_z, 64, "indexamajig-z%i", getpid());
- zombie_sem = sem_open(semname_z, O_CREAT | O_EXCL,
- S_IRUSR | S_IWUSR, 0);
- if ( zombie_sem == SEM_FAILED ) {
- ERROR("Failed to create zombie semaphore: %s\n",
- strerror(errno));
- return;
- }
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
@@ -792,16 +816,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
return;
}
- /* Set up signal handler to take action if any children die */
- sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
- sigemptyset(&sa.sa_mask);
- sa.sa_sigaction = signal_handler;
- r = sigaction(SIGCHLD, &sa, NULL);
- if ( r == -1 ) {
- ERROR("Failed to set signal handler!\n");
- return;
- }
-
if ( tempdir == NULL ) {
tempdir = "";
}
@@ -844,15 +858,38 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
start_worker_process(sb, i);
}
+ /* Set up signal handler to take action if any children die */
+ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_sigaction = sigchld_handler;
+ r = sigaction(SIGCHLD, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ return;
+ }
+
+ /* Set up signal handler to clean up semaphore on exit */
+ sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_sigaction = sigint_handler;
+ r = sigaction(SIGINT, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ return;
+ }
+ r = sigaction(SIGQUIT, &sa, NULL);
+ if ( r == -1 ) {
+ ERROR("Failed to set signal handler!\n");
+ return;
+ }
+
do {
int r;
double tNow;
try_read(sb);
-
- /* Check for dead workers */
- if ( sem_trywait(zombie_sem) == 0 ) handle_zombie(sb);
+ check_signals(sb, semname_q, 1);
/* Top up the queue if necessary */
r = 0;
@@ -900,6 +937,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
pthread_mutex_unlock(&sb->shared->queue_lock);
} while ( !allDone );
+
fclose(fh);
/* Indicate to the workers that we are finished, and wake them up one
@@ -915,9 +953,14 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
int status;
while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) {
try_read(sb);
+ check_signals(sb, semname_q, 0);
}
+ /* If this worker died and got waited by the zombie handler,
+ * waitpid() returns -1 and the loop still exits. */
}
+ sem_unlink(semname_q);
+
for ( i=0; i<sb->n_read; i++ ) {
fclose(sb->fhs[i]);
}
@@ -933,8 +976,6 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed,
sb->shared->n_crystals);
- sem_unlink(semname_q);
- sem_unlink(semname_z);
munmap(sb->shared, sizeof(struct sb_shm));
free(sb);
}