Blob Blame History Raw
From c2cf7d03b927d5930b1c633e8f10119c240eb554 Mon Sep 17 00:00:00 2001
From: David Disseldorp <ddiss@suse.de>
Date: Tue, 25 Aug 2015 16:41:26 +0200
Subject: [PATCH] target/rbd: add support for PR register & read keys
Patch-mainline: Not yet, SES2 clustered LIO/RBD
References: fate#318836

Store and retrieve persistent reservation information in a "pr_info"
xattr on the rbd header object.
Persistent reservation information is encoded in an ASCII string, in
order to use the atomic compare-and-set functionality offered by Ceph
OSDs.
A sequence number is maintained alongside the PR info, to allow for
future cache optimisations.

Signed-off-by: David Disseldorp <ddiss@suse.de>
---
 drivers/target/target_core_rbd.c |  911 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 911 insertions(+)

--- a/drivers/target/target_core_rbd.c
+++ b/drivers/target/target_core_rbd.c
@@ -32,6 +32,7 @@
 #include <linux/spinlock.h>
 #include <linux/genhd.h>
 #include <linux/module.h>
+#include <linux/stringify.h>
 #include <asm/unaligned.h>
 
 #include <scsi/scsi_proto.h>
@@ -678,6 +679,915 @@ static bool tcm_rbd_get_write_cache(stru
 	return false;
 }
 
+#define TCM_RBD_PR_INFO_XATTR_KEY "pr_info"
+#define TCM_RBD_PR_INFO_XATTR_VERS 1
+
+#define TCM_RBD_PR_INFO_XATTR_FIELD_VER		0
+#define TCM_RBD_PR_INFO_XATTR_FIELD_SEQ		1
+#define TCM_RBD_PR_INFO_XATTR_FIELD_GEN		2
+#define TCM_RBD_PR_INFO_XATTR_FIELD_NUM_REGS	3
+#define TCM_RBD_PR_INFO_XATTR_FIELD_REGS_START	4
+
+/* don't allow encoded PR info to exceed 8K */
+#define TCM_RBD_PR_INFO_XATTR_MAX_SIZE 8192
+
+/*
+ * TRANSPORT_IQN_LEN + strlen(",i,0x") + sizeof(u64) * 2 + strlen(",")
+ *	+ TRANSPORT_IQN_LEN + strlen(",t,0x") + sizeof(u32) * 2 + sizeof("\0") =
+ */
+#define TCM_RBD_PR_IT_NEXUS_MAXLEN	484
+
+/* number of retries amid concurrent PR info changes from other nodes */
+#define TCM_RBD_PR_REG_MAX_RETRIES	5
+
+/*
+ * Persistent reservation info. This structure is converted to and from a
+ * string for storage within an RBD object xattr. String based storage allows
+ * us to use xattr compare and write operations for atomic PR info updates.
+ */
+struct tcm_rbd_pr_reg {
+	struct list_head regs_node;
+	u64 key;		/* registered key */
+	/* I-T nexus for registration */
+	char it_nexus[TCM_RBD_PR_IT_NEXUS_MAXLEN];
+};
+#define TCM_RBD_PR_INFO_XATTR_ENCODED_PR_REG_MAXLEN		\
+	((sizeof("0x") + sizeof(u64) * 2) + sizeof(" ") +	\
+	 TCM_RBD_PR_IT_NEXUS_MAXLEN + sizeof("\n"))
+
+struct tcm_rbd_pr_info {
+	u32 vers;		/* on disk format version number */
+	u32 seq; 		/* sequence number bumped every xattr write */
+	u32 gen; 		/* PR generation number */
+	u32 num_regs;		/* number of registrations */
+	struct list_head regs;	/* list of registrations */
+};
+#define TCM_RBD_PR_INFO_XATTR_ENCODED_MAXLEN(_num_regs)			\
+	((sizeof("0x") + sizeof(u32) * 2) + sizeof("\n") +		\
+	 (sizeof("0x") + sizeof(u32) * 2) + sizeof("\n") +		\
+	 (sizeof("0x") + sizeof(u32) * 2) + sizeof("\n") +		\
+	 (sizeof("0x") + sizeof(u32) * 2) + sizeof("\n") +		\
+	 (TCM_RBD_PR_INFO_XATTR_ENCODED_PR_REG_MAXLEN * _num_regs) +	\
+	 sizeof("\0"))
+
+static int
+tcm_rbd_gen_it_nexus(struct se_session *se_sess,
+		     char *nexus_buf,
+		     size_t buflen)
+{
+	struct se_portal_group *se_tpg;
+	const struct target_core_fabric_ops *tfo;
+	int rc;
+
+	if (!se_sess || !se_sess->se_node_acl || !se_sess->se_tpg) {
+		pr_warn("invalid session for IT nexus generation\n");
+		return -EINVAL;
+	}
+
+	se_tpg = se_sess->se_tpg;
+	tfo = se_tpg->se_tpg_tfo;
+
+	rc = snprintf(nexus_buf, buflen, "%s,i,0x%llx,%s,t,0x%x",
+		      se_sess->se_node_acl->initiatorname,
+		      se_sess->sess_bin_isid,
+		      tfo->tpg_get_wwn(se_tpg),
+		      (u32)tfo->tpg_get_tag(se_tpg));
+	if ((rc < 0) || (rc >= buflen)) {
+		pr_err("error formatting reserve cookie\n");
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
+static void
+tcm_rbd_pr_info_free(struct tcm_rbd_pr_info *pr_info)
+{
+	struct tcm_rbd_pr_reg *reg;
+	struct tcm_rbd_pr_reg *reg_n;
+
+	list_for_each_entry_safe(reg, reg_n, &pr_info->regs, regs_node) {
+		kfree(reg);
+	}
+	kfree(pr_info);
+}
+
+static int
+tcm_rbd_pr_info_append_reg(struct tcm_rbd_pr_info *pr_info, char *nexus,
+			   u64 key)
+{
+	struct tcm_rbd_pr_reg *reg;
+
+	reg = kmalloc(sizeof(*reg), GFP_KERNEL);
+	if (!reg) {
+		return -ENOMEM;
+	}
+
+	reg->key = key;
+	strlcpy(reg->it_nexus, nexus, ARRAY_SIZE(reg->it_nexus));
+
+	list_add_tail(&reg->regs_node, &pr_info->regs);
+	pr_info->num_regs++;
+
+	dout("appended pr_info reg: 0x%llx\n", reg->key);
+
+	return 0;
+}
+
+static void
+tcm_rbd_pr_info_clear_reg(struct tcm_rbd_pr_info *pr_info,
+			  struct tcm_rbd_pr_reg *reg)
+{
+	list_del(&reg->regs_node);
+	pr_info->num_regs--;
+
+	dout("deleted pr_info reg: 0x%llx\n", reg->key);
+
+	kfree(reg);
+}
+
+static int
+tcm_rbd_pr_info_unregister_reg(struct tcm_rbd_pr_info *pr_info,
+			       struct tcm_rbd_pr_reg *reg)
+{
+	/*
+	 * TODO
+	 * If the I_T nexus is a reservation holder, the persistent reservation
+	 * is of an all registrants type, and the I_T nexus is the last
+	 * remaining registered I_T nexus, then the device server shall also
+	 * release the persistent reservation.
+	 */
+
+	tcm_rbd_pr_info_clear_reg(pr_info, reg);
+
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_vers_decode(char *str, u32 *vers)
+{
+	int rc;
+
+	BUG_ON(!vers);
+	rc = sscanf(str, "0x%08x", vers);
+	if (rc != 1) {
+		pr_err("failed to decode PR info version in: %s\n", str);
+		return -EINVAL;
+	}
+
+	if (*vers != TCM_RBD_PR_INFO_XATTR_VERS) {
+		pr_err("unsupported PR info version: %u\n", *vers);
+		return -EINVAL;
+	}
+
+	dout("processed pr_info version: %u\n", *vers);
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_seq_decode(char *str, u32 *seq)
+{
+	int rc;
+
+	BUG_ON(!seq);
+	rc = sscanf(str, "0x%08x", seq);
+	if (rc != 1) {
+		pr_err("failed to decode PR info seqnum in: %s\n", str);
+		return -EINVAL;
+	}
+
+	dout("processed pr_info seqnum: %u\n", *seq);
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_gen_decode(char *str, u32 *gen)
+{
+	int rc;
+
+	BUG_ON(!gen);
+	rc = sscanf(str, "0x%08x", gen);
+	if (rc != 1) {
+		pr_err("failed to parse PR gen: %s\n", str);
+		return -EINVAL;
+	}
+	dout("processed pr_info generation: %s\n", str);
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_num_regs_decode(char *str, u32 *num_regs)
+{
+	int rc;
+
+	BUG_ON(!num_regs);
+	rc = sscanf(str, "0x%08x", num_regs);
+	if (rc != 1) {
+		pr_err("failed to parse PR num regs: %s\n", str);
+		return -EINVAL;
+	}
+	dout("processed pr_info num_regs: %s\n", str);
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_reg_decode(char *str, struct tcm_rbd_pr_reg **_reg)
+{
+	struct tcm_rbd_pr_reg *reg;
+	int rc;
+
+	BUG_ON(!_reg);
+	reg = kzalloc(sizeof(*reg), GFP_KERNEL);
+	if (!reg) {
+		return -ENOMEM;
+	}
+
+	/* registration key and I-T nexus with space separator */
+	rc = sscanf(str, "0x%016llx %" __stringify(TCM_RBD_PR_IT_NEXUS_MAXLEN)
+		    "s", &reg->key, reg->it_nexus);
+	if (rc != 2) {
+		pr_err("failed to parse PR reg: %s\n", str);
+		kfree(reg);
+		return -EINVAL;
+	}
+
+	dout("processed pr_info reg: %s\n", str);
+	*_reg = reg;
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_decode(char *pr_xattr,
+		       int pr_xattr_len,
+		       struct tcm_rbd_pr_info **_pr_info)
+{
+	struct tcm_rbd_pr_info *pr_info;
+	int rc;
+	int field;
+	int i;
+	char *p;
+	char *str;
+	char *end;
+
+	BUG_ON(!_pr_info);
+
+	if (!pr_xattr_len) {
+		pr_err("zero length PR xattr\n");
+		return -EINVAL;
+	}
+
+	dout("decoding PR xattr: %s\n", pr_xattr);
+
+	pr_info = kzalloc(sizeof(*pr_info), GFP_KERNEL);
+	if (!pr_info) {
+		return -ENOMEM;
+	}
+
+	INIT_LIST_HEAD(&pr_info->regs);
+
+	p = pr_xattr;
+	end = pr_xattr + pr_xattr_len;
+	field = 0;
+	i = 0;
+	/*
+	 * '\n' separator between header fields and each reg entry.
+	 * reg subfields are further separated by ' '.
+	 */
+	for (str = strsep(&p, "\n"); str && *str != '\0' && (p < end);
+						str = strsep(&p, "\n")) {
+
+		if (field == TCM_RBD_PR_INFO_XATTR_FIELD_VER) {
+			rc = tcm_rbd_pr_info_vers_decode(str, &pr_info->vers);
+			if (rc < 0) {
+				goto err_info_free;
+			}
+		} else if (field == TCM_RBD_PR_INFO_XATTR_FIELD_SEQ) {
+			rc = tcm_rbd_pr_info_seq_decode(str, &pr_info->seq);
+			if (rc < 0) {
+				goto err_info_free;
+			}
+		} else if (field == TCM_RBD_PR_INFO_XATTR_FIELD_GEN) {
+			rc = tcm_rbd_pr_info_gen_decode(str, &pr_info->gen);
+			if (rc < 0) {
+				goto err_info_free;
+			}
+		} else if (field == TCM_RBD_PR_INFO_XATTR_FIELD_NUM_REGS) {
+			rc = tcm_rbd_pr_info_num_regs_decode(str,
+							    &pr_info->num_regs);
+			if (rc < 0) {
+				goto err_info_free;
+			}
+		} else if (field >= TCM_RBD_PR_INFO_XATTR_FIELD_REGS_START) {
+			struct tcm_rbd_pr_reg *reg;
+			rc = tcm_rbd_pr_info_reg_decode(str, &reg);
+			if (rc < 0) {
+				goto err_info_free;
+			}
+			list_add_tail(&reg->regs_node, &pr_info->regs);
+			i++;
+		} else {
+			dout("skipping parsing of field %d\n", field);
+		}
+
+		field++;
+	}
+
+	if (field <= TCM_RBD_PR_INFO_XATTR_FIELD_NUM_REGS) {
+		pr_err("pr_info missing basic fields, stopped at %d\n", field);
+		rc = -EINVAL;
+		goto err_info_free;
+	}
+
+	if (i != pr_info->num_regs) {
+		pr_err("processed %d registrations, expected %d\n",
+			 i, pr_info->num_regs);
+		rc = -EINVAL;
+		goto err_info_free;
+	}
+
+	dout("successfully processed all PR data\n");
+	*_pr_info = pr_info;
+
+	return 0;
+
+err_info_free:
+	tcm_rbd_pr_info_free(pr_info);
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_vers_seq_gen_encode(char *buf, size_t buf_remain, u32 vers,
+				    u32 seq, u32 gen)
+{
+	int rc;
+
+	rc = snprintf(buf, buf_remain, "0x%08x\n0x%08x\n0x%08x\n",
+		      vers, seq, gen);
+	if ((rc < 0) || (rc >= buf_remain)) {
+		pr_err("failed to encode PR vers, seq and gen\n");
+		return -EINVAL;
+	}
+
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_num_regs_encode(char *buf, size_t buf_remain, u32 num_regs)
+{
+	int rc;
+
+	rc = snprintf(buf, buf_remain, "0x%08x\n", num_regs);
+	if ((rc < 0) || (rc >= buf_remain)) {
+		pr_err("failed to encode PR num_regs\n");
+		return -EINVAL;
+	}
+
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_reg_encode(char *buf, size_t buf_remain,
+			   struct tcm_rbd_pr_reg *reg)
+{
+	int rc;
+
+	rc = snprintf(buf, buf_remain, "0x%016llx %s\n", reg->key, reg->it_nexus);
+	if ((rc < 0) || (rc >= buf_remain)) {
+		pr_err("failed to encode PR registration\n");
+		return -EINVAL;
+	}
+
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_encode(struct tcm_rbd_pr_info *pr_info,
+		       char **_pr_xattr,
+		       int *pr_xattr_len)
+{
+	struct tcm_rbd_pr_reg *reg;
+	char *pr_xattr;
+	char *p;
+	size_t buf_remain;
+	int rc;
+	int i;
+
+	if (pr_info->vers != TCM_RBD_PR_INFO_XATTR_VERS) {
+		pr_err("unsupported PR info version: %u\n", pr_info->vers);
+		return -EINVAL;
+	}
+
+	buf_remain = TCM_RBD_PR_INFO_XATTR_ENCODED_MAXLEN(pr_info->num_regs);
+	if (buf_remain > TCM_RBD_PR_INFO_XATTR_MAX_SIZE) {
+		pr_err("PR info too large for encoding: %zd\n", buf_remain);
+		return -EINVAL;
+	}
+
+	dout("encoding PR info: vers=%u, seq=%u, gen=%u, num regs=%u into %zd "
+	     "bytes\n", pr_info->vers, pr_info->seq, pr_info->gen,
+	     pr_info->num_regs, buf_remain);
+
+	pr_xattr = kmalloc(buf_remain, GFP_KERNEL);
+	if (!pr_xattr) {
+		return -ENOMEM;
+	}
+
+	p = pr_xattr;
+	rc = tcm_rbd_pr_info_vers_seq_gen_encode(p, buf_remain, pr_info->vers,
+						 pr_info->seq, pr_info->gen);
+	if (rc < 0) {
+		rc = -EINVAL;
+		goto err_xattr_free;
+	}
+
+	p += rc;
+	buf_remain -= rc;
+
+	rc = tcm_rbd_pr_info_num_regs_encode(p, buf_remain, pr_info->num_regs);
+	if (rc < 0) {
+		rc = -EINVAL;
+		goto err_xattr_free;
+	}
+
+	p += rc;
+	buf_remain -= rc;
+
+	i = 0;
+	list_for_each_entry(reg, &pr_info->regs, regs_node) {
+		rc = tcm_rbd_pr_info_reg_encode(p, buf_remain, reg);
+		 if (rc < 0) {
+			rc = -EINVAL;
+			goto err_xattr_free;
+		}
+
+		p += rc;
+		buf_remain -= rc;
+		i++;
+	}
+
+	if (i != pr_info->num_regs) {
+		pr_err("mismatch between PR num_regs and list entries!\n");
+		rc = -EINVAL;
+		goto err_xattr_free;
+	}
+
+	*_pr_xattr = pr_xattr;
+	/* +1 to include null term */
+	*pr_xattr_len = (p - pr_xattr) + 1;
+
+	dout("successfully encoded all %d PR regs into %d bytes: %s\n",
+	     pr_info->num_regs, *pr_xattr_len, pr_xattr);
+
+	return 0;
+
+err_xattr_free:
+	kfree(pr_xattr);
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_mock_empty(struct tcm_rbd_dev *tcm_rbd_dev,
+			   struct tcm_rbd_pr_info **_pr_info)
+{
+	struct tcm_rbd_pr_info *pr_info;
+
+	pr_info = kzalloc(sizeof(*pr_info), GFP_KERNEL);
+	if (!pr_info) {
+		return -ENOMEM;
+	}
+
+	pr_info->vers = TCM_RBD_PR_INFO_XATTR_VERS;
+	INIT_LIST_HEAD(&pr_info->regs);
+
+	*_pr_info = pr_info;
+	dout("successfully initialized mock PR info\n");
+
+	return 0;
+}
+
+static int
+tcm_rbd_pr_info_init(struct tcm_rbd_dev *tcm_rbd_dev,
+		     struct tcm_rbd_pr_info **_pr_info,
+		     char **_pr_xattr, int *_pr_xattr_len)
+
+{
+	struct tcm_rbd_pr_info *pr_info;
+	char *pr_xattr = NULL;
+	int pr_xattr_len = 0;
+	int rc;
+
+	pr_info = kzalloc(sizeof(*pr_info), GFP_KERNEL);
+	if (!pr_info) {
+		return -ENOMEM;
+	}
+
+	pr_info->vers = TCM_RBD_PR_INFO_XATTR_VERS;
+	INIT_LIST_HEAD(&pr_info->regs);
+	pr_info->seq = 1;
+
+	rc = tcm_rbd_pr_info_encode(pr_info, &pr_xattr,
+				    &pr_xattr_len);
+	if (rc) {
+		pr_warn("failed to encode PR xattr: %d\n", rc);
+		goto err_info_free;
+	}
+
+	rc = rbd_dev_setxattr(tcm_rbd_dev->rbd_dev,
+			      TCM_RBD_PR_INFO_XATTR_KEY,
+			      pr_xattr, pr_xattr_len);
+	if (rc) {
+		pr_warn("failed to set PR xattr: %d\n", rc);
+		goto err_xattr_free;
+	}
+
+	*_pr_info = pr_info;
+	if (_pr_xattr) {
+		BUG_ON(!_pr_xattr_len);
+		*_pr_xattr = pr_xattr;
+		*_pr_xattr_len = pr_xattr_len;
+	} else {
+		kfree(pr_xattr);
+	}
+	dout("successfully initialized PR info\n");
+
+	return 0;
+
+err_xattr_free:
+	kfree(pr_xattr);
+err_info_free:
+	kfree(pr_info);
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_get(struct tcm_rbd_dev *tcm_rbd_dev,
+		    struct tcm_rbd_pr_info **_pr_info,
+		    char **_pr_xattr, int *_pr_xattr_len)
+{
+	int rc;
+	char *pr_xattr = NULL;
+	char *dup_xattr = NULL;
+	int pr_xattr_len = 0;
+	struct tcm_rbd_pr_info *pr_info = NULL;
+
+	BUG_ON(!_pr_info);
+
+	rc = rbd_dev_getxattr(tcm_rbd_dev->rbd_dev, TCM_RBD_PR_INFO_XATTR_KEY,
+			      TCM_RBD_PR_INFO_XATTR_MAX_SIZE,
+			      (void **)&pr_xattr, &pr_xattr_len);
+	if (rc) {
+		if (rc != -ENODATA)
+			pr_warn("failed to obtain PR xattr: %d\n", rc);
+		return rc;
+	}
+	if (_pr_xattr) {
+		/* dup before decode, which trashes @pr_xattr */
+		dup_xattr = kstrdup(pr_xattr, GFP_KERNEL);
+		if (!dup_xattr) {
+			rc = -ENOMEM;
+			goto err_xattr_free;
+		}
+	}
+
+	rc = tcm_rbd_pr_info_decode(pr_xattr, pr_xattr_len, &pr_info);
+	if (rc) {
+		pr_warn("failed to decode PR xattr: %d\n", rc);
+		goto err_dup_xattr_free;
+	}
+
+	if (_pr_xattr) {
+		BUG_ON(!_pr_xattr_len);
+		*_pr_xattr = dup_xattr;
+		*_pr_xattr_len = pr_xattr_len;
+	}
+	kfree(pr_xattr);
+	*_pr_info = pr_info;
+	dout("successfully obtained PR info\n");
+
+	return 0;
+
+err_dup_xattr_free:
+	kfree(dup_xattr);
+err_xattr_free:
+	kfree(pr_xattr);
+	return rc;
+}
+
+static int
+tcm_rbd_pr_info_replace(struct tcm_rbd_dev *tcm_rbd_dev,
+			char *pr_xattr_old, int pr_xattr_len_old,
+			struct tcm_rbd_pr_info *pr_info_new)
+{
+	int rc;
+	char *pr_xattr_new = NULL;
+	int pr_xattr_len_new = 0;
+
+	BUG_ON(!pr_xattr_old || !pr_info_new);
+
+	/* bump seqnum prior to xattr write. Not rolled back on failure */
+	pr_info_new->seq++;
+	rc = tcm_rbd_pr_info_encode(pr_info_new, &pr_xattr_new,
+				    &pr_xattr_len_new);
+	if (rc) {
+		pr_warn("failed to encode PR xattr: %d\n", rc);
+		return rc;
+	}
+
+	if (pr_xattr_len_new > TCM_RBD_PR_INFO_XATTR_MAX_SIZE) {
+		pr_err("unable to store oversize (%d) PR info: %s\n",
+		       pr_xattr_len_new, pr_xattr_new);
+		rc = -E2BIG;
+		goto err_xattr_new_free;
+	}
+
+	rc = rbd_dev_cmpsetxattr(tcm_rbd_dev->rbd_dev,
+				 TCM_RBD_PR_INFO_XATTR_KEY,
+				 pr_xattr_old, pr_xattr_len_old,
+				 pr_xattr_new, pr_xattr_len_new);
+	if (rc) {
+		pr_warn("failed to set PR xattr: %d\n", rc);
+		goto err_xattr_new_free;
+	}
+
+	dout("successfully replaced PR info\n");
+	rc = 0;
+err_xattr_new_free:
+	kfree(pr_xattr_new);
+
+	return 0;
+}
+
+static sense_reason_t
+tcm_rbd_execute_pr_read_keys(struct se_cmd *cmd, unsigned char *buf,
+			     u32 buf_len)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	struct tcm_rbd_pr_info *pr_info = NULL;
+	struct tcm_rbd_pr_reg *reg;
+	u32 add_len = 0, off = 8;
+	int rc;
+
+	BUG_ON(buf_len < 8);
+
+	dout("getting pr_info for buf: %p, %u\n", buf, buf_len);
+
+	rc = tcm_rbd_pr_info_get(tcm_rbd_dev, &pr_info, NULL, NULL);
+	if (rc == -ENODATA) {
+		dout("PR info not present for read, mocking empty\n");
+		rc = tcm_rbd_pr_info_mock_empty(tcm_rbd_dev, &pr_info);
+	}
+	if (rc < 0)
+		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+
+	dout("packing read_keys response buf: %p, %u\n", buf, buf_len);
+
+	buf[0] = ((pr_info->gen >> 24) & 0xff);
+	buf[1] = ((pr_info->gen >> 16) & 0xff);
+	buf[2] = ((pr_info->gen >> 8) & 0xff);
+	buf[3] = (pr_info->gen & 0xff);
+
+	dout("packed gen %u in read_keys response\n", pr_info->gen);
+
+	list_for_each_entry(reg, &pr_info->regs, regs_node) {
+		/*
+		 * Check for overflow of 8byte PRI READ_KEYS payload and
+		 * next reservation key list descriptor.
+		 */
+		if ((add_len + 8) > (buf_len - 8))
+			break;
+
+		buf[off++] = ((reg->key >> 56) & 0xff);
+		buf[off++] = ((reg->key >> 48) & 0xff);
+		buf[off++] = ((reg->key >> 40) & 0xff);
+		buf[off++] = ((reg->key >> 32) & 0xff);
+		buf[off++] = ((reg->key >> 24) & 0xff);
+		buf[off++] = ((reg->key >> 16) & 0xff);
+		buf[off++] = ((reg->key >> 8) & 0xff);
+		buf[off++] = (reg->key & 0xff);
+		dout("packed key 0x%llx in read_keys response\n", reg->key);
+
+		add_len += 8;
+	}
+
+	buf[4] = ((add_len >> 24) & 0xff);
+	buf[5] = ((add_len >> 16) & 0xff);
+	buf[6] = ((add_len >> 8) & 0xff);
+	buf[7] = (add_len & 0xff);
+	dout("packed len %u in read_keys response\n", add_len);
+	tcm_rbd_pr_info_free(pr_info);
+
+	return TCM_NO_SENSE;
+}
+
+/* handle PR registration for a currently unregistered I_T nexus */
+static sense_reason_t
+tcm_rbd_execute_pr_register_new(struct tcm_rbd_pr_info *pr_info, u64 old_key,
+				u64 new_key, char *it_nexus,
+				bool ignore_existing)
+{
+	sense_reason_t ret;
+	int rc;
+
+	dout("PR registration for unregistered nexus: %s\n", it_nexus);
+
+	if (!ignore_existing && (old_key != 0)) {
+		ret = TCM_RESERVATION_CONFLICT;
+		goto out;
+	}
+	if (new_key == 0) {
+		ret = TCM_NO_SENSE;
+		goto out;
+	}
+	/*
+	 * Register the I_T nexus on which the command was received with
+	 * the value specified in the SERVICE ACTION RESERVATION KEY
+	 * field.
+	 */
+	rc = tcm_rbd_pr_info_append_reg(pr_info, it_nexus, new_key);
+	if (rc < 0) {
+		ret = TCM_OUT_OF_RESOURCES;
+		goto out;
+	}
+
+	ret = TCM_NO_SENSE;
+out:
+	return ret;
+}
+
+/* handle PR registration for a currently registered I_T nexus */
+static sense_reason_t
+tcm_rbd_execute_pr_register_existing(struct tcm_rbd_pr_info *pr_info,
+				     u64 old_key, u64 new_key, char *it_nexus,
+				     struct tcm_rbd_pr_reg *existing_reg,
+				     bool ignore_existing)
+{
+	sense_reason_t ret;
+	int rc;
+
+	dout("PR registration for registered nexus: %s\n", it_nexus);
+
+	if (!ignore_existing && (old_key != existing_reg->key)) {
+		ret = TCM_RESERVATION_CONFLICT;
+		goto out;
+	}
+
+	if (new_key == 0) {
+		/* unregister */
+		rc = tcm_rbd_pr_info_unregister_reg(pr_info,
+						    existing_reg);
+		if (rc < 0) {
+			ret = TCM_OUT_OF_RESOURCES;
+			goto out;
+		}
+	} else {
+		/* update key */
+		existing_reg->key = new_key;
+	}
+
+	ret = TCM_NO_SENSE;
+out:
+	return ret;
+}
+
+static sense_reason_t
+tcm_rbd_execute_pr_register(struct se_cmd *cmd, u64 old_key,
+			    u64 new_key, bool aptpl, bool all_tg_pt,
+			    bool spec_i_pt, bool ignore_existing)
+{
+	struct se_device *dev = cmd->se_dev;
+	struct tcm_rbd_dev *tcm_rbd_dev = TCM_RBD_DEV(dev);
+	char nexus_buf[TCM_RBD_PR_IT_NEXUS_MAXLEN];
+	struct tcm_rbd_pr_info *pr_info;
+	struct tcm_rbd_pr_reg *reg;
+	struct tcm_rbd_pr_reg *existing_reg;
+	char *pr_xattr;
+	int pr_xattr_len;
+	int rc;
+	sense_reason_t ret;
+	int retries = 0;
+
+	if (!aptpl) {
+		/*
+		 * Currently unsupported by block layer API (hch):
+		 * reservations not persistent through a power loss are
+		 * basically useless, so I decided to force them on in the API.
+		 */
+		pr_warn("PR register with aptpl unset. Treating as aptpl=1\n");
+		aptpl = true;
+	}
+
+	if (all_tg_pt || spec_i_pt) {
+		/* TODO: Currently unsupported by block layer API. */
+		pr_err("failing PR register with all_tg_pt=%d spec_i_pt=%d\n",
+			 all_tg_pt, spec_i_pt);
+		return TCM_INVALID_CDB_FIELD;
+	}
+
+	rc = tcm_rbd_gen_it_nexus(cmd->se_sess, nexus_buf,
+				  ARRAY_SIZE(nexus_buf));
+	if (rc < 0)
+		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+
+	dout("generated nexus: %s\n", nexus_buf);
+
+retry:
+	pr_info = NULL;
+	pr_xattr = NULL;
+	pr_xattr_len = 0;
+	rc = tcm_rbd_pr_info_get(tcm_rbd_dev, &pr_info, &pr_xattr,
+				 &pr_xattr_len);
+	if ((rc == -ENODATA) && (retries == 0)) {
+		pr_warn("PR info not present, initializing\n");
+		rc = tcm_rbd_pr_info_init(tcm_rbd_dev, &pr_info, &pr_xattr,
+					  &pr_xattr_len);
+	}
+	if (rc < 0) {
+		pr_err("failed to obtain PR info\n");
+		return TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+	}
+
+	/* check for an existing registration */
+	existing_reg = NULL;
+	list_for_each_entry(reg, &pr_info->regs, regs_node) {
+		if (!strcmp(reg->it_nexus, nexus_buf)) {
+			dout("found existing PR reg for %s\n", nexus_buf);
+			existing_reg = reg;
+			break;
+		}
+	}
+
+	if (!existing_reg) {
+		ret = tcm_rbd_execute_pr_register_new(pr_info, old_key, new_key,
+						      nexus_buf,
+						      ignore_existing);
+	} else {
+		ret = tcm_rbd_execute_pr_register_existing(pr_info, old_key,
+							   new_key, nexus_buf,
+							   existing_reg,
+							   ignore_existing);
+	}
+	if (ret) {
+		goto err_out;
+	}
+
+	/*
+	 * The Persistent Reservations Generation (PRGENERATION) field shall
+	 * contain the value of a 32-bit wrapping counter that the device server
+	 * shall update (e.g., increment) during the processing of any
+	 * PERSISTENT RESERVE OUT command as described in table 216 (see
+	 * 6.16.2). The PRgeneration value shall not be updated by a PERSISTENT
+	 * RESERVE IN command or by a PERSISTENT RESERVE OUT command that is
+	 * terminated due to an error or reservation conflict.
+	 */
+	pr_info->gen++;
+	/*
+	 * TODO:
+	 * Regardless of the APTPL bit value the PRgeneration value shall be set
+	 * to zero by a power on.
+	 */
+
+	rc = tcm_rbd_pr_info_replace(tcm_rbd_dev, pr_xattr, pr_xattr_len,
+				     pr_info);
+	if (rc == -ECANCELED) {
+		char *pr_xattr_changed = NULL;
+		int pr_xattr_changed_len = 0;
+		/* PR info has changed since we read it */
+		rc = rbd_dev_getxattr(tcm_rbd_dev->rbd_dev,
+				      TCM_RBD_PR_INFO_XATTR_KEY,
+				      TCM_RBD_PR_INFO_XATTR_MAX_SIZE,
+				      (void **)&pr_xattr_changed,
+				      &pr_xattr_changed_len);
+		pr_warn("atomic PR info update failed due to parallel "
+			"change, expected(%d) %s, now(%d) %s\n",
+			pr_xattr_len, pr_xattr, pr_xattr_changed_len,
+			pr_xattr_changed);
+		retries++;
+		if (retries <= TCM_RBD_PR_REG_MAX_RETRIES) {
+			tcm_rbd_pr_info_free(pr_info);
+			kfree(pr_xattr);
+			goto retry;
+		}
+	}
+	if (rc < 0) {
+		pr_err("atomic PR info update failed: %d\n", rc);
+		ret = TCM_LOGICAL_UNIT_COMMUNICATION_FAILURE;
+		goto err_out;
+	}
+
+	ret = TCM_NO_SENSE;
+err_out:
+	tcm_rbd_pr_info_free(pr_info);
+	kfree(pr_xattr);
+	return ret;
+}
+
+static struct target_pr_ops tcm_rbd_pr_ops = {
+	.pr_read_keys		= tcm_rbd_execute_pr_read_keys,
+
+	.pr_register		= tcm_rbd_execute_pr_register,
+};
+
 static const struct target_backend_ops tcm_rbd_ops = {
 	.name				= "rbd",
 	.inquiry_prod			= "RBD",
@@ -698,6 +1608,7 @@ static const struct target_backend_ops t
 	.get_io_min			= tcm_rbd_get_io_min,
 	.get_io_opt			= tcm_rbd_get_io_opt,
 	.get_write_cache		= tcm_rbd_get_write_cache,
+	.pr_ops				= &tcm_rbd_pr_ops,
 	.tb_dev_attrib_attrs		= sbc_attrib_attrs,
 };