aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas White <taw@physics.org>2015-07-11 10:16:56 +0200
committerThomas White <taw@physics.org>2015-07-13 16:00:16 +0200
commitcca5d6e35b0ab653b333424abf819b4a874cf911 (patch)
tree711984cb36323f8cc250f09bdf8b4a8ecf610af6
parent65d5d478b4288b26a455e260ee9bc153a1789f29 (diff)
Use named semaphores instead of unnamed
-rw-r--r--src/im-sandbox.c68
-rw-r--r--src/im-sandbox.h1
2 files changed, 48 insertions, 21 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c
index 1b3dbfcd..6464202c 100644
--- a/src/im-sandbox.c
+++ b/src/im-sandbox.c
@@ -82,6 +82,7 @@ struct sandbox
int serial;
struct sb_shm *shared;
+ sem_t *queue_sem;
char *tmpdir;
@@ -96,7 +97,7 @@ struct sandbox
/* Horrible global variable for signal handler */
-sem_t zombie_sem;
+sem_t *zombie_sem;
static struct filename_plus_event *get_pattern(FILE *fh, int config_basename,
@@ -246,7 +247,7 @@ static void shuffle_events(struct sb_shm *sb_shared)
static void run_work(const struct index_args *iargs, Stream *st,
- int cookie, const char *tmpdir, struct sb_shm *sb_shared)
+ int cookie, const char *tmpdir, struct sandbox *sb)
{
int allDone = 0;
@@ -260,25 +261,35 @@ static void run_work(const struct index_args *iargs, Stream *st,
int r;
/* Wait until an event is ready */
- sem_wait(&sb_shared->queue_sem);
+ if ( sem_wait(sb->queue_sem) != 0 ) {
+ ERROR("Failed to wait on queue semaphore: %s\n",
+ strerror(errno));
+ }
/* Get the event from the queue */
- pthread_mutex_lock(&sb_shared->queue_lock);
- if ( sb_shared->no_more ) {
- pthread_mutex_unlock(&sb_shared->queue_lock);
+ pthread_mutex_lock(&sb->shared->queue_lock);
+ if ( sb->shared->no_more ) {
+ pthread_mutex_unlock(&sb->shared->queue_lock);
+ allDone = 1;
+ continue;
+ }
+ if ( sb->shared->n_events == 0 ) {
+ ERROR("Got the semaphore, but no events in queue!\n");
+ ERROR("no_more = %i\n", sb->shared->no_more);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
allDone = 1;
continue;
}
- r = sscanf(sb_shared->queue[0], "%s %s %i",
+ r = sscanf(sb->shared->queue[0], "%s %s %i",
filename, event_str, &ser);
if ( r != 3 ) {
STATUS("Invalid event string '%s'\n",
- sb_shared->queue[0]);
+ sb->shared->queue[0]);
}
- memcpy(sb_shared->last_ev[cookie], sb_shared->queue[0],
+ memcpy(sb->shared->last_ev[cookie], sb->shared->queue[0],
MAX_EV_LEN);
- shuffle_events(sb_shared);
- pthread_mutex_unlock(&sb_shared->queue_lock);
+ shuffle_events(sb->shared);
+ pthread_mutex_unlock(&sb->shared->queue_lock);
if ( r != 3 ) continue;
@@ -301,7 +312,7 @@ static void run_work(const struct index_args *iargs, Stream *st,
}
process_image(iargs, &pargs, st, cookie, tmpdir, ser,
- sb_shared);
+ sb->shared);
free_filename_plus_event(pargs.filename_p_e);
@@ -562,7 +573,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
*/
st = open_stream_fd_for_write(stream_pipe[1]);
- run_work(sb->iargs, st, slot, tmp, sb->shared);
+ run_work(sb->iargs, st, slot, tmp, sb);
close_stream(st);
free(tmp);
@@ -587,7 +598,7 @@ static void start_worker_process(struct sandbox *sb, int slot)
static void signal_handler(int sig, siginfo_t *si, void *uc_v)
{
- sem_post(&zombie_sem);
+ sem_post(zombie_sem);
}
@@ -697,7 +708,7 @@ static int fill_queue(FILE *fh, int config_basename, struct detector *det,
maybe_get_event_string(ne->ev), sb->serial++);
memcpy(sb->shared->queue[sb->shared->n_events++], ev_string,
MAX_EV_LEN);
- sem_post(&sb->shared->queue_sem);
+ sem_post(sb->queue_sem);
free_filename_plus_event(ne);
}
@@ -715,6 +726,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
struct sandbox *sb;
size_t ll;
struct stat s;
+ char semname_q[64];
+ char semname_z[64];
int allDone = 0;
if ( n_proc > MAX_NUM_WORKERS ) {
@@ -751,8 +764,22 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->shared->n_hadcrystals = 0;
sb->shared->n_crystals = 0;
- sem_init(&sb->shared->queue_sem, 1, 0);
- sem_init(&zombie_sem, 0, 0);
+ /* Set up semaphore to control work queue */
+ snprintf(semname_q, 64, "indexamajig-q%i", getpid());
+ sb->queue_sem = sem_open(semname_q, O_CREAT | O_EXCL,
+ S_IRUSR | S_IWUSR, 0);
+ if ( sb->queue_sem == SEM_FAILED ) {
+ ERROR("Failed to create semaphore: %s\n", strerror(errno));
+ return;
+ }
+ snprintf(semname_z, 64, "indexamajig-z%i", getpid());
+ zombie_sem = sem_open(semname_z, O_CREAT | O_EXCL,
+ S_IRUSR | S_IWUSR, 0);
+ if ( zombie_sem == SEM_FAILED ) {
+ ERROR("Failed to create zombie semaphore: %s\n",
+ strerror(errno));
+ return;
+ }
sb->pids = calloc(n_proc, sizeof(pid_t));
sb->running = calloc(n_proc, sizeof(int));
@@ -823,10 +850,9 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
double tNow;
try_read(sb);
- sleep(5);
/* Check for dead workers */
- if ( sem_trywait(&zombie_sem) == 0 ) handle_zombie(sb);
+ if ( sem_trywait(zombie_sem) == 0 ) handle_zombie(sb);
/* Top up the queue if necessary */
r = 0;
@@ -883,7 +909,7 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
sb->shared->no_more = 1;
pthread_mutex_unlock(&sb->shared->queue_lock);
for ( i=0; i<n_proc; i++ ) {
- sem_post(&sb->shared->queue_sem);
+ sem_post(sb->queue_sem);
}
for ( i=0; i<n_proc; i++ ) {
int status;
@@ -907,6 +933,8 @@ void create_sandbox(struct index_args *iargs, int n_proc, char *prefix,
100.0 * sb->shared->n_hadcrystals / sb->shared->n_processed,
sb->shared->n_crystals);
+ sem_unlink(semname_q);
+ sem_unlink(semname_z);
munmap(sb->shared, sizeof(struct sb_shm));
free(sb);
}
diff --git a/src/im-sandbox.h b/src/im-sandbox.h
index 67bc88c8..b39f9566 100644
--- a/src/im-sandbox.h
+++ b/src/im-sandbox.h
@@ -61,7 +61,6 @@ struct sb_shm
char queue[QUEUE_SIZE][MAX_EV_LEN];
int no_more;
char last_ev[MAX_NUM_WORKERS][MAX_EV_LEN];
- sem_t queue_sem;
pthread_mutex_t totals_lock;
int n_processed;