From 73843a246ff152c1aa6dbe900287c1018c98ff66 Mon Sep 17 00:00:00 2001
From: Mike Christie <michaelc@cs.wisc.edu>
Date: Wed, 29 Jul 2015 04:23:38 -0500
Subject: [PATCH] libceph: add scatterlist messenger data type
References: fate#318836
Patch-mainline: Not yet, SES2 clustered LIO/RBD
LIO uses scatterlist for its page/data management. This patch
adds a scatterlist messenger data type, so LIO can pass its sg
down directly to rbd.
Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
Acked-by: David Disseldorp <ddiss@suse.de>
---
include/linux/ceph/messenger.h | 13 +++++
include/linux/ceph/osd_client.h | 12 ++++-
net/ceph/messenger.c | 96 ++++++++++++++++++++++++++++++++++++++++
net/ceph/osd_client.c | 26 ++++++++++
4 files changed, 146 insertions(+), 1 deletion(-)
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -73,6 +73,7 @@ enum ceph_msg_data_type {
#ifdef CONFIG_BLOCK
CEPH_MSG_DATA_BIO, /* data source/destination is a bio list */
#endif /* CONFIG_BLOCK */
+ CEPH_MSG_DATA_SG, /* data source/destination is a scatterlist */
};
static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -84,6 +85,7 @@ static __inline__ bool ceph_msg_data_typ
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_SG:
return true;
default:
return false;
@@ -106,6 +108,11 @@ struct ceph_msg_data {
unsigned int alignment; /* first page */
};
struct ceph_pagelist *pagelist;
+ struct {
+ struct scatterlist *sgl;
+ unsigned int sgl_init_offset;
+ u64 sgl_length;
+ };
};
};
@@ -133,6 +140,10 @@ struct ceph_msg_data_cursor {
struct page *page; /* page from list */
size_t offset; /* bytes from list */
};
+ struct {
+ struct scatterlist *sg; /* curr sg */
+ unsigned int sg_consumed;
+ };
};
};
@@ -290,6 +301,8 @@ extern void ceph_msg_data_add_pagelist(s
extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
size_t length);
#endif /* CONFIG_BLOCK */
+extern void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+ unsigned int sgl_init_offset, u64 length);
extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
bool can_fail);
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -53,6 +53,7 @@ enum ceph_osd_data_type {
#ifdef CONFIG_BLOCK
CEPH_OSD_DATA_TYPE_BIO,
#endif /* CONFIG_BLOCK */
+ CEPH_OSD_DATA_TYPE_SG,
};
struct ceph_osd_data {
@@ -71,6 +72,11 @@ struct ceph_osd_data {
struct bio *bio; /* list of bios */
size_t bio_length; /* total in list */
};
+ struct {
+ struct scatterlist *sgl;
+ size_t sgl_length;
+ unsigned int sgl_init_offset;
+ };
#endif /* CONFIG_BLOCK */
};
};
@@ -343,7 +349,11 @@ extern void osd_req_op_extent_osd_data_b
unsigned int which,
struct bio *bio, size_t bio_length);
#endif /* CONFIG_BLOCK */
-
+extern void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *,
+ unsigned int which,
+ struct scatterlist *sgl,
+ unsigned int init_sg_offset,
+ u64 length);
extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
unsigned int which,
struct ceph_pagelist *pagelist);
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -926,6 +926,75 @@ static bool ceph_msg_data_bio_advance(st
#endif /* CONFIG_BLOCK */
/*
+ * For a sg data item, a piece is whatever remains of the next
+ * entry in the current sg entry, or the first entry in the next
+ * sg in the list.
+ */
+static void ceph_msg_data_sg_cursor_init(struct ceph_msg_data_cursor *cursor,
+ size_t length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct scatterlist *sg;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+ sg = data->sgl;
+ BUG_ON(!sg);
+
+ cursor->resid = min_t(u64, length, data->sgl_length);
+ cursor->sg = sg;
+ cursor->sg_consumed = data->sgl_init_offset;
+ cursor->last_piece = cursor->resid <= sg->length;
+}
+
+static struct page *ceph_msg_data_sg_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
+{
+ struct ceph_msg_data *data = cursor->data;
+ struct scatterlist *sg;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_SG);
+
+ sg = cursor->sg;
+ BUG_ON(!sg);
+
+ *page_offset = sg->offset + cursor->sg_consumed;
+
+ if (cursor->last_piece)
+ *length = cursor->resid;
+ else
+ *length = sg->length - cursor->sg_consumed;
+
+ /* currently support non clustered sg pages */
+ return sg_page(sg);
+}
+
+static bool ceph_msg_data_sg_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
+{
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_SG);
+
+ /* Advance the cursor offset */
+ BUG_ON(cursor->resid < bytes);
+ cursor->resid -= bytes;
+ cursor->sg_consumed += bytes;
+
+ if (!bytes || cursor->sg_consumed < cursor->sg->length)
+ return false; /* more bytes to process in the current page */
+
+ if (!cursor->resid)
+ return false; /* no more data */
+
+ /* For WRITE_SAME we have a single sg that is written over and over */
+ if (sg_next(cursor->sg))
+ cursor->sg = sg_next(cursor->sg);
+ cursor->sg_consumed = 0;
+
+ cursor->last_piece = cursor->resid <= cursor->sg->length;
+ return true;
+}
+
+/*
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
@@ -1108,6 +1177,9 @@ static void __ceph_msg_data_cursor_init(
ceph_msg_data_bio_cursor_init(cursor, length);
break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_SG:
+ ceph_msg_data_sg_cursor_init(cursor, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
/* BUG(); */
@@ -1156,6 +1228,9 @@ static struct page *ceph_msg_data_next(s
page = ceph_msg_data_bio_next(cursor, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_SG:
+ page = ceph_msg_data_sg_next(cursor, page_offset, length);
+ break;
case CEPH_MSG_DATA_NONE:
default:
page = NULL;
@@ -1192,6 +1267,9 @@ static void ceph_msg_data_advance(struct
new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break;
#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_SG:
+ new_piece = ceph_msg_data_sg_advance(cursor, bytes);
+ break;
case CEPH_MSG_DATA_NONE:
default:
BUG();
@@ -3274,6 +3352,24 @@ void ceph_msg_data_add_bio(struct ceph_m
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif /* CONFIG_BLOCK */
+void ceph_msg_data_add_sg(struct ceph_msg *msg, struct scatterlist *sgl,
+ unsigned int sgl_init_offset, u64 length)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!sgl);
+
+ data = ceph_msg_data_create(CEPH_MSG_DATA_SG);
+ BUG_ON(!data);
+ data->sgl = sgl;
+ data->sgl_length = length;
+ data->sgl_init_offset = sgl_init_offset;
+
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_sg);
+
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -153,6 +153,16 @@ static void ceph_osd_data_bio_init(struc
}
#endif /* CONFIG_BLOCK */
+static void ceph_osd_data_sg_init(struct ceph_osd_data *osd_data,
+ struct scatterlist *sgl,
+ unsigned int init_sg_offset, u64 length)
+{
+ osd_data->type = CEPH_OSD_DATA_TYPE_SG;
+ osd_data->sgl = sgl;
+ osd_data->sgl_length = length;
+ osd_data->sgl_init_offset = init_sg_offset;
+}
+
#define osd_req_op_data(oreq, whch, typ, fld) \
({ \
struct ceph_osd_request *__oreq = (oreq); \
@@ -225,6 +235,17 @@ void osd_req_op_extent_osd_data_bio(stru
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */
+void osd_req_op_extent_osd_data_sg(struct ceph_osd_request *osd_req,
+ unsigned int which, struct scatterlist *sgl,
+ unsigned int init_sg_offset, u64 length)
+{
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+ ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
+
static void osd_req_op_cls_request_info_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -287,6 +308,8 @@ static u64 ceph_osd_data_length(struct c
case CEPH_OSD_DATA_TYPE_BIO:
return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
+ case CEPH_OSD_DATA_TYPE_SG:
+ return osd_data->sgl_length;
default:
WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
return 0;
@@ -819,6 +842,9 @@ static void ceph_osdc_msg_data_add(struc
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_SG) {
+ ceph_msg_data_add_sg(msg, osd_data->sgl,
+ osd_data->sgl_init_offset, length);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
}