Blob Blame History Raw
From: Jan Kara <jack@suse.cz>
Subject: [PATCH] sbitmap: Avoid lockups when waker gets preempted
References: bsc#1209118
Patch-mainline: Never, sbitmap code got rewritten

When the process performing wakeup on the wait queue gets preempted and this
wait queue has the only waiter, we can loose many wakeups and in the end we
may end up with waiter in the wait queue without anybody able to wake him up.

The scenario can by like: We have waitqueue 0 active.

  CPU0                              CPU1
  __sbq_wake_up()
    decrements wait_cnt to 0
    wake_up_nr() -> wakes waiters
                                    queues IOs until all tags are used
                                    blk_mq_get_tag() finds no tag ->
                                      adds waiter to waitqueue 0
                                    all IOs complete - wakeups discarded because 
                                      waitqueue 0 still has wait_cnt == 0.
   set wait_cnt to 8

And waiter in waitqueue 0 sleeps indefinitely now because there's no IO to
complete to wake him up.

Fix the problem by waking up some waiters if we don't find any valid wait queue
to perform wake up on. This makes sure we don't loose all the wake ups.

Signed-off-by: Jan Kara <jack@suse.cz>

---
 lib/sbitmap.c |   22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -532,6 +532,7 @@ EXPORT_SYMBOL_GPL(sbitmap_queue_min_shal
 static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 {
 	int i, wake_index;
+	struct sbq_wait_state *active_ws = NULL;
 
 	if (!atomic_read(&sbq->ws_active))
 		return NULL;
@@ -540,15 +541,28 @@ static struct sbq_wait_state *sbq_wake_p
 	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
 		struct sbq_wait_state *ws = &sbq->ws[wake_index];
 
-		if (waitqueue_active(&ws->wait) && atomic_read(&ws->wait_cnt) > 0) {
-			if (wake_index != atomic_read(&sbq->wake_index))
-				atomic_set(&sbq->wake_index, wake_index);
-			return ws;
+		if (waitqueue_active(&ws->wait)) {
+			if (atomic_read(&ws->wait_cnt) > 0) {
+				if (wake_index != atomic_read(&sbq->wake_index))
+					atomic_set(&sbq->wake_index, wake_index);
+				return ws;
+			}
+			active_ws = ws;
 		}
 
 		wake_index = sbq_index_inc(wake_index);
 	}
 
+	/*
+	 * There are active waitqueues but all are in the process of being
+	 * woken. Perform wakeup on some waitqueue to avoid loosing the wakeup.
+	 * This is actually important in case task performing wakeup gets
+	 * preempted and lots of other wakeup events happen before it gets
+	 * scheduled again.
+	 */
+	if (active_ws)
+		wake_up_nr(&active_ws->wait, READ_ONCE(sbq->wake_batch));
+
 	return NULL;
 }