Blob Blame History Raw
From 6e5841905a904ab7194994133ed7fd14789d9b6d Mon Sep 17 00:00:00 2001
From: David Disseldorp <ddiss@suse.de>
Date: Mon, 12 Oct 2020 00:58:54 +0200
Subject: [PATCH] target/rbd: support COMPARE_AND_WRITE
References: fate#318836, bsc#1177090
Patch-mainline: Not yet, SES clustered LIO/RBD

Add an RBD specific atomic compare-and-write handler which uses
previous helpers for generating compound cmpext+write OSD requests.
The miscompare status and offset are propagated for sense marshalling
via the cmd.scsi_asc and cmd.bad_sector fields respectively.

Reviewed-by: Roman Penyaev <rpenyaev@suse.com>
[ddiss: kABI: propagate sense via scsi_asc and bad_sector]
Signed-off-by: David Disseldorp <ddiss@suse.de>
Reviewed-by: Luis Henriques <lhenriques@suse.com>
---
 drivers/target/target_core_rbd.c | 184 ++++++++++++++++++++++++++++++-
 1 file changed, 181 insertions(+), 3 deletions(-)

diff --git a/drivers/target/target_core_rbd.c b/drivers/target/target_core_rbd.c
index 2c654ec33a01..09e662ec689c 100644
--- a/drivers/target/target_core_rbd.c
+++ b/drivers/target/target_core_rbd.c
@@ -120,8 +120,11 @@ static int tcm_rbd_configure_device(struct se_device *dev)
 	dev->dev_attrib.max_write_same_len = 0xFFFF;
 	dev->dev_attrib.is_nonrot = 1;
 
-	/* disable LIO non-atomic handling of compare and write */
-	dev->dev_attrib.emulate_caw = 0;
+	/*
+	 * TODO fail if RBD stripe unit isn't a multiple of SCSI block size:
+	 * multi-object compare-and-write isn't atomic.
+	 */
+
 	/* disable standalone reservation handling */
 	dev->dev_attrib.emulate_pr = 0;
 
@@ -500,6 +503,171 @@ tcm_rbd_execute_write_same(struct se_cmd *cmd)
 	return sense;
 }
 
