Blob Blame History Raw
From 73843a246ff152c1aa6dbe900287c1018c98ff66 Mon Sep 17 00:00:00 2001
From: Mike Christie <michaelc@cs.wisc.edu>
Date: Wed, 29 Jul 2015 04:23:38 -0500
Subject: [PATCH] libceph: add scatterlist messenger data type
References: fate#318836
Patch-mainline: Not yet, SES2 clustered LIO/RBD

LIO uses scatterlist for its page/data management. This patch
adds a scatterlist messenger data type, so LIO can pass its sg
down directly to rbd.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
Acked-by: David Disseldorp <ddiss@suse.de>
[luis: due to 0d9c1ab3be4c ("libceph: preallocate message data items"):
 - dropped changes to ceph_msg_data_type_valid() as this function was
   removed;
 - use ceph_msg_data_add() instead of ceph_msg_data_create() in function
   ceph_msg_data_add_sg (and no need for list_add_tail() either)]
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 include/linux/ceph/messenger.h  |   12 +++++
 include/linux/ceph/osd_client.h |   12 ++++-
 net/ceph/messenger.c            |   95 ++++++++++++++++++++++++++++++++++++++++
 net/ceph/osd_client.c           |   26 ++++++++++
 4 files changed, 144 insertions(+), 1 deletion(-)

--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -79,6 +79,7 @@ enum ceph_msg_data_type {
 	CEPH_MSG_DATA_BIO,	/* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
 	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
+	CEPH_MSG_DATA_SG,	/* data source/destination is a scatterlist */
 };
 
 #ifdef CONFIG_BLOCK
@@ -180,6 +181,11 @@ struct ceph_msg_data {
 			bool		own_pages;
 		};
 		struct ceph_pagelist	*pagelist;
+		struct {
+			struct scatterlist *sgl;
+			unsigned int	sgl_init_offset;
+			u64		sgl_length;
+		};
 	};
 };
 
@@ -204,6 +210,10 @@ struct ceph_msg_data_cursor {
 			struct page	*page;		/* page from list */
 			size_t		offset;		/* bytes from list */
 		};
+		struct {
+			struct scatterlist	*sg;		/* curr sg */
+			unsigned int		sg_consumed;
+		};
 	};
 };
 
@@ -365,6 +375,8 @@ void ceph_msg_data_add_bio(struct ceph_m
 #endif /* CONFIG_BLOCK */
 void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
 			     struct ceph_bvec_iter *bvec_pos);
+extern void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+				 unsigned int sgl_init_offset, u64 length);
 
 struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
 			       gfp_t flags, bool can_fail);
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -57,6 +57,7 @@ enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
 	CEPH_OSD_DATA_TYPE_BVECS,
+	CEPH_OSD_DATA_TYPE_SG,
 };
 
 struct ceph_osd_data {
@@ -75,6 +76,11 @@ struct ceph_osd_data {
 			struct ceph_bio_iter	bio_pos;
 			u32			bio_length;
 		};
+		struct {
+			struct scatterlist *sgl;
+			size_t		sgl_length;
+			unsigned int	sgl_init_offset;
+		};
 #endif /* CONFIG_BLOCK */
 		struct {
 			struct ceph_bvec_iter	bvec_pos;
@@ -425,7 +431,11 @@ void osd_req_op_extent_osd_data_bvecs(st
 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
 					 unsigned int which,
 					 struct ceph_bvec_iter *bvec_pos);
-
+extern void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *,
+					unsigned int which,
+					struct scatterlist *sgl,
+					unsigned int init_sg_offset,
+					u64 length);
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -922,6 +922,75 @@ static bool ceph_msg_data_bvecs_advance(
 }
 
 /*
+ * For a sg data item, a piece is whatever remains unconsumed of
+ * the current sg entry, or the whole of the next sg entry in the
+ * list; the last piece is bounded by the residual byte count.
+ */
+static void ceph_msg_data_sg_cursor_init(struct ceph_msg_data_cursor *cursor,
+					 size_t length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+	sg = data->sgl;
+	BUG_ON(!sg);
+	/* begin sgl_init_offset bytes into the first entry */
+	cursor->resid = min_t(u64, length, data->sgl_length);
+	cursor->sg = sg;
+	cursor->sg_consumed = data->sgl_init_offset;
+
+	/* last piece only if the residual fits in what is left of this entry */
+	cursor->last_piece = cursor->resid + cursor->sg_consumed <= sg->length;
+}
+
+static struct page *ceph_msg_data_sg_next(struct ceph_msg_data_cursor *cursor,
+					  size_t *page_offset, size_t *length)
+{
+	struct ceph_msg_data *data = cursor->data;
+	struct scatterlist *sg;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+	sg = cursor->sg;
+	BUG_ON(!sg);
+	/* the piece starts past the bytes already consumed from this entry */
+	*page_offset = sg->offset + cursor->sg_consumed;
+	/* the last piece is sized by the residual, others by the entry's rest */
+	if (cursor->last_piece)
+		*length = cursor->resid;
+	else
+		*length = sg->length - cursor->sg_consumed;
+
+	/* only non-clustered sg pages are currently supported */
+	return sg_page(sg);
+}
+
+static bool ceph_msg_data_sg_advance(struct ceph_msg_data_cursor *cursor,
+				     size_t bytes)
+{
+	BUG_ON(cursor->data->type != CEPH_MSG_DATA_SG);
+
+	/* Consume 'bytes' from both the residual and the current sg entry */
+	BUG_ON(cursor->resid < bytes);
+	cursor->resid -= bytes;
+	cursor->sg_consumed += bytes;
+
+	if (!bytes || cursor->sg_consumed < cursor->sg->length)
+		return false;	/* more bytes to process in the current sg entry */
+
+	if (!cursor->resid)
+		return false;	/* no more data */
+
+	/* No next entry (e.g. WRITE_SAME's single sg): reuse the current one */
+	if (sg_next(cursor->sg))
+		cursor->sg = sg_next(cursor->sg);
+	cursor->sg_consumed = 0;
+	/* starting a fresh entry: does the rest of the data fit within it? */
+	cursor->last_piece = cursor->resid <= cursor->sg->length;
+	return true;
+}
+
+/*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
  */
@@ -1107,6 +1176,9 @@ static void __ceph_msg_data_cursor_init(
 	case CEPH_MSG_DATA_BVECS:
 		ceph_msg_data_bvecs_cursor_init(cursor, length);
 		break;
+	case CEPH_MSG_DATA_SG:
+		ceph_msg_data_sg_cursor_init(cursor, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		/* BUG(); */
@@ -1155,6 +1227,9 @@ static struct page *ceph_msg_data_next(s
 	case CEPH_MSG_DATA_BVECS:
 		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
 		break;
+	case CEPH_MSG_DATA_SG:
+		page = ceph_msg_data_sg_next(cursor, page_offset, length);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		page = NULL;
@@ -1196,6 +1271,9 @@ static void ceph_msg_data_advance(struct
 	case CEPH_MSG_DATA_BVECS:
 		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
 		break;
+	case CEPH_MSG_DATA_SG:
+		new_piece = ceph_msg_data_sg_advance(cursor, bytes);
+		break;
 	case CEPH_MSG_DATA_NONE:
 	default:
 		BUG();
@@ -3311,6 +3389,23 @@ void ceph_msg_data_add_bvecs(struct ceph
 }
 EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
 
+void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+			  unsigned int sgl_init_offset, u64 length)
+{
+	struct ceph_msg_data *data;
+
+	BUG_ON(!sgl);
+	/* obtain a data item from msg and describe the scatterlist in it */
+	data = ceph_msg_data_add(msg);
+	data->type = CEPH_MSG_DATA_SG;
+	data->sgl = sgl;
+	data->sgl_length = length;
+	data->sgl_init_offset = sgl_init_offset;
+	/* account the sg payload in the message's total data length */
+	msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_sg);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -169,6 +169,16 @@ static void ceph_osd_data_bvecs_init(str
 	osd_data->num_bvecs = num_bvecs;
 }
 
+static void ceph_osd_data_sg_init(struct ceph_osd_data *osd_data,
+				  struct scatterlist *sgl,
+				  unsigned int init_sg_offset, u64 length)
+{
+	osd_data->type = CEPH_OSD_DATA_TYPE_SG;
+	osd_data->sgl = sgl;
+	osd_data->sgl_length = length;	/* u64 length kept in a size_t field */
+	osd_data->sgl_init_offset = init_sg_offset;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)				\
 ({									\
 	struct ceph_osd_request *__oreq = (oreq);			\
@@ -270,6 +280,17 @@ void osd_req_op_extent_osd_data_bvec_pos
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
 
+void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
+			unsigned int which, struct scatterlist *sgl,
+			unsigned int init_sg_offset, u64 length)
+{
+	struct ceph_osd_data *osd_data;
+	/* fetch the extent osd_data of op 'which', then mark it as sg-backed */
+	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
+
 static void osd_req_op_cls_request_info_pagelist(
 			struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
@@ -352,6 +373,8 @@ static u64 ceph_osd_data_length(struct c
 #endif /* CONFIG_BLOCK */
 	case CEPH_OSD_DATA_TYPE_BVECS:
 		return osd_data->bvec_pos.iter.bi_size;
+	case CEPH_OSD_DATA_TYPE_SG:
+		return osd_data->sgl_length;
 	default:
 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
 		return 0;
@@ -959,6 +982,9 @@ static void ceph_osdc_msg_data_add(struc
 #endif
 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
 		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_SG) {
+		ceph_msg_data_add_sg(msg, osd_data->sgl,
+				     osd_data->sgl_init_offset, length);
 	} else {
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 	}