From 9042846bc7ae69cc3288d85af6bad16208d93a95 Mon Sep 17 00:00:00 2001
From: Chris Mason
Date: Tue, 4 Aug 2009 16:56:34 -0400
Subject: Btrfs: Allow worker threads to exit when idle

The Btrfs worker threads don't currently die off after they have
been idle for a while, leading to a lot of threads sitting around
doing nothing for each mount.

Also, they are unable to start atomically (from end_io handlers).

This commit reworks the worker threads so they can be started from
end_io handlers (just setting a flag that asks for a thread to be
added at a later date) and so they can exit if they have been idle
for a long time.

Signed-off-by: Chris Mason
---
 fs/btrfs/async-thread.c | 133 ++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 111 insertions(+), 22 deletions(-)

(limited to 'fs/btrfs/async-thread.c')

diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 019e8af449a..f10c895224a 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
     /* number of things on the pending list */
     atomic_t num_pending;
 
+    /* reference counter for this struct */
+    atomic_t refs;
+
     unsigned long sequence;
 
     /* protects the pending list. */
@@ -93,6 +96,31 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
     }
 }
 
+static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
+{
+    struct btrfs_workers *workers = worker->workers;
+    unsigned long flags;
+
+    rmb();
+    if (!workers->atomic_start_pending)
+        return;
+
+    spin_lock_irqsave(&workers->lock, flags);
+    if (!workers->atomic_start_pending)
+        goto out;
+
+    workers->atomic_start_pending = 0;
+    if (workers->num_workers >= workers->max_workers)
+        goto out;
+
+    spin_unlock_irqrestore(&workers->lock, flags);
+    btrfs_start_workers(workers, 1);
+    return;
+
+out:
+    spin_unlock_irqrestore(&workers->lock, flags);
+}
+
 static noinline int run_ordered_completions(struct btrfs_workers *workers,
                         struct btrfs_work *work)
 {
@@ -140,6 +168,36 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
     return 0;
 }
 
+static void put_worker(struct btrfs_worker_thread *worker)
+{
+    if (atomic_dec_and_test(&worker->refs))
+        kfree(worker);
+}
+
+static int try_worker_shutdown(struct btrfs_worker_thread *worker)
+{
+    int freeit = 0;
+
+    spin_lock_irq(&worker->lock);
+    spin_lock_irq(&worker->workers->lock);
+    if (worker->workers->num_workers > 1 &&
+        worker->idle &&
+        !worker->working &&
+        !list_empty(&worker->worker_list) &&
+        list_empty(&worker->prio_pending) &&
+        list_empty(&worker->pending)) {
+        freeit = 1;
+        list_del_init(&worker->worker_list);
+        worker->workers->num_workers--;
+    }
+    spin_unlock_irq(&worker->workers->lock);
+    spin_unlock_irq(&worker->lock);
+
+    if (freeit)
+        put_worker(worker);
+    return freeit;
+}
+
 /*
  * main loop for servicing work items
  */
@@ -175,6 +233,8 @@ again_locked:
              */
             run_ordered_completions(worker->workers, work);
 
+            check_pending_worker_creates(worker);
+
             spin_lock_irq(&worker->lock);
             check_idle_worker(worker);
         }
@@ -226,8 +286,13 @@ again_locked:
             worker->working = 0;
             spin_unlock_irq(&worker->lock);
 
-            if (!kthread_should_stop())
-                schedule();
+            if (!kthread_should_stop()) {
+                schedule_timeout(HZ * 120);
+                if (!worker->working &&
+                    try_worker_shutdown(worker)) {
+                    return 0;
+                }
+            }
         }
         __set_current_state(TASK_RUNNING);
     }
@@ -242,16 +307,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 {
     struct list_head *cur;
     struct btrfs_worker_thread *worker;
+    int can_stop;
 
+    spin_lock_irq(&workers->lock);
     list_splice_init(&workers->idle_list, &workers->worker_list);
     while (!list_empty(&workers->worker_list)) {
         cur = workers->worker_list.next;
         worker = list_entry(cur, struct btrfs_worker_thread,
                     worker_list);
-        kthread_stop(worker->task);
-        list_del(&worker->worker_list);
-        kfree(worker);
+
+        atomic_inc(&worker->refs);
+        workers->num_workers -= 1;
+        if (!list_empty(&worker->worker_list)) {
+            list_del_init(&worker->worker_list);
+            put_worker(worker);
+            can_stop = 1;
+        } else
+            can_stop = 0;
+        spin_unlock_irq(&workers->lock);
+        if (can_stop)
+            kthread_stop(worker->task);
+        spin_lock_irq(&workers->lock);
+        put_worker(worker);
     }
+    spin_unlock_irq(&workers->lock);
     return 0;
 }
 
@@ -270,6 +349,8 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
     workers->idle_thresh = 32;
     workers->name = name;
     workers->ordered = 0;
+    workers->atomic_start_pending = 0;
+    workers->atomic_worker_start = 0;
 }
 
 /*
@@ -294,6 +375,7 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
         INIT_LIST_HEAD(&worker->worker_list);
         spin_lock_init(&worker->lock);
         atomic_set(&worker->num_pending, 0);
+        atomic_set(&worker->refs, 1);
         worker->workers = workers;
         worker->task = kthread_run(worker_loop, worker,
                        "btrfs-%s-%d", workers->name,
@@ -303,7 +385,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
             kfree(worker);
             goto fail;
         }
-
         spin_lock_irq(&workers->lock);
         list_add_tail(&worker->worker_list, &workers->idle_list);
         worker->idle = 1;
@@ -367,6 +448,7 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
 {
     struct btrfs_worker_thread *worker;
     unsigned long flags;
+    struct list_head *fallback;
 
 again:
     spin_lock_irqsave(&workers->lock, flags);
@@ -376,19 +458,10 @@ again:
     if (!worker) {
         spin_lock_irqsave(&workers->lock, flags);
         if (workers->num_workers >= workers->max_workers) {
-            struct list_head *fallback = NULL;
-            /*
-             * we have failed to find any workers, just
-             * return the force one
-             */
-            if (!list_empty(&workers->worker_list))
-                fallback = workers->worker_list.next;
-            if (!list_empty(&workers->idle_list))
-                fallback = workers->idle_list.next;
-            BUG_ON(!fallback);
-            worker = list_entry(fallback,
-                  struct btrfs_worker_thread, worker_list);
-            spin_unlock_irqrestore(&workers->lock, flags);
+            goto fallback;
+        } else if (workers->atomic_worker_start) {
+            workers->atomic_start_pending = 1;
+            goto fallback;
         } else {
             spin_unlock_irqrestore(&workers->lock, flags);
             /* we're below the limit, start another worker */
@@ -397,6 +470,22 @@ again:
         }
     }
     return worker;
+
+fallback:
+    fallback = NULL;
+    /*
+     * we have failed to find any workers, just
+     * return the first one we can find.
+     */
+    if (!list_empty(&workers->worker_list))
+        fallback = workers->worker_list.next;
+    if (!list_empty(&workers->idle_list))
+        fallback = workers->idle_list.next;
+    BUG_ON(!fallback);
+    worker = list_entry(fallback,
+          struct btrfs_worker_thread, worker_list);
+    spin_unlock_irqrestore(&workers->lock, flags);
+    return worker;
 }
 
 /*
@@ -435,9 +524,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
         worker->working = 1;
     }
 
-    spin_unlock_irqrestore(&worker->lock, flags);
     if (wake)
         wake_up_process(worker->task);
+    spin_unlock_irqrestore(&worker->lock, flags);
 
 out:
     return 0;
@@ -492,10 +581,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
         wake = 1;
     worker->working = 1;
 
-    spin_unlock_irqrestore(&worker->lock, flags);
-
     if (wake)
         wake_up_process(worker->task);
+    spin_unlock_irqrestore(&worker->lock, flags);
+
 out:
     return 0;
 }
-- 
cgit v1.2.3
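The commit above combines two mechanisms: a worker that retires itself after sitting idle past a timeout, and an atomic_start_pending flag so that callers in atomic context (end_io handlers) can ask for a new thread without creating one themselves. The standalone userspace C sketch below illustrates the same two ideas with pthreads; it is not the Btrfs code, and every name in it (struct pool, worker_main, queue_work, IDLE_TIMEOUT_SEC) is invented for the illustration.

/* idle_pool.c - userspace sketch, not Btrfs code.  Demonstrates:
 *  1) workers that exit after an idle timeout
 *  2) a start_pending flag so contexts that must not spawn threads can
 *     ask a running worker to do it for them later
 */
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

#define IDLE_TIMEOUT_SEC 120

struct pool {
    pthread_mutex_t lock;
    pthread_cond_t more_work;
    int pending;        /* queued work items (stand-in for a real list) */
    int num_workers;
    int max_workers;
    int start_pending;  /* "please add a worker when you get a chance" */
};

static void *worker_main(void *arg);

static void start_worker(struct pool *p)
{
    pthread_t tid;

    pthread_mutex_lock(&p->lock);
    p->num_workers++;
    pthread_mutex_unlock(&p->lock);
    if (pthread_create(&tid, NULL, worker_main, p) == 0)
        pthread_detach(tid);
}

/* safe to call from a context that must not create threads itself */
static void queue_work(struct pool *p)
{
    pthread_mutex_lock(&p->lock);
    p->pending++;
    if (p->pending > p->num_workers && p->num_workers < p->max_workers)
        p->start_pending = 1;           /* defer the thread creation */
    pthread_cond_signal(&p->more_work);
    pthread_mutex_unlock(&p->lock);
}

static void *worker_main(void *arg)
{
    struct pool *p = arg;

    pthread_mutex_lock(&p->lock);
    for (;;) {
        while (p->pending > 0) {
            p->pending--;
            pthread_mutex_unlock(&p->lock);

            /* ... the actual work item would run here ... */

            pthread_mutex_lock(&p->lock);
            if (p->start_pending && p->num_workers < p->max_workers) {
                /* honour a deferred creation request from thread context */
                p->start_pending = 0;
                pthread_mutex_unlock(&p->lock);
                start_worker(p);
                pthread_mutex_lock(&p->lock);
            }
        }

        /* idle: wait with a timeout, retire if still idle when it expires */
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_sec += IDLE_TIMEOUT_SEC;
        if (pthread_cond_timedwait(&p->more_work, &p->lock, &ts) != 0 &&
            p->pending == 0 && p->num_workers > 1) {
            p->num_workers--;
            pthread_mutex_unlock(&p->lock);
            return NULL;                /* this thread exits */
        }
    }
}

int main(void)
{
    struct pool p = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .more_work = PTHREAD_COND_INITIALIZER,
        .max_workers = 4,
    };

    start_worker(&p);
    for (int i = 0; i < 8; i++)
        queue_work(&p);
    sleep(1);           /* let the workers drain the queue before exit */
    return 0;
}

Build with: cc -pthread idle_pool.c. The 120 second idle timeout mirrors the schedule_timeout(HZ * 120) used in the patch, and the "more than one worker left" check mirrors the num_workers > 1 test in try_worker_shutdown().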
From 4e3f9c5042b43301d70781aee4a164a20878066b Mon Sep 17 00:00:00 2001
From: Chris Mason
Date: Wed, 5 Aug 2009 16:36:45 -0400
Subject: Btrfs: keep irqs on more often in the worker threads

The btrfs worker thread spinlock was being used both for the
queueing of IO and for the processing of ordered events.

The ordered events never happen from end_io handlers, and so they
don't need to use the _irq version of spinlocks.  This adds a
dedicated lock to the ordered lists so they don't have to run
with irqs off.

Signed-off-by: Chris Mason
---
 fs/btrfs/async-thread.c | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

(limited to 'fs/btrfs/async-thread.c')

diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index f10c895224a..4b4372df3b6 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -124,14 +124,12 @@ out:
 static noinline int run_ordered_completions(struct btrfs_workers *workers,
                         struct btrfs_work *work)
 {
-    unsigned long flags;
-
     if (!workers->ordered)
         return 0;
 
     set_bit(WORK_DONE_BIT, &work->flags);
 
-    spin_lock_irqsave(&workers->lock, flags);
+    spin_lock(&workers->order_lock);
 
     while (1) {
         if (!list_empty(&workers->prio_order_list)) {
@@ -154,17 +152,17 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
         if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
             break;
 
-        spin_unlock_irqrestore(&workers->lock, flags);
+        spin_unlock(&workers->order_lock);
 
         work->ordered_func(work);
 
         /* now take the lock again and call the freeing code */
-        spin_lock_irqsave(&workers->lock, flags);
+        spin_lock(&workers->order_lock);
         list_del(&work->order_list);
         work->ordered_free(work);
     }
 
-    spin_unlock_irqrestore(&workers->lock, flags);
+    spin_unlock(&workers->order_lock);
     return 0;
 }
 
@@ -345,6 +343,7 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
     INIT_LIST_HEAD(&workers->order_list);
     INIT_LIST_HEAD(&workers->prio_order_list);
     spin_lock_init(&workers->lock);
+    spin_lock_init(&workers->order_lock);
     workers->max_workers = max;
     workers->idle_thresh = 32;
     workers->name = name;
@@ -374,6 +373,7 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
         INIT_LIST_HEAD(&worker->prio_pending);
         INIT_LIST_HEAD(&worker->worker_list);
         spin_lock_init(&worker->lock);
+
         atomic_set(&worker->num_pending, 0);
         atomic_set(&worker->refs, 1);
         worker->workers = workers;
@@ -453,10 +453,8 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
 again:
     spin_lock_irqsave(&workers->lock, flags);
     worker = next_worker(workers);
-    spin_unlock_irqrestore(&workers->lock, flags);
 
     if (!worker) {
-        spin_lock_irqsave(&workers->lock, flags);
         if (workers->num_workers >= workers->max_workers) {
             goto fallback;
         } else if (workers->atomic_worker_start) {
@@ -469,6 +467,7 @@ again:
             goto again;
         }
     }
+    spin_unlock_irqrestore(&workers->lock, flags);
     return worker;
 
 fallback:
@@ -552,14 +551,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
     worker = find_worker(workers);
 
     if (workers->ordered) {
-        spin_lock_irqsave(&workers->lock, flags);
+        /*
+         * you're not allowed to do ordered queues from an
+         * interrupt handler
+         */
+        spin_lock(&workers->order_lock);
         if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
             list_add_tail(&work->order_list,
                       &workers->prio_order_list);
         } else {
             list_add_tail(&work->order_list,
                       &workers->order_list);
         }
-        spin_unlock_irqrestore(&workers->lock, flags);
+        spin_unlock(&workers->order_lock);
     } else {
         INIT_LIST_HEAD(&work->order_list);
     }
-- 
cgit v1.2.3
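The change above splits one lock into two: workers->lock stays irq-safe because work is queued from end_io handlers, while the new order_lock only ever runs in process context and can be a plain spin_lock. The userspace sketch below shows the same lock-splitting shape with two pthread mutexes standing in for the two spinlocks; it is not the Btrfs code, and the struct and function names are invented for the illustration.

/* two_locks.c - userspace sketch, not Btrfs code.  Two mutexes stand in
 * for workers->lock (irq-safe in the kernel, because end_io handlers
 * queue work under it) and the new workers->order_lock (process context
 * only, so the kernel version is a plain spin_lock with irqs left on).
 */
#include <pthread.h>
#include <stdio.h>

struct work {
    struct work *next;
    void (*ordered_func)(struct work *);
    int done;
};

struct workers {
    pthread_mutex_t lock;        /* submission side (kernel: spin_lock_irqsave) */
    pthread_mutex_t order_lock;  /* ordered completions (kernel: spin_lock) */
    struct work *pending;        /* protected by lock */
    struct work *order_list;     /* protected by order_lock */
};

/* ordered completion processing touches only order_lock */
static void run_ordered_completions(struct workers *w)
{
    pthread_mutex_lock(&w->order_lock);
    while (w->order_list && w->order_list->done) {
        struct work *work = w->order_list;
        w->order_list = work->next;

        /* drop the lock around the callback, as the kernel code does */
        pthread_mutex_unlock(&w->order_lock);
        work->ordered_func(work);
        pthread_mutex_lock(&w->order_lock);
    }
    pthread_mutex_unlock(&w->order_lock);
}

static void say_done(struct work *work)
{
    printf("ordered completion for %p\n", (void *)work);
}

int main(void)
{
    struct workers w = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .order_lock = PTHREAD_MUTEX_INITIALIZER,
    };
    struct work b = { .ordered_func = say_done, .done = 1 };
    struct work a = { .ordered_func = say_done, .done = 1, .next = &b };

    /* adding to the ordered list takes only order_lock, mirroring
     * btrfs_queue_worker() after this patch */
    pthread_mutex_lock(&w.order_lock);
    w.order_list = &a;
    pthread_mutex_unlock(&w.order_lock);

    run_ordered_completions(&w);
    return 0;
}

The design point is simply that the data each lock protects never overlaps, so the stricter locking discipline (irq-safe in the kernel) only has to be paid on the path that actually needs it.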
From 4f878e8475a465ddbd951e06a23317303f1b5b30 Mon Sep 17 00:00:00 2001
From: Chris Mason
Date: Fri, 7 Aug 2009 09:27:38 -0400
Subject: Btrfs: reduce worker thread spin_lock_irq hold times

This changes the btrfs worker threads to batch work items into a
local list.  It allows us to pull work items in large chunks and
significantly reduces the number of times we need to take the
worker thread spinlock.

Signed-off-by: Chris Mason
---
 fs/btrfs/async-thread.c | 74 +++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 60 insertions(+), 14 deletions(-)

(limited to 'fs/btrfs/async-thread.c')

diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 4b4372df3b6..6ea5cd0a595 100644
--- a/fs/btrfs/async-thread.c
+++ b/fs/btrfs/async-thread.c
@@ -196,31 +196,73 @@ static int try_worker_shutdown(struct btrfs_worker_thread *worker)
     return freeit;
 }
 
+static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
+                    struct list_head *prio_head,
+                    struct list_head *head)
+{
+    struct btrfs_work *work = NULL;
+    struct list_head *cur = NULL;
+
+    if(!list_empty(prio_head))
+        cur = prio_head->next;
+
+    smp_mb();
+    if (!list_empty(&worker->prio_pending))
+        goto refill;
+
+    if (!list_empty(head))
+        cur = head->next;
+
+    if (cur)
+        goto out;
+
+refill:
+    spin_lock_irq(&worker->lock);
+    list_splice_tail_init(&worker->prio_pending, prio_head);
+    list_splice_tail_init(&worker->pending, head);
+
+    if (!list_empty(prio_head))
+        cur = prio_head->next;
+    else if (!list_empty(head))
+        cur = head->next;
+    spin_unlock_irq(&worker->lock);
+
+    if (!cur)
+        goto out_fail;
+
+out:
+    work = list_entry(cur, struct btrfs_work, list);
+
+out_fail:
+    return work;
+}
+
 /*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
 {
     struct btrfs_worker_thread *worker = arg;
-    struct list_head *cur;
+    struct list_head head;
+    struct list_head prio_head;
     struct btrfs_work *work;
+
+    INIT_LIST_HEAD(&head);
+    INIT_LIST_HEAD(&prio_head);
+
     do {
-        spin_lock_irq(&worker->lock);
-again_locked:
+again:
         while (1) {
-            if (!list_empty(&worker->prio_pending))
-                cur = worker->prio_pending.next;
-            else if (!list_empty(&worker->pending))
-                cur = worker->pending.next;
-            else
+
+
+            work = get_next_work(worker, &prio_head, &head);
+            if (!work)
                 break;
-            work = list_entry(cur, struct btrfs_work, list);
             list_del(&work->list);
             clear_bit(WORK_QUEUED_BIT, &work->flags);
 
             work->worker = worker;
-            spin_unlock_irq(&worker->lock);
 
             work->func(work);
 
@@ -233,9 +275,11 @@ again_locked:
 
             check_pending_worker_creates(worker);
 
-            spin_lock_irq(&worker->lock);
-            check_idle_worker(worker);
         }
+
+        spin_lock_irq(&worker->lock);
+        check_idle_worker(worker);
+
         if (freezing(current)) {
             worker->working = 0;
             spin_unlock_irq(&worker->lock);
@@ -274,8 +318,10 @@ again_locked:
         spin_lock_irq(&worker->lock);
         set_current_state(TASK_INTERRUPTIBLE);
         if (!list_empty(&worker->pending) ||
-            !list_empty(&worker->prio_pending))
-            goto again_locked;
+            !list_empty(&worker->prio_pending)) {
+            spin_unlock_irq(&worker->lock);
+            goto again;
+        }
 
         /*
          * this makes sure we get a wakeup when someone
-- 
cgit v1.2.3
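The batching idea in this last commit is: take the lock once, move everything queued onto a private list, then process the private list with the lock dropped. The standalone userspace C program below sketches that pattern; it is not the Btrfs code, the names (grab_batch, drain, struct worker) are invented, and a singly linked list stands in for the kernel's list_head splicing.

/* batch_drain.c - userspace sketch, not Btrfs code.  One lock round-trip
 * moves every queued item onto a private list, and the items then run
 * with the lock dropped.
 */
#include <pthread.h>
#include <stdio.h>

struct work {
    struct work *next;
    int id;
};

struct worker {
    pthread_mutex_t lock;
    struct work *pending;       /* shared queue, protected by lock */
};

static void run_one(struct work *w)
{
    printf("running work item %d\n", w->id);
}

/* one lock/unlock grabs everything that is currently queued */
static struct work *grab_batch(struct worker *wk)
{
    struct work *batch;

    pthread_mutex_lock(&wk->lock);
    batch = wk->pending;
    wk->pending = NULL;
    pthread_mutex_unlock(&wk->lock);
    return batch;
}

static void drain(struct worker *wk)
{
    struct work *batch;

    /* keep going until a lock round-trip finds the queue empty */
    while ((batch = grab_batch(wk)) != NULL) {
        while (batch) {
            struct work *w = batch;

            batch = batch->next;
            run_one(w);         /* runs with the lock dropped */
        }
    }
}

int main(void)
{
    struct worker wk = { .lock = PTHREAD_MUTEX_INITIALIZER };
    struct work items[5];
    int i;

    /* a producer queues five items under the lock */
    pthread_mutex_lock(&wk.lock);
    for (i = 0; i < 5; i++) {
        items[i].id = i;
        items[i].next = wk.pending;
        wk.pending = &items[i];
    }
    pthread_mutex_unlock(&wk.lock);

    /* the consumer takes the lock once and drains all five (LIFO order) */
    drain(&wk);
    return 0;
}

In the patch, get_next_work() plays the role of grab_batch(): it splices worker->prio_pending and worker->pending onto local lists under one spin_lock_irq, so the per-item cost of taking the lock with interrupts disabled largely disappears.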