+static void tcm_rbd_cmp_and_write_callback(struct rbd_img_request *img_request,
+					   int result)
+{
+	struct se_cmd *cmd = img_request->lio_cmd_data;
+	struct tcm_rbd_cmd *trc = cmd->priv;
+
+	cmd->priv = NULL;
+	if (result <= -MAX_ERRNO) {
+		/*
+		 * OSDs return -MAX_ERRNO - offset_of_mismatch
+		 * This offset calculation would be incorrect if we supported
+		 * compare-and-write with multi-object striping.
+		 */
+		cmd->bad_sector = (sector_t)(-1 * (result + MAX_ERRNO));
+		/*
+		 * kABI: we can't easily propagate TCM_MISCOMPARE_VERIFY here,
+		 * so signal it via the scsi_asc field.
+		 */
+		cmd->scsi_asc = 0x1d;	/* MISCOMPARE DURING VERIFY OPERATION */
+		pr_debug("COMPARE_AND_WRITE: miscompare at offset %llu\n",
+			 cmd->bad_sector);
+		target_complete_cmd(cmd, SAM_STAT_CHECK_CONDITION);
+	} else if (result) {
+		target_complete_cmd(cmd, SAM_STAT_CHECK_CONDITION);
+	} else {
+		target_complete_cmd(cmd, SAM_STAT_GOOD);
+	}
+
+	if (trc) {
+		rbd_img_request_put(trc->img_request);
+		kfree(trc->bvecs);
+		kfree(trc);
+	}
+}
+
+static sense_reason_t
+tcm_rbd_execute_cmp_and_write(struct se_cmd *cmd)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct rbd_device *rbd_dev = tcm_rbd_dev->rbd_dev;
+	struct tcm_rbd_cmd *trc;
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
+	u64 mapping_size;
+	sense_reason_t sense = TCM_NO_SENSE;
+	u64 offset = rbd_lba_shift(dev, cmd->t_task_lba);
+	u64 length = rbd_lba_shift(dev, cmd->t_task_nolb);
+	int result;
+
+	if (!length) {
+		dout("zero-length compare-and-write request\n");
+		target_complete_cmd(cmd, SAM_STAT_GOOD);
+		return TCM_NO_SENSE;
+	}
+
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP) {
+		pr_warn("compare-and-write on read-only snapshot");
+		sense = TCM_WRITE_PROTECTED;
+		goto err;
+	}
+
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		pr_warn("request for non-existent snapshot");
+		BUG_ON(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+		sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto err;
+	}
+
+	if (offset && length > U64_MAX - offset + 1) {
+		pr_warn("bad request range (%llu~%llu)", offset, length);
+		sense = TCM_INVALID_CDB_FIELD;
+		goto err;	/* Shouldn't happen */
+	}
+
+	down_read(&rbd_dev->header_rwsem);
+	mapping_size = rbd_dev->mapping.size;
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	up_read(&rbd_dev->header_rwsem);
+
+	/*
+	 * No need to take dev->caw_sem here, as the IO is mapped to a compound
+	 * compare+write OSD request, which is handled atomically by the OSD.
+	 */
+
+	if (offset + length > mapping_size) {
+		pr_warn("beyond EOD (%llu~%llu > %llu)", offset,
+			length, mapping_size);
+		if (!tcm_rbd_dev->emulate_legacy_capacity) {
+			sense = TCM_ADDRESS_OUT_OF_RANGE;
+			goto err_snapc;
+		}
+	}
+
+	/* need twice as much data for each compare & write operation */
+	if (cmd->data_length < length * 2) {
+		sense = TCM_INVALID_CDB_FIELD;
+		goto err_snapc;
+	}
+
+	trc = kzalloc(sizeof(struct tcm_rbd_cmd), GFP_KERNEL);
+	if (!trc) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto err_snapc;
+	}
+
+	img_request = rbd_img_request_create(rbd_dev, OBJ_OP_CMP_AND_WRITE,
+					     snapc,
+					     tcm_rbd_cmp_and_write_callback);
+	if (!img_request) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto err_trc;
+	}
+	snapc = NULL; /* img_request consumes a ref */
+	trc->img_request = img_request;
+
+	pr_debug("rbd_dev %p compare-and-write img_req %p %llu~%llu\n",
+		 rbd_dev, img_request, offset, length);
+
+	/*
+	 * data in cmd->t_data_sg is arrange as:
+	 * [len * data for compare | len * data for write]
+	 */
+	result = tcm_rbd_sgl_to_bvecs(cmd->t_data_sg, cmd->t_data_nents,
+				      &trc->bvecs);
+	if (!result) {
+		struct ceph_file_extent img_extent = {
+			.fe_off = offset,
+			.fe_len = length,
+		};
+		result = rbd_img_fill_cmp_and_write_from_bvecs(img_request,
+						      &img_extent,
+						      trc->bvecs);
+	}
+
+	if (result == -ENOMEM) {
+		sense = TCM_OUT_OF_RESOURCES;
+		goto err_img_request;
+	} else if (result) {
+		sense = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto err_img_request;
+	}
+
+	img_request->lio_cmd_data = cmd;
+	cmd->priv = trc;
+
+	rbd_img_handle_request(img_request, 0);
+
+	return TCM_NO_SENSE;
+
+err_img_request:
+	rbd_img_request_put(img_request);
+err_trc:
+	kfree(trc->bvecs);
+	kfree(trc);
+err_snapc:
+	if (sense)
+		pr_warn("RBD compare-and-write %llx at %llx sense %d",
+			length, offset, sense);
+	ceph_put_snap_context(snapc);
+err:
+	return sense;
+}
+
 enum {
 	Opt_udev_path, Opt_readonly, Opt_force, Opt_err
 };
@@ -680,7 +848,17 @@ static struct sbc_ops tcm_rbd_sbc_ops = {
 static sense_reason_t
 tcm_rbd_parse_cdb(struct se_cmd *cmd)
 {
-	return sbc_parse_cdb(cmd, &tcm_rbd_sbc_ops);
+	sense_reason_t sense = sbc_parse_cdb(cmd, &tcm_rbd_sbc_ops);
+	if (sense)
+		return sense;
+
+	/* we provide our own atomic COMPARE_AND_WRITE handler */
+	if (cmd->se_cmd_flags & SCF_COMPARE_AND_WRITE) {
+		cmd->execute_cmd = tcm_rbd_execute_cmp_and_write;
+		cmd->transport_complete_callback = NULL;
+	}
+
+	return TCM_NO_SENSE;
 }
 
 static bool tcm_rbd_get_write_cache(struct se_device *dev)
-- 
2.26.2