From 683661a69ef6e4643ce378b39bcd4c8820101cec Mon Sep 17 00:00:00 2001
From: Mike Christie <michaelc@cs.wisc.edu>
Date: Wed, 29 Jul 2015 04:23:41 -0500
Subject: [PATCH] libceph: support bidirectional requests
References: fate#318836
Patch-mainline: Not yet, SES2 clustered LIO/RBD

The next patch will add support for SCSI's compare and write
command. This command sends N bytes, compares them to N bytes on disk,
and returns either success or the offset in the buffer where a
miscompare occurred. For Ceph support, I implemented this as a
multi-op request:

1. a new CMPEXT (compare extent) operation that compares N bytes
and, if a miscompare occurs, returns the offset of the miscompare
along with the buffer.
2. a write request, which is executed only if the CMPEXT op succeeds.

This patch modifies libceph so it can support both a request buffer
and a response buffer for extent-based IO, so the CMPEXT command can
send its comparison buffer and also receive the miscompared buffer if
needed.
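
As an illustration of how the split is meant to be used (a sketch
only, not code from this patch; the example_* helpers and the
single-op read setup are assumptions made for illustration):

    /*
     * Read-type ops now carry their pages in extent.response_data,
     * write-type ops in extent.request_data.  Callers attach pages
     * with the same helper as before, which routes by op type.
     */
    static void example_setup_read(struct ceph_osd_request *req,
                                   struct page **pages, u64 len)
    {
            /* op 0 must already be initialized as CEPH_OSD_OP_READ,
             * so the pages land in r_ops[0].extent.response_data */
            osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
                                             false, false);
    }

    static void example_read_done(struct ceph_osd_request *req)
    {
            struct ceph_osd_data *osd_data;

            /* completion paths must switch to the response-side
             * accessor introduced by this patch */
            osd_data = osd_req_op_extent_osd_response_data(req, 0);
            BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
    }

A CMPEXT op would then attach both sides: its comparison bytes via
extent.request_data, and a buffer for any returned miscompare data
via extent.response_data.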

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
Acked-by: David Disseldorp <ddiss@suse.de>
[ddiss@suse.de: rebase against d15f9d694b]
[ddiss@suse.de: add missing osd_client.h prototypes]
Acked-by: Luis Henriques <lhenriques@suse.com>
[lhenriques@suse.com: rebase against 5b64640cf65b]
---
 fs/ceph/addr.c                  |    6 +-
 fs/ceph/file.c                  |    2 +-
 include/linux/ceph/osd_client.h |    8 ++
 net/ceph/osd_client.c           |  108 +++++++++++++++++++++++++++++++---------
 4 files changed, 94 insertions(+), 30 deletions(-)

--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -268,7 +268,7 @@ static void finish_read(struct ceph_osd_
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
 	/* unlock all pages, zeroing any data we didn't read */
-	osd_data = osd_req_op_extent_osd_data(req, 0);
+	osd_data = osd_req_op_extent_osd_response_data(req, 0);
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 	num_pages = calc_pages_for((u64)osd_data->alignment,
 					(u64)osd_data->length);
@@ -633,7 +633,7 @@ static void writepages_finish(struct cep
 		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
 			break;
 
-		osd_data = osd_req_op_extent_osd_data(req, i);
+		osd_data = osd_req_op_extent_osd_request_data(req, i);
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
 		num_pages = calc_pages_for((u64)osd_data->alignment,
 					   (u64)osd_data->length);
@@ -669,7 +669,7 @@ static void writepages_finish(struct cep
 
 	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
 
-	osd_data = osd_req_op_extent_osd_data(req, 0);
+	osd_data = osd_req_op_extent_osd_request_data(req, 0);
 	if (osd_data->pages_from_pool)
 		mempool_free(osd_data->pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -622,7 +622,7 @@ static void ceph_aio_complete_req(struct
 	int rc = req->r_result;
 	struct inode *inode = req->r_inode;
 	struct ceph_aio_request *aio_req = req->r_priv;
-	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_response_data(req, 0);
 	int num_pages = calc_pages_for((u64)osd_data->alignment,
 				       osd_data->length);
 
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -90,7 +90,8 @@ struct ceph_osd_req_op {
 			u64 offset, length;
 			u64 truncate_size;
 			u32 truncate_seq;
-			struct ceph_osd_data osd_data;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
 		} extent;
 		struct {
 			u32 name_len;
@@ -270,7 +271,10 @@ extern void osd_req_op_extent_init(struc
 extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 					unsigned int which, u64 length);
 
-extern struct ceph_osd_data *osd_req_op_extent_osd_data(
+extern struct ceph_osd_data *osd_req_op_extent_osd_request_data(
+					struct ceph_osd_request *osd_req,
+					unsigned int which);
+extern struct ceph_osd_data *osd_req_op_extent_osd_response_data(
 					struct ceph_osd_request *osd_req,
 					unsigned int which);
 extern struct ceph_osd_data *osd_req_op_cls_response_data(
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -147,12 +147,20 @@ osd_req_op_raw_data_in(struct ceph_osd_r
 }
 
 struct ceph_osd_data *
-osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
-			unsigned int which)
+osd_req_op_extent_osd_request_data(struct ceph_osd_request *osd_req,
+				   unsigned int which)
 {
-	return osd_req_op_data(osd_req, which, extent, osd_data);
+	return osd_req_op_data(osd_req, which, extent, request_data);
 }
-EXPORT_SYMBOL(osd_req_op_extent_osd_data);
+EXPORT_SYMBOL(osd_req_op_extent_osd_request_data);
+
+struct ceph_osd_data *
+osd_req_op_extent_osd_response_data(struct ceph_osd_request *osd_req,
+				    unsigned int which)
+{
+	return osd_req_op_data(osd_req, which, extent, response_data);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_response_data);
 
 struct ceph_osd_data *
 osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
@@ -180,21 +188,46 @@ void osd_req_op_extent_osd_data_pages(st
 			u64 length, u32 alignment,
 			bool pages_from_pool, bool own_pages)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
-				pages_from_pool, own_pages);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_pages_init(&op->extent.response_data, pages,
+					 length, alignment, pages_from_pool,
+					 own_pages);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_pages_init(&op->extent.request_data, pages,
+					 length, alignment, pages_from_pool,
+					 own_pages);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
 
 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_pagelist_init(osd_data, pagelist);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_pagelist_init(&op->extent.response_data,
+					    pagelist);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_pagelist_init(&op->extent.request_data,
+					    pagelist);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
 
@@ -202,10 +235,22 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data
 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 			unsigned int which, struct bio *bio, size_t bio_length)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_bio_init(osd_data, bio, bio_length);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_bio_init(&op->extent.response_data, bio,
+				       bio_length);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_bio_init(&op->extent.request_data, bio,
+				       bio_length);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
@@ -214,10 +259,22 @@ void osd_req_op_extent_osd_data_sg(struc
 			unsigned int which, struct scatterlist *sgl,
 			unsigned int init_sg_offset, u64 length)
 {
-	struct ceph_osd_data *osd_data;
+	struct ceph_osd_req_op *op = &osd_req->r_ops[which];
 
-	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
-	ceph_osd_data_sg_init(osd_data, sgl, init_sg_offset, length);
+	switch (op->op) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_TRUNCATE:
+		ceph_osd_data_sg_init(&op->extent.response_data,
+				      sgl, init_sg_offset, length);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		ceph_osd_data_sg_init(&op->extent.request_data,
+				      sgl, init_sg_offset, length);
+		break;
+	default:
+		BUG();
+	}
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_sg);
 
@@ -309,9 +366,11 @@ static void osd_req_op_data_release(stru
 
 	switch (op->op) {
 	case CEPH_OSD_OP_READ:
+		ceph_osd_data_release(&op->extent.response_data);
+		break;
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
-		ceph_osd_data_release(&op->extent.osd_data);
+		ceph_osd_data_release(&op->extent.request_data);
 		break;
 	case CEPH_OSD_OP_CALL:
 		ceph_osd_data_release(&op->cls.request_info);
@@ -703,21 +762,22 @@ static u64 osd_req_encode_op(struct ceph
 	case CEPH_OSD_OP_WRITEFULL:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
-		if (src->op == CEPH_OSD_OP_WRITE ||
-		    src->op == CEPH_OSD_OP_WRITEFULL)
-			request_data_len = src->extent.length;
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
 		dst->extent.length = cpu_to_le64(src->extent.length);
 		dst->extent.truncate_size =
 			cpu_to_le64(src->extent.truncate_size);
 		dst->extent.truncate_seq =
 			cpu_to_le32(src->extent.truncate_seq);
-		osd_data = &src->extent.osd_data;
 		if (src->op == CEPH_OSD_OP_WRITE ||
-		    src->op == CEPH_OSD_OP_WRITEFULL)
+		    src->op == CEPH_OSD_OP_WRITEFULL) {
+			osd_data = &src->extent.request_data;
 			ceph_osdc_msg_data_add(req->r_request, osd_data);
-		else
+
+			request_data_len = src->extent.length;
+		} else {
+			osd_data = &src->extent.response_data;
 			ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		}
 		break;
 	case CEPH_OSD_OP_CALL:
 		dst->cls.class_len = src->cls.class_len;