diff options
Diffstat (limited to 'src/im-sandbox.c')
-rw-r--r-- | src/im-sandbox.c | 46 |
1 files changed, 14 insertions, 32 deletions
diff --git a/src/im-sandbox.c b/src/im-sandbox.c index 501efb45..bd3f47e7 100644 --- a/src/im-sandbox.c +++ b/src/im-sandbox.c @@ -60,8 +60,8 @@ #include "im-sandbox.h" #include "process_image.h" -#include "time-accounts.h" #include "im-zmq.h" +#include "profile.h" struct sandbox @@ -331,6 +331,10 @@ static int run_work(const struct index_args *iargs, Stream *st, int allDone = 0; struct im_zmq *zmqstuff = NULL; + if ( sb->profile ) { + profile_init(); + } + /* Connect via ZMQ */ if ( sb->zmq ) { zmqstuff = im_zmq_connect(sb->zmq_address, @@ -345,7 +349,6 @@ static int run_work(const struct index_args *iargs, Stream *st, while ( !allDone ) { - TimeAccounts *taccs; struct pattern_args pargs; int ser; char *line; @@ -355,16 +358,15 @@ static int run_work(const struct index_args *iargs, Stream *st, char *ser_str = NULL; int ok = 1; - taccs = time_accounts_init(); - /* Wait until an event is ready */ - time_accounts_set(taccs, TACC_EVENTWAIT); sb->shared->pings[cookie]++; set_last_task(sb->shared->last_task[cookie], "wait_event"); + profile_start("wait-queue-semaphore"); if ( sem_wait(sb->queue_sem) != 0 ) { ERROR("Failed to wait on queue semaphore: %s\n", strerror(errno)); } + profile_end("wait-queue-semaphore"); /* Get the event from the queue */ set_last_task(sb->shared->last_task[cookie], "read_queue"); @@ -451,19 +453,18 @@ static int run_work(const struct index_args *iargs, Stream *st, } sb->shared->time_last_start[cookie] = get_monotonic_seconds(); + profile_start("process-image"); process_image(iargs, &pargs, st, cookie, tmpdir, ser, - sb->shared, taccs, sb->shared->last_task[cookie]); + sb->shared, sb->shared->last_task[cookie]); + profile_end("process-image"); /* pargs.zmq_data will be copied into the image structure, so * that it can be queried for "header" values etc. It will * eventually be freed by image_free() under process_image() */ if ( sb->profile ) { - pthread_mutex_lock(&sb->shared->term_lock); - time_accounts_print_short(taccs); - pthread_mutex_unlock(&sb->shared->term_lock); + profile_print_and_reset(); } - time_accounts_free(taccs); } im_zmq_shutdown(zmqstuff); @@ -576,7 +577,7 @@ static void remove_pipe(struct sandbox *sb, int d) } -static void try_read(struct sandbox *sb, TimeAccounts *taccs) +static void try_read(struct sandbox *sb) { int r, i; struct timeval tv; @@ -584,8 +585,6 @@ static void try_read(struct sandbox *sb, TimeAccounts *taccs) int fdmax; const int ofd = stream_get_fd(sb->stream); - time_accounts_set(taccs, TACC_SELECT); - tv.tv_sec = 0; tv.tv_usec = 500000; @@ -619,7 +618,6 @@ static void try_read(struct sandbox *sb, TimeAccounts *taccs) /* If the chunk cannot be read, assume the connection * is broken and that the process will die soon. */ - time_accounts_set(taccs, TACC_STREAMREAD); if ( pump_chunk(sb->fhs[i], ofd) ) { remove_pipe(sb, i); } @@ -1066,7 +1064,6 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, struct sigaction sa; int r; int allDone = 0; - TimeAccounts *taccs; struct get_pattern_ctx gpctx; if ( n_proc > MAX_NUM_WORKERS ) { @@ -1188,22 +1185,18 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, return 0; } - taccs = time_accounts_init(); - do { /* Check for stream output from workers */ - try_read(sb, taccs); + try_read(sb); /* Check for interrupt or zombies */ - time_accounts_set(taccs, TACC_SIGNALS); check_signals(sb, semname_q, 1); /* Check for hung workers */ check_hung_workers(sb); /* Top up the queue if necessary */ - time_accounts_set(taccs, TACC_QUEUETOPUP); pthread_mutex_lock(&sb->shared->queue_lock); if ( !sb->shared->no_more && (sb->shared->n_events < QUEUE_SIZE/2) ) { if ( fill_queue(&gpctx, sb) ) sb->shared->no_more = 1; @@ -1211,11 +1204,9 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, pthread_mutex_unlock(&sb->shared->queue_lock); /* Update progress */ - time_accounts_set(taccs, TACC_STATUS); try_status(sb, 0); /* Have all the events been swallowed? */ - time_accounts_set(taccs, TACC_ENDCHECK); pthread_mutex_lock(&sb->shared->queue_lock); if ( sb->shared->no_more && (sb->shared->n_events == 0) ) allDone = 1; if ( sb->shared->should_shutdown ) { @@ -1234,7 +1225,6 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, /* Indicate to the workers that we are finished, and wake them up one * last time */ - time_accounts_set(taccs, TACC_WAKEUP); STATUS("Waiting for the last patterns to be processed...\n"); pthread_mutex_lock(&sb->shared->queue_lock); sb->shared->no_more = 1; @@ -1244,29 +1234,21 @@ int create_sandbox(struct index_args *iargs, int n_proc, char *prefix, } for ( i=0; i<n_proc; i++ ) { int status; - time_accounts_set(taccs, TACC_WAITPID); while ( waitpid(sb->pids[i], &status, WNOHANG) == 0 ) { - time_accounts_set(taccs, TACC_STREAMREAD); - try_read(sb, taccs); + try_read(sb); - time_accounts_set(taccs, TACC_SIGNALS); check_signals(sb, semname_q, 0); check_hung_workers(sb); - time_accounts_set(taccs, TACC_STATUS); try_status(sb, 0); - time_accounts_set(taccs, TACC_WAITPID); } /* If this worker died and got waited by the zombie handler, * waitpid() returns -1 and the loop still exits. */ } - if ( profile ) time_accounts_print(taccs); - time_accounts_free(taccs); - sem_unlink(semname_q); for ( i=0; i<sb->n_read; i++ ) { |