Blob Blame History Raw
From: "Michael S. Tsirkin" <mst@redhat.com>
Date: Fri, 26 Jan 2018 01:36:27 +0200
Subject: ptr_ring: keep consumer_head valid at all times
Patch-mainline: v4.16-rc1
Git-commit: 406de7555424d93849166684d0bd172743d2a30c
References: bsc#1109837

The comment near __ptr_ring_peek says:

 * If ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g.  __ptr_ring_empty.

but this was in fact never possible since consumer_head would sometimes
point outside the ring. Refactor the code so that it's always
pointing within a ring.

Fixes: c5ad119fb6c09 ("net: sched: pfifo_fast use skb_array")
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
Acked-by: John Fastabend <john.fastabend@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Acked-by: Thomas Bogendoerfer <tbogendoerfer@suse.de>
---
 include/linux/ptr_ring.h |   25 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 9 deletions(-)

--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -248,22 +248,28 @@ static inline void __ptr_ring_discard_on
 	/* Fundamentally, what we want to do is update consumer
 	 * index and zero out the entry so producer can reuse it.
 	 * Doing it naively at each consume would be as simple as:
-	 *       r->queue[r->consumer++] = NULL;
-	 *       if (unlikely(r->consumer >= r->size))
-	 *               r->consumer = 0;
+	 *       consumer = r->consumer;
+	 *       r->queue[consumer++] = NULL;
+	 *       if (unlikely(consumer >= r->size))
+	 *               consumer = 0;
+	 *       r->consumer = consumer;
 	 * but that is suboptimal when the ring is full as producer is writing
 	 * out new entries in the same cache line.  Defer these updates until a
 	 * batch of entries has been consumed.
 	 */
-	int head = r->consumer_head++;
+	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
+	 * to work correctly.
+	 */
+	int consumer_head = r->consumer_head;
+	int head = consumer_head++;
 
 	/* Once we have processed enough entries invalidate them in
 	 * the ring all at once so producer can reuse their space in the ring.
 	 * We also do this when we reach end of the ring - not mandatory
 	 * but helps keep the implementation simple.
 	 */
-	if (unlikely(r->consumer_head - r->consumer_tail >= r->batch ||
-		     r->consumer_head >= r->size)) {
+	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
+		     consumer_head >= r->size)) {
 		/* Zero out entries in the reverse order: this way we touch the
 		 * cache line that producer might currently be reading the last;
 		 * producer won't make progress and touch other cache lines
@@ -271,12 +277,13 @@ static inline void __ptr_ring_discard_on
 		 */
 		while (likely(head >= r->consumer_tail))
 			r->queue[head--] = NULL;
-		r->consumer_tail = r->consumer_head;
+		r->consumer_tail = consumer_head;
 	}
-	if (unlikely(r->consumer_head >= r->size)) {
-		r->consumer_head = 0;
+	if (unlikely(consumer_head >= r->size)) {
+		consumer_head = 0;
 		r->consumer_tail = 0;
 	}
+	r->consumer_head = consumer_head;
 }
 
 static inline void *__ptr_ring_consume(struct ptr_ring *r)