Blob Blame History Raw
From: Ilya Dryomov <idryomov@gmail.com>
Date: Sat, 20 Jan 2018 10:30:10 +0100
Subject: libceph, rbd: new bio handling code (aka don't clone bios)
Git-commit: 5359a17d2706b86da2af83027343d5eb256f7670
Patch-mainline: v4.17-rc1
References: FATE#324714 bsc#1141450

The reason we clone bios is to be able to give each object request
(and consequently each ceph_osd_data/ceph_msg_data item) its own
pointer to a (list of) bio(s).  The messenger then initializes its
cursor with cloned bio's ->bi_iter, so it knows where to start reading
from/writing to.  That's all the cloned bios are used for: to determine
each object request's starting position in the provided data buffer.

Introduce ceph_bio_iter to do exactly that -- store position within bio
list (i.e. pointer to bio) + position within that bio (i.e. bvec_iter).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 drivers/block/rbd.c             |   67 +++++++++++++++-----------
 include/linux/ceph/messenger.h  |   59 ++++++++++++++++++++---
 include/linux/ceph/osd_client.h |   11 ++--
 net/ceph/messenger.c            |  101 +++++++++++++---------------------------
 net/ceph/osd_client.c           |   13 +++--
 5 files changed, 139 insertions(+), 112 deletions(-)

--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -218,7 +218,7 @@ typedef void (*rbd_obj_callback_t)(struc
 
 enum obj_request_type {
 	OBJ_REQUEST_NODATA = 1,
-	OBJ_REQUEST_BIO,
+	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
 	OBJ_REQUEST_PAGES,
 };
 
@@ -270,7 +270,7 @@ struct rbd_obj_request {
 
 	enum obj_request_type	type;
 	union {
-		struct bio	*bio_list;
+		struct ceph_bio_iter	bio_pos;
 		struct {
 			struct page	**pages;
 			u32		page_count;
@@ -1283,6 +1283,27 @@ static u64 rbd_segment_length(struct rbd
 	return length;
 }
 
+static void zero_bvec(struct bio_vec *bv)
+{
+	void *buf;
+	unsigned long flags;
+
+	buf = bvec_kmap_irq(bv, &flags);
+	memset(buf, 0, bv->bv_len);
+	flush_dcache_page(bv->bv_page);
+	bvec_kunmap_irq(buf, &flags);
+}
+
+static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
+{
+	struct ceph_bio_iter it = *bio_pos;
+
+	ceph_bio_iter_advance(&it, off);
+	ceph_bio_iter_advance_step(&it, bytes, ({
+		zero_bvec(&bv);
+	}));
+}
+
 /*
  * bio helpers
  */
@@ -1747,13 +1768,14 @@ rbd_img_obj_request_read_callback(struct
 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
 	if (obj_request->result == -ENOENT) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
-			zero_bio_chain(obj_request->bio_list, 0);
+			zero_bios(&obj_request->bio_pos, 0, length);
 		else
 			zero_pages(obj_request->pages, 0, length);
 		obj_request->result = 0;
 	} else if (xferred < length && !obj_request->result) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
-			zero_bio_chain(obj_request->bio_list, xferred);
+			zero_bios(&obj_request->bio_pos, xferred,
+				  length - xferred);
 		else
 			zero_pages(obj_request->pages, xferred, length);
 	}
@@ -2068,11 +2090,8 @@ static void rbd_obj_request_destroy(stru
 	rbd_assert(obj_request_type_valid(obj_request->type));
 	switch (obj_request->type) {
 	case OBJ_REQUEST_NODATA:
-		break;		/* Nothing to do */
 	case OBJ_REQUEST_BIO:
-		if (obj_request->bio_list)
-			bio_chain_put(obj_request->bio_list);
-		break;
+		break;		/* Nothing to do */
 	case OBJ_REQUEST_PAGES:
 		/* img_data requests don't own their page array */
 		if (obj_request->pages &&
@@ -2405,7 +2424,7 @@ static void rbd_img_obj_request_fill(str
 
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
-					obj_request->bio_list, length);
+					&obj_request->bio_pos, length);
 	else if (obj_request->type == OBJ_REQUEST_PAGES)
 		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
 					obj_request->pages, length,
@@ -2433,8 +2452,7 @@ static int rbd_img_request_fill(struct r
 	struct rbd_device *rbd_dev = img_request->rbd_dev;
 	struct rbd_obj_request *obj_request = NULL;
 	struct rbd_obj_request *next_obj_request;
-	struct bio *bio_list = NULL;
-	unsigned int bio_offset = 0;
+	struct ceph_bio_iter bio_it;
 	struct page **pages = NULL;
 	enum obj_operation_type op_type;
 	u64 img_offset;
@@ -2449,9 +2467,9 @@ static int rbd_img_request_fill(struct r
 	op_type = rbd_img_request_op_type(img_request);
 
 	if (type == OBJ_REQUEST_BIO) {
-		bio_list = data_desc;
+		bio_it = *(struct ceph_bio_iter *)data_desc;
 		rbd_assert(img_offset ==
-			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
+			   bio_it.iter.bi_sector << SECTOR_SHIFT);
 	} else if (type == OBJ_REQUEST_PAGES) {
 		pages = data_desc;
 	}
@@ -2477,17 +2495,8 @@ static int rbd_img_request_fill(struct r
 		rbd_img_obj_request_add(img_request, obj_request);
 
 		if (type == OBJ_REQUEST_BIO) {
-			unsigned int clone_size;
-
-			rbd_assert(length <= (u64)UINT_MAX);
-			clone_size = (unsigned int)length;
-			obj_request->bio_list =
-					bio_chain_clone_range(&bio_list,
-								&bio_offset,
-								clone_size,
-								GFP_NOIO);
-			if (!obj_request->bio_list)
-				goto out_unwind;
+			obj_request->bio_pos = bio_it;
+			ceph_bio_iter_advance(&bio_it, length);
 		} else if (type == OBJ_REQUEST_PAGES) {
 			unsigned int page_count;
 
@@ -3019,7 +3028,7 @@ static void rbd_img_parent_read(struct r
 
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						obj_request->bio_list);
+						&obj_request->bio_pos);
 	else
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
 						obj_request->pages);
@@ -4139,9 +4148,13 @@ static void rbd_queue_workfn(struct work
 	if (op_type == OBJ_OP_DISCARD)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
 					      NULL);
-	else
+	else {
+		struct ceph_bio_iter bio_it = { .bio = rq->bio,
+						.iter = rq->bio->bi_iter };
+
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-					      rq->bio);
+					      &bio_it);
+	}
 	if (result)
 		goto err_img_request;
 
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -92,14 +92,60 @@ static __inline__ bool ceph_msg_data_typ
 	}
 }
 
+#ifdef CONFIG_BLOCK
+
+struct ceph_bio_iter {
+	struct bio *bio;
+	struct bvec_iter iter;
+};
+
+#define __ceph_bio_iter_advance_step(it, n, STEP) do {			      \
+	unsigned int __n = (n), __cur_n;				      \
+									      \
+	while (__n) {							      \
+		BUG_ON(!(it)->iter.bi_size);				      \
+		__cur_n = min((it)->iter.bi_size, __n);			      \
+		(void)(STEP);						      \
+		bio_advance_iter((it)->bio, &(it)->iter, __cur_n);	      \
+		if (!(it)->iter.bi_size && (it)->bio->bi_next) {	      \
+			dout("__ceph_bio_iter_advance_step next bio\n");      \
+			(it)->bio = (it)->bio->bi_next;			      \
+			(it)->iter = (it)->bio->bi_iter;		      \
+		}							      \
+		__n -= __cur_n;						      \
+	}								      \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bio_iter_advance(it, n)					      \
+	__ceph_bio_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bio_iter_advance_step(it, n, BVEC_STEP)			      \
+	__ceph_bio_iter_advance_step(it, n, ({				      \
+		struct bio_vec bv;					      \
+		struct bvec_iter __cur_iter;				      \
+									      \
+		__cur_iter = (it)->iter;				      \
+		__cur_iter.bi_size = __cur_n;				      \
+		__bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
+			(void)(BVEC_STEP);				      \
+	}))
+
+#endif /* CONFIG_BLOCK */
+
 struct ceph_msg_data {
 	struct list_head		links;	/* ceph_msg->data */
 	enum ceph_msg_data_type		type;
 	union {
 #ifdef CONFIG_BLOCK
 		struct {
-			struct bio	*bio;
-			size_t		bio_length;
+			struct ceph_bio_iter	bio_pos;
+			u32			bio_length;
 		};
 #endif /* CONFIG_BLOCK */
 		struct {
@@ -121,10 +167,7 @@ struct ceph_msg_data_cursor {
 	bool			need_crc;	/* crc update needed */
 	union {
 #ifdef CONFIG_BLOCK
-		struct {				/* bio */
-			struct bio	*bio;		/* bio from list */
-			struct bvec_iter bvec_iter;
-		};
+		struct ceph_bio_iter	bio_iter;
 #endif /* CONFIG_BLOCK */
 		struct {				/* pages */
 			unsigned int	page_offset;	/* offset in page */
@@ -289,8 +332,8 @@ extern void ceph_msg_data_add_pages(stru
 extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 				struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-				size_t length);
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+			   u32 length);
 #endif /* CONFIG_BLOCK */
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -71,8 +71,8 @@ struct ceph_osd_data {
 		struct ceph_pagelist	*pagelist;
 #ifdef CONFIG_BLOCK
 		struct {
-			struct bio	*bio;		/* list of bios */
-			size_t		bio_length;	/* total in list */
+			struct ceph_bio_iter	bio_pos;
+			u32			bio_length;
 		};
 #endif /* CONFIG_BLOCK */
 	};
@@ -404,9 +404,10 @@ extern void osd_req_op_extent_osd_data_p
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
 #ifdef CONFIG_BLOCK
-extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
-					unsigned int which,
-					struct bio *bio, size_t bio_length);
+void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
+				    unsigned int which,
+				    struct ceph_bio_iter *bio_pos,
+				    u32 bio_length);
 #endif /* CONFIG_BLOCK */
 
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -838,90 +838,57 @@ static void ceph_msg_data_bio_cursor_ini
 					size_t length)
 {
 	struct ceph_msg_data *data = cursor->data;
-	struct bio *bio;
+	struct ceph_bio_iter *it = &cursor->bio_iter;
 
-	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+	cursor->resid = min_t(size_t, length, data->bio_length);
+	*it = data->bio_pos;
+	if (cursor->resid < it->iter.bi_size)
+		it->iter.bi_size = cursor->resid;
 
-	bio = data->bio;
-	BUG_ON(!bio);
-
-	cursor->resid = min(length, data->bio_length);
-	cursor->bio = bio;
-	cursor->bvec_iter = bio->bi_iter;
-	cursor->last_piece =
-		cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 }
 
 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
 						size_t *page_offset,
 						size_t *length)
 {
-	struct ceph_msg_data *data = cursor->data;
-	struct bio *bio;
-	struct bio_vec bio_vec;
-
-	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
-
-	bio = cursor->bio;
-	BUG_ON(!bio);
+	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
+					   cursor->bio_iter.iter);
 
-	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-	*page_offset = (size_t) bio_vec.bv_offset;
-	BUG_ON(*page_offset >= PAGE_SIZE);
-	if (cursor->last_piece) /* pagelist offset is always 0 */
-		*length = cursor->resid;
-	else
-		*length = (size_t) bio_vec.bv_len;
-	BUG_ON(*length > cursor->resid);
-	BUG_ON(*page_offset + *length > PAGE_SIZE);
-
-	return bio_vec.bv_page;
+	*page_offset = bv.bv_offset;
+	*length = bv.bv_len;
+	return bv.bv_page;
 }
 
 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 					size_t bytes)
 {
-	struct bio *bio;
-	struct bio_vec bio_vec;
-
-	BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
-
-	bio = cursor->bio;
-	BUG_ON(!bio);
-
-	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+	struct ceph_bio_iter *it = &cursor->bio_iter;
 
-	/* Advance the cursor offset */
-
-	BUG_ON(cursor->resid < bytes);
+	BUG_ON(bytes > cursor->resid);
+	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
 	cursor->resid -= bytes;
+	bio_advance_iter(it->bio, &it->iter, bytes);
 
-	bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+	if (!cursor->resid) {
+		BUG_ON(!cursor->last_piece);
+		return false;   /* no more data */
+	}
 
-	if (bytes < bio_vec.bv_len)
+	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
 		return false;	/* more bytes to process in this segment */
 
-	/* Move on to the next segment, and possibly the next bio */
-
-	if (!cursor->bvec_iter.bi_size) {
-		bio = bio->bi_next;
-		cursor->bio = bio;
-		if (bio)
-			cursor->bvec_iter = bio->bi_iter;
-		else
-			memset(&cursor->bvec_iter, 0,
-			       sizeof(cursor->bvec_iter));
-	}
-
-	if (!cursor->last_piece) {
-		BUG_ON(!cursor->resid);
-		BUG_ON(!bio);
-		/* A short read is OK, so use <= rather than == */
-		if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
-			cursor->last_piece = true;
+	if (!it->iter.bi_size) {
+		it->bio = it->bio->bi_next;
+		it->iter = it->bio->bi_iter;
+		if (cursor->resid < it->iter.bi_size)
+			it->iter.bi_size = cursor->resid;
 	}
 
+	BUG_ON(cursor->last_piece);
+	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 	return true;
 }
 #endif /* CONFIG_BLOCK */
@@ -1162,9 +1129,11 @@ static struct page *ceph_msg_data_next(s
 		page = NULL;
 		break;
 	}
+
 	BUG_ON(!page);
 	BUG_ON(*page_offset + *length > PAGE_SIZE);
 	BUG_ON(!*length);
+	BUG_ON(*length > cursor->resid);
 	if (last_piece)
 		*last_piece = cursor->last_piece;
 
@@ -3261,16 +3230,14 @@ void ceph_msg_data_add_pagelist(struct c
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
 #ifdef	CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-		size_t length)
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+			   u32 length)
 {
 	struct ceph_msg_data *data;
 
-	BUG_ON(!bio);
-
 	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
 	BUG_ON(!data);
-	data->bio = bio;
+	data->bio_pos = *bio_pos;
 	data->bio_length = length;
 
 	list_add_tail(&data->links, &msg->data);
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -147,10 +147,11 @@ static void ceph_osd_data_pagelist_init(
 
 #ifdef CONFIG_BLOCK
 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
-			struct bio *bio, size_t bio_length)
+				   struct ceph_bio_iter *bio_pos,
+				   u32 bio_length)
 {
 	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
-	osd_data->bio = bio;
+	osd_data->bio_pos = *bio_pos;
 	osd_data->bio_length = bio_length;
 }
 #endif /* CONFIG_BLOCK */
@@ -217,12 +218,14 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data
 
 #ifdef CONFIG_BLOCK
 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
-			unsigned int which, struct bio *bio, size_t bio_length)
+				    unsigned int which,
+				    struct ceph_bio_iter *bio_pos,
+				    u32 bio_length)
 {
 	struct ceph_osd_data *osd_data;
 
 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_bio_init(osd_data, bio, bio_length);
+	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
@@ -827,7 +830,7 @@ static void ceph_osdc_msg_data_add(struc
 		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
 #ifdef CONFIG_BLOCK
 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
-		ceph_msg_data_add_bio(msg, osd_data->bio, length);
+		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 #endif
 	} else {
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);