Blob Blame History Raw
From: Ilya Dryomov <idryomov@gmail.com>
Date: Sat, 20 Jan 2018 10:30:11 +0100
Subject: libceph: introduce BVECS data type
Git-commit: b9e281c2b38804984d619e1d9efc4b9020bcb291
Patch-mainline: v4.17-rc1
References: FATE#324714 bsc#1141450

In preparation for rbd "fancy" striping, introduce ceph_bvec_iter for
working with bio_vec array data buffers.  The wrappers are trivial, but
make it look similar to ceph_bio_iter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 include/linux/ceph/messenger.h  |   42 ++++++++++++++++++++++
 include/linux/ceph/osd_client.h |    8 ++++
 net/ceph/messenger.c            |   75 ++++++++++++++++++++++++++++++++++++++++
 net/ceph/osd_client.c           |   39 ++++++++++++++++++++
 4 files changed, 164 insertions(+)

--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -75,6 +75,7 @@ enum ceph_msg_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
+	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
 };
 
 static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -86,6 +87,7 @@ static __inline__ bool ceph_msg_data_typ
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_BVECS:
 		return true;
 	default:
 		return false;
@@ -138,6 +140,42 @@ struct ceph_bio_iter {
 
 #endif /* CONFIG_BLOCK */
 
+struct ceph_bvec_iter {
+	struct bio_vec *bvecs;
+	struct bvec_iter iter;
+};
+
+#define __ceph_bvec_iter_advance_step(it, n, STEP) do {			      \
+	BUG_ON((n) > (it)->iter.bi_size);				      \
+	(void)(STEP);							      \
+	bvec_iter_advance((it)->bvecs, &(it)->iter, (n));		      \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bvec_iter_advance(it, n)					      \
+	__ceph_bvec_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP)			      \
+	__ceph_bvec_iter_advance_step(it, n, ({				      \
+		struct bio_vec bv;					      \
+		struct bvec_iter __cur_iter;				      \
+									      \
+		__cur_iter = (it)->iter;				      \
+		__cur_iter.bi_size = (n);				      \
+		for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter)	      \
+			(void)(BVEC_STEP);				      \
+	}))
+
+#define ceph_bvec_iter_shorten(it, n) do {				      \
+	BUG_ON((n) > (it)->iter.bi_size);				      \
+	(it)->iter.bi_size = (n);					      \
+} while (0)
+
 struct ceph_msg_data {
 	struct list_head		links;	/* ceph_msg->data */
 	enum ceph_msg_data_type		type;
@@ -148,6 +186,7 @@ struct ceph_msg_data {
 			u32			bio_length;
 		};
 #endif /* CONFIG_BLOCK */
+		struct ceph_bvec_iter	bvec_pos;
 		struct {
 			struct page	**pages;	/* NOT OWNER. */
 			size_t		length;		/* total # bytes */
@@ -169,6 +208,7 @@ struct ceph_msg_data_cursor {
 #ifdef CONFIG_BLOCK
 		struct ceph_bio_iter	bio_iter;
 #endif /* CONFIG_BLOCK */
+		struct bvec_iter	bvec_iter;
 		struct {				/* pages */
 			unsigned int	page_offset;	/* offset in page */
 			unsigned short	page_index;	/* index in array */
@@ -335,6 +375,8 @@ extern void ceph_msg_data_add_pagelist(s
 void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 			   u32 length);
 #endif /* CONFIG_BLOCK */
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+			     struct ceph_bvec_iter *bvec_pos);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -56,6 +56,7 @@ enum ceph_osd_data_type {
 #ifdef CONFIG_BLOCK
 	CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
+	CEPH_OSD_DATA_TYPE_BVECS,
 };
 
 struct ceph_osd_data {
@@ -75,6 +76,7 @@ struct ceph_osd_data {
 			u32			bio_length;
 		};
 #endif /* CONFIG_BLOCK */
+		struct ceph_bvec_iter	bvec_pos;
 	};
 };
 
@@ -409,6 +411,9 @@ void osd_req_op_extent_osd_data_bio(stru
 				    struct ceph_bio_iter *bio_pos,
 				    u32 bio_length);
 #endif /* CONFIG_BLOCK */
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+					 unsigned int which,
+					 struct ceph_bvec_iter *bvec_pos);
 
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
@@ -418,6 +423,9 @@ extern void osd_req_op_cls_request_data_
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+				       unsigned int which,
+				       struct bio_vec *bvecs, u32 bytes);
 extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					unsigned int which,
 					struct page **pages, u64 length,
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -893,6 +893,58 @@ static bool ceph_msg_data_bio_advance(st
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
+					size_t length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct bio_vec *bvecs = data->bvec_pos.bvecs;
+
+	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
+	cursor->bvec_iter = data->bvec_pos.iter;
+	cursor->bvec_iter.bi_size = cursor->resid;
+
+	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+	cursor->last_piece =
+	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+}
+
+static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
+						size_t *page_offset,
+						size_t *length)
+{
+	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
+					   cursor->bvec_iter);
+
+	*page_offset = bv.bv_offset;
+	*length = bv.bv_len;
+	return bv.bv_page;
+}
+
+static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
+					size_t bytes)
+{
+	struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
+
+	BUG_ON(bytes > cursor->resid);
+	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
+	cursor->resid -= bytes;
+	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
+
+	if (!cursor->resid) {
+		BUG_ON(!cursor->last_piece);
+		return false;   /* no more data */
+	}
+
+	if (!bytes || cursor->bvec_iter.bi_bvec_done)
+		return false;	/* more bytes to process in this segment */
+
+	BUG_ON(cursor->last_piece);
+	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+	cursor->last_piece =
+	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+	return true;
+}
+
 /*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
@@ -1076,6 +1128,9 @@ static void __ceph_msg_data_cursor_init(
 		ceph_msg_data_bio_cursor_init(cursor, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_BVECS:
+		ceph_msg_data_bvecs_cursor_init(cursor, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		/* BUG(); */
@@ -1124,6 +1179,9 @@ static struct page *ceph_msg_data_next(s
 		page = ceph_msg_data_bio_next(cursor, page_offset, length);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_BVECS:
+		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		page = NULL;
@@ -1162,6 +1220,9 @@ static void ceph_msg_data_advance(struct
 		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
 		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_BVECS:
+		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		BUG();
@@ -3246,6 +3307,20 @@ void ceph_msg_data_add_bio(struct ceph_m
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif	/* CONFIG_BLOCK */
 
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+			     struct ceph_bvec_iter *bvec_pos)
+{
+	struct ceph_msg_data *data;
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
+	BUG_ON(!data);
+	data->bvec_pos = *bvec_pos;
+
+	list_add_tail(&data->links, &msg->data);
+	msg->data_length += bvec_pos->iter.bi_size;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -156,6 +156,13 @@ static void ceph_osd_data_bio_init(struc
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
+				     struct ceph_bvec_iter *bvec_pos)
+{
+	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
+	osd_data->bvec_pos = *bvec_pos;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)				\
 ({									\
 	struct ceph_osd_request *__oreq = (oreq);			\
@@ -230,6 +237,17 @@ void osd_req_op_extent_osd_data_bio(stru
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
 
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+					 unsigned int which,
+					 struct ceph_bvec_iter *bvec_pos)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+	ceph_osd_data_bvecs_init(osd_data, bvec_pos);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
+
 static void osd_req_op_cls_request_info_pagelist(
 			struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
@@ -267,6 +285,23 @@ void osd_req_op_cls_request_data_pages(s
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+				       unsigned int which,
+				       struct bio_vec *bvecs, u32 bytes)
+{
+	struct ceph_osd_data *osd_data;
+	struct ceph_bvec_iter it = {
+		.bvecs = bvecs,
+		.iter = { .bi_size = bytes },
+	};
+
+	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+	ceph_osd_data_bvecs_init(osd_data, &it);
+	osd_req->r_ops[which].cls.indata_len += bytes;
+	osd_req->r_ops[which].indata_len += bytes;
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
+
 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 			unsigned int which, struct page **pages, u64 length,
 			u32 alignment, bool pages_from_pool, bool own_pages)
@@ -292,6 +327,8 @@ static u64 ceph_osd_data_length(struct c
 	case CEPH_OSD_DATA_TYPE_BIO:
 		return (u64)osd_data->bio_length;
 #endif /* CONFIG_BLOCK */
+	case CEPH_OSD_DATA_TYPE_BVECS:
+		return osd_data->bvec_pos.iter.bi_size;
 	default:
 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 		return 0;
@@ -832,6 +869,8 @@ static void ceph_osdc_msg_data_add(struc
 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
 		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 #endif
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
+		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
 	} else {
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 	}