Blob Blame History Raw
From 54c1ee4d614d52844cf24c46d8415bf1392021d0 Mon Sep 17 00:00:00 2001
From: Waiman Long <longman@redhat.com>
Date: Tue, 22 Mar 2022 11:20:58 -0400
Subject: [PATCH] locking/rwsem: Conditionally wake waiters in reader/writer
 slowpaths
Git-commit: 54c1ee4d614d52844cf24c46d8415bf1392021d0
Patch-mainline: v5.19-rc1
References: bsc#1207270

In an analysis of a recent vmcore, a reader-owned rwsem was found with
385 readers but no writer in the wait queue. That is kind of unusual
but it may be caused by some race conditions that we have not fully
understood yet. In such a case, all the readers in the wait queue should
join the other reader-owners and acquire the read lock.

In rwsem_down_write_slowpath(), an incoming writer will try to
wake up the front readers under such circumstance. That is not
the case for rwsem_down_read_slowpath(), add a new helper function
rwsem_cond_wake_waiter() to do wakeup and use it in both reader and
writer slowpaths to have a consistent and correct behavior.

Signed-off-by: Waiman Long <longman@redhat.com>
Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
Link: https://lkml.kernel.org/r/20220322152059.2182333-3-longman@redhat.com
Signed-off-by: Jiri Wiesner <jwiesner@suse.de>
---
 kernel/locking/rwsem.c | 68 ++++++++++++++++++++----------------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index b077b1b2d4b3..03cb97a8e4cd 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -901,7 +901,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
  */
 static inline void clear_nonspinnable(struct rw_semaphore *sem)
 {
-	if (rwsem_test_oflags(sem, RWSEM_NONSPINNABLE))
+	if (unlikely(rwsem_test_oflags(sem, RWSEM_NONSPINNABLE)))
 		atomic_long_andnot(RWSEM_NONSPINNABLE, &sem->owner);
 }
 
@@ -925,6 +925,31 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
 }
 #endif
 
+/*
+ * Prepare to wake up waiter(s) in the wait queue by putting them into the
+ * given wake_q if the rwsem lock owner isn't a writer. If rwsem is likely
+ * reader-owned, wake up read lock waiters in queue front or wake up any
+ * front waiter otherwise.
+
+ * This is being called from both reader and writer slow paths.
+ */
+static inline void rwsem_cond_wake_waiter(struct rw_semaphore *sem, long count,
+					  struct wake_q_head *wake_q)
+{
+	enum rwsem_wake_type wake_type;
+
+	if (count & RWSEM_WRITER_MASK)
+		return;
+
+	if (count & RWSEM_READER_MASK) {
+		wake_type = RWSEM_WAKE_READERS;
+	} else {
+		wake_type = RWSEM_WAKE_ANY;
+		clear_nonspinnable(sem);
+	}
+	rwsem_mark_wake(sem, wake_type, wake_q);
+}
+
 /*
  * Wait for the read lock to be granted
  */
@@ -935,7 +960,6 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 	long rcnt = (count >> RWSEM_READER_SHIFT);
 	struct rwsem_waiter waiter;
 	DEFINE_WAKE_Q(wake_q);
-	bool wake = false;
 
 	/*
 	 * To prevent a constant stream of readers from starving a sleeping
@@ -996,22 +1020,11 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 	/* we're now waiting on the lock, but no longer actively locking */
 	count = atomic_long_add_return(adjustment, &sem->count);
 
-	/*
-	 * If there are no active locks, wake the front queued process(es).
-	 *
-	 * If there are no writers and we are first in the queue,
-	 * wake our own waiter to join the existing active readers !
-	 */
-	if (!(count & RWSEM_LOCK_MASK)) {
-		clear_nonspinnable(sem);
-		wake = true;
-	}
-	if (wake || (!(count & RWSEM_WRITER_MASK) &&
-		    (adjustment & RWSEM_FLAG_WAITERS)))
-		rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
-
+	rwsem_cond_wake_waiter(sem, count, &wake_q);
 	raw_spin_unlock_irq(&sem->wait_lock);
-	wake_up_q(&wake_q);
+
+	if (!wake_q_empty(&wake_q))
+		wake_up_q(&wake_q);
 
 	/* wait to be given the lock */
 	for (;;) {
@@ -1050,7 +1063,6 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 static struct rw_semaphore __sched *
 rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 {
-	long count;
 	struct rwsem_waiter waiter;
 	DEFINE_WAKE_Q(wake_q);
 
@@ -1074,23 +1086,8 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 
 	/* we're now waiting on the lock */
 	if (rwsem_first_waiter(sem) != &waiter) {
-		count = atomic_long_read(&sem->count);
-
-		/*
-		 * If there were already threads queued before us and:
-		 *  1) there are no active locks, wake the front
-		 *     queued process(es) as the handoff bit might be set.
-		 *  2) there are no active writers and some readers, the lock
-		 *     must be read owned; so we try to wake any read lock
-		 *     waiters that were queued ahead of us.
-		 */
-		if (count & RWSEM_WRITER_MASK)
-			goto wait;
-
-		rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
-					? RWSEM_WAKE_READERS
-					: RWSEM_WAKE_ANY, &wake_q);
-
+		rwsem_cond_wake_waiter(sem, atomic_long_read(&sem->count),
+				       &wake_q);
 		if (!wake_q_empty(&wake_q)) {
 			/*
 			 * We want to minimize wait_lock hold time especially
@@ -1105,7 +1102,6 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
 	}
 
-wait:
 	/* wait until we successfully acquire the lock */
 	set_current_state(state);
 	for (;;) {
-- 
2.35.3