diff --git a/Documentation/slow-work.txt b/Documentation/slow-work.txt index 0169c9d9dd16..52bc31433723 100644 --- a/Documentation/slow-work.txt +++ b/Documentation/slow-work.txt @@ -158,6 +158,50 @@ with a requeue pending). This can be used to work out whether an item on which another depends is on the queue, thus allowing a dependent item to be queued after it. +If the above shows an item on which another depends not to be queued, then the +owner of the dependent item might need to wait. However, to avoid locking up +the threads unnecessarily be sleeping in them, it can make sense under some +circumstances to return the work item to the queue, thus deferring it until +some other items have had a chance to make use of the yielded thread. + +To yield a thread and defer an item, the work function should simply enqueue +the work item again and return. However, this doesn't work if there's nothing +actually on the queue, as the thread just vacated will jump straight back into +the item's work function, thus busy waiting on a CPU. + +Instead, the item should use the thread to wait for the dependency to go away, +but rather than using schedule() or schedule_timeout() to sleep, it should use +the following function: + + bool requeue = slow_work_sleep_till_thread_needed( + struct slow_work *work, + signed long *_timeout); + +This will add a second wait and then sleep, such that it will be woken up if +either something appears on the queue that could usefully make use of the +thread - and behind which this item can be queued, or if the event the caller +set up to wait for happens. True will be returned if something else appeared +on the queue and this work function should perhaps return, of false if +something else woke it up. The timeout is as for schedule_timeout(). + +For example: + + wq = bit_waitqueue(&my_flags, MY_BIT); + init_wait(&wait); + requeue = false; + do { + prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE); + if (!test_bit(MY_BIT, &my_flags)) + break; + requeue = slow_work_sleep_till_thread_needed(&my_work, + &timeout); + } while (timeout > 0 && !requeue); + finish_wait(wq, &wait); + if (!test_bit(MY_BIT, &my_flags) + goto do_my_thing; + if (requeue) + return; // to slow_work + =============== ITEM OPERATIONS diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h index bfd3ab4c8898..5035a2691739 100644 --- a/include/linux/slow-work.h +++ b/include/linux/slow-work.h @@ -152,6 +152,9 @@ static inline void delayed_slow_work_cancel(struct delayed_slow_work *dwork) slow_work_cancel(&dwork->work); } +extern bool slow_work_sleep_till_thread_needed(struct slow_work *work, + signed long *_timeout); + #ifdef CONFIG_SYSCTL extern ctl_table slow_work_sysctls[]; #endif diff --git a/kernel/slow-work.c b/kernel/slow-work.c index b763bc2d2670..da94f3c101af 100644 --- a/kernel/slow-work.c +++ b/kernel/slow-work.c @@ -132,6 +132,15 @@ LIST_HEAD(slow_work_queue); LIST_HEAD(vslow_work_queue); DEFINE_SPINLOCK(slow_work_queue_lock); +/* + * The following are two wait queues that get pinged when a work item is placed + * on an empty queue. These allow work items that are hogging a thread by + * sleeping in a way that could be deferred to yield their thread and enqueue + * themselves. + */ +static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation); +static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation); + /* * The thread controls. A variable used to signal to the threads that they * should exit when the queue is empty, a waitqueue used by the threads to wait @@ -305,6 +314,50 @@ static noinline bool slow_work_execute(int id) return true; } +/** + * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work + * work: The work item under execution that wants to sleep + * _timeout: Scheduler sleep timeout + * + * Allow a requeueable work item to sleep on a slow-work processor thread until + * that thread is needed to do some other work or the sleep is interrupted by + * some other event. + * + * The caller must set up a wake up event before calling this and must have set + * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own + * condition before calling this function as no test is made here. + * + * False is returned if there is nothing on the queue; true is returned if the + * work item should be requeued + */ +bool slow_work_sleep_till_thread_needed(struct slow_work *work, + signed long *_timeout) +{ + wait_queue_head_t *wfo_wq; + struct list_head *queue; + + DEFINE_WAIT(wait); + + if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { + wfo_wq = &vslow_work_queue_waits_for_occupation; + queue = &vslow_work_queue; + } else { + wfo_wq = &slow_work_queue_waits_for_occupation; + queue = &slow_work_queue; + } + + if (!list_empty(queue)) + return true; + + add_wait_queue_exclusive(wfo_wq, &wait); + if (list_empty(queue)) + *_timeout = schedule_timeout(*_timeout); + finish_wait(wfo_wq, &wait); + + return !list_empty(queue); +} +EXPORT_SYMBOL(slow_work_sleep_till_thread_needed); + /** * slow_work_enqueue - Schedule a slow work item for processing * @work: The work item to queue @@ -335,6 +388,8 @@ static noinline bool slow_work_execute(int id) */ int slow_work_enqueue(struct slow_work *work) { + wait_queue_head_t *wfo_wq; + struct list_head *queue; unsigned long flags; int ret; @@ -354,6 +409,14 @@ int slow_work_enqueue(struct slow_work *work) * maintaining our promise */ if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) { + if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { + wfo_wq = &vslow_work_queue_waits_for_occupation; + queue = &vslow_work_queue; + } else { + wfo_wq = &slow_work_queue_waits_for_occupation; + queue = &slow_work_queue; + } + spin_lock_irqsave(&slow_work_queue_lock, flags); if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags))) @@ -380,11 +443,13 @@ int slow_work_enqueue(struct slow_work *work) if (ret < 0) goto failed; slow_work_mark_time(work); - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) - list_add_tail(&work->link, &vslow_work_queue); - else - list_add_tail(&work->link, &slow_work_queue); + list_add_tail(&work->link, queue); wake_up(&slow_work_thread_wq); + + /* if someone who could be requeued is sleeping on a + * thread, then ask them to yield their thread */ + if (work->link.prev == queue) + wake_up(wfo_wq); } spin_unlock_irqrestore(&slow_work_queue_lock, flags); @@ -487,9 +552,19 @@ EXPORT_SYMBOL(slow_work_cancel); */ static void delayed_slow_work_timer(unsigned long data) { + wait_queue_head_t *wfo_wq; + struct list_head *queue; struct slow_work *work = (struct slow_work *) data; unsigned long flags; - bool queued = false, put = false; + bool queued = false, put = false, first = false; + + if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) { + wfo_wq = &vslow_work_queue_waits_for_occupation; + queue = &vslow_work_queue; + } else { + wfo_wq = &slow_work_queue_waits_for_occupation; + queue = &slow_work_queue; + } spin_lock_irqsave(&slow_work_queue_lock, flags); if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) { @@ -502,17 +577,18 @@ static void delayed_slow_work_timer(unsigned long data) put = true; } else { slow_work_mark_time(work); - if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) - list_add_tail(&work->link, &vslow_work_queue); - else - list_add_tail(&work->link, &slow_work_queue); + list_add_tail(&work->link, queue); queued = true; + if (work->link.prev == queue) + first = true; } } spin_unlock_irqrestore(&slow_work_queue_lock, flags); if (put) slow_work_put_ref(work); + if (first) + wake_up(wfo_wq); if (queued) wake_up(&slow_work_thread_wq); }