Blob Blame History Raw
From: Ilya Dryomov <idryomov@gmail.com>
Date: Mon, 19 Jun 2017 12:18:05 +0200
Subject: libceph: respect RADOS_BACKOFF backoffs
Git-commit: a02a946dfe9633d7e0202359836f6b5217a62824
Patch-mainline: v4.13-rc1
References: FATE#324714

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 include/linux/ceph/ceph_fs.h    |   1 +
 include/linux/ceph/osd_client.h |  45 +++
 include/linux/ceph/osdmap.h     |   1 +
 include/linux/ceph/rados.h      |   6 +
 net/ceph/ceph_common.c          |   1 +
 net/ceph/debugfs.c              |  74 +++++
 net/ceph/osd_client.c           | 593 ++++++++++++++++++++++++++++++++++++++++
 net/ceph/osdmap.c               |  16 ++
 8 files changed, 737 insertions(+)

diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index ad078ebe25d6..edf5b04b918a 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -147,6 +147,7 @@ struct ceph_dir_layout {
 #define CEPH_MSG_OSD_OP                 42
 #define CEPH_MSG_OSD_OPREPLY            43
 #define CEPH_MSG_WATCH_NOTIFY           44
+#define CEPH_MSG_OSD_BACKOFF            61
 
 
 /* watch-notify operations */
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 62c672bcbb31..c6d96a5f46fd 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -1,6 +1,7 @@
 #ifndef _FS_CEPH_OSD_CLIENT_H
 #define _FS_CEPH_OSD_CLIENT_H
 
+#include <linux/bitrev.h>
 #include <linux/completion.h>
 #include <linux/kref.h>
 #include <linux/mempool.h>
@@ -36,6 +37,8 @@ struct ceph_osd {
 	struct ceph_connection o_con;
 	struct rb_root o_requests;
 	struct rb_root o_linger_requests;
+	struct rb_root o_backoff_mappings;
+	struct rb_root o_backoffs_by_id;
 	struct list_head o_osd_lru;
 	struct ceph_auth_handshake o_auth;
 	unsigned long lru_ttl;
@@ -275,6 +278,48 @@ struct ceph_watch_item {
 	struct ceph_entity_addr addr;
 };
 
+struct ceph_spg_mapping {
+	struct rb_node node;
+	struct ceph_spg spgid;
+
+	struct rb_root backoffs;
+};
+
+struct ceph_hobject_id {
+	void *key;
+	size_t key_len;
+	void *oid;
+	size_t oid_len;
+	u64 snapid;
+	u32 hash;
+	u8 is_max;
+	void *nspace;
+	size_t nspace_len;
+	s64 pool;
+
+	/* cache */
+	u32 hash_reverse_bits;
+};
+
+static inline void ceph_hoid_build_hash_cache(struct ceph_hobject_id *hoid)
+{
+	hoid->hash_reverse_bits = bitrev32(hoid->hash);
+}
+
+/*
+ * PG-wide backoff: [begin, end)
+ * per-object backoff: begin == end
+ */
+struct ceph_osd_backoff {
+	struct rb_node spg_node;
+	struct rb_node id_node;
+
+	struct ceph_spg spgid;
+	u64 id;
+	struct ceph_hobject_id *begin;
+	struct ceph_hobject_id *end;
+};
+
 #define CEPH_LINGER_ID_START	0xffff000000000000ULL
 
 struct ceph_osd_client {
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 060d059acbf8..fe6d189bdd30 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -32,6 +32,7 @@ struct ceph_spg {
 };
 
 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs);
+int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs);
 
 #define CEPH_POOL_FLAG_HASHPSPOOL	(1ULL << 0) /* hash pg seed and pool id
 						       together */
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 5d0018782d50..385db08bb8b2 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -439,6 +439,12 @@ enum {
 
 const char *ceph_osd_watch_op_name(int o);
 
+enum {
+	CEPH_OSD_BACKOFF_OP_BLOCK = 1,
+	CEPH_OSD_BACKOFF_OP_ACK_BLOCK = 2,
+	CEPH_OSD_BACKOFF_OP_UNBLOCK = 3,
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 47e94b560ba0..3d265c5cb6d0 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -85,6 +85,7 @@ const char *ceph_msg_type_name(int type)
 	case CEPH_MSG_OSD_OP: return "osd_op";
 	case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
 	case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
+	case CEPH_MSG_OSD_BACKOFF: return "osd_backoff";
 	default: return "unknown";
 	}
 }
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index c0089f8ccaeb..017f15c575f8 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -243,6 +243,73 @@ static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
 	mutex_unlock(&osd->lock);
 }
 
+static void dump_snapid(struct seq_file *s, u64 snapid)
+{
+	if (snapid == CEPH_NOSNAP)
+		seq_puts(s, "head");
+	else if (snapid == CEPH_SNAPDIR)
+		seq_puts(s, "snapdir");
+	else
+		seq_printf(s, "%llx", snapid);
+}
+
+static void dump_name_escaped(struct seq_file *s, unsigned char *name,
+			      size_t len)
+{
+	size_t i;
+
+	for (i = 0; i < len; i++) {
+		if (name[i] == '%' || name[i] == ':' || name[i] == '/' ||
+		    name[i] < 32 || name[i] >= 127) {
+			seq_printf(s, "%%%02x", name[i]);
+		} else {
+			seq_putc(s, name[i]);
+		}
+	}
+}
+
+static void dump_hoid(struct seq_file *s, const struct ceph_hobject_id *hoid)
+{
+	if (hoid->snapid == 0 && hoid->hash == 0 && !hoid->is_max &&
+	    hoid->pool == S64_MIN) {
+		seq_puts(s, "MIN");
+		return;
+	}
+	if (hoid->is_max) {
+		seq_puts(s, "MAX");
+		return;
+	}
+	seq_printf(s, "%lld:%08x:", hoid->pool, hoid->hash_reverse_bits);
+	dump_name_escaped(s, hoid->nspace, hoid->nspace_len);
+	seq_putc(s, ':');
+	dump_name_escaped(s, hoid->key, hoid->key_len);
+	seq_putc(s, ':');
+	dump_name_escaped(s, hoid->oid, hoid->oid_len);
+	seq_putc(s, ':');
+	dump_snapid(s, hoid->snapid);
+}
+
+static void dump_backoffs(struct seq_file *s, struct ceph_osd *osd)
+{
+	struct rb_node *n;
+
+	mutex_lock(&osd->lock);
+	for (n = rb_first(&osd->o_backoffs_by_id); n; n = rb_next(n)) {
+		struct ceph_osd_backoff *backoff =
+		    rb_entry(n, struct ceph_osd_backoff, id_node);
+
+		seq_printf(s, "osd%d\t", osd->o_osd);
+		dump_spgid(s, &backoff->spgid);
+		seq_printf(s, "\t%llu\t", backoff->id);
+		dump_hoid(s, backoff->begin);
+		seq_putc(s, '\t');
+		dump_hoid(s, backoff->end);
+		seq_putc(s, '\n');
+	}
+
+	mutex_unlock(&osd->lock);
+}
+
 static int osdc_show(struct seq_file *s, void *pp)
 {
 	struct ceph_client *client = s->private;
@@ -268,6 +335,13 @@ static int osdc_show(struct seq_file *s, void *pp)
 	}
 	dump_linger_requests(s, &osdc->homeless_osd);
 
+	seq_puts(s, "BACKOFFS\n");
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		dump_backoffs(s, osd);
+	}
+
 	up_read(&osdc->lock);
 	return 0;
 }
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 620aa43a6a0a..86a9737d8e3f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -50,6 +50,7 @@ static void link_linger(struct ceph_osd *osd,
 			struct ceph_osd_linger_request *lreq);
 static void unlink_linger(struct ceph_osd *osd,
 			  struct ceph_osd_linger_request *lreq);
+static void clear_backoffs(struct ceph_osd *osd);
 
 #if 1
 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -1019,6 +1020,8 @@ static void osd_init(struct ceph_osd *osd)
 	RB_CLEAR_NODE(&osd->o_node);
 	osd->o_requests = RB_ROOT;
 	osd->o_linger_requests = RB_ROOT;
+	osd->o_backoff_mappings = RB_ROOT;
+	osd->o_backoffs_by_id = RB_ROOT;
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	INIT_LIST_HEAD(&osd->o_keepalive_item);
 	osd->o_incarnation = 1;
@@ -1030,6 +1033,8 @@ static void osd_cleanup(struct ceph_osd *osd)
 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
+	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
+	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
 	WARN_ON(!list_empty(&osd->o_osd_lru));
 	WARN_ON(!list_empty(&osd->o_keepalive_item));
 
@@ -1150,6 +1155,7 @@ static void close_osd(struct ceph_osd *osd)
 		unlink_linger(osd, lreq);
 		link_linger(&osdc->homeless_osd, lreq);
 	}
+	clear_backoffs(osd);
 
 	__remove_osd_from_lru(osd);
 	erase_osd(&osdc->osds, osd);
@@ -1431,6 +1437,328 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	return ct_res;
 }
 
+static struct ceph_spg_mapping *alloc_spg_mapping(void)
+{
+	struct ceph_spg_mapping *spg;
+
+	spg = kmalloc(sizeof(*spg), GFP_NOIO);
+	if (!spg)
+		return NULL;
+
+	RB_CLEAR_NODE(&spg->node);
+	spg->backoffs = RB_ROOT;
+	return spg;
+}
+
+static void free_spg_mapping(struct ceph_spg_mapping *spg)
+{
+	WARN_ON(!RB_EMPTY_NODE(&spg->node));
+	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
+
+	kfree(spg);
+}
+
+/*
+ * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
+ * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
+ * defined only within a specific spgid; it does not pass anything to
+ * children on split, or to another primary.
+ */
+DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
+		 RB_BYPTR, const struct ceph_spg *, node)
+
+static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
+{
+	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
+}
+
+static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
+				   void **pkey, size_t *pkey_len)
+{
+	if (hoid->key_len) {
+		*pkey = hoid->key;
+		*pkey_len = hoid->key_len;
+	} else {
+		*pkey = hoid->oid;
+		*pkey_len = hoid->oid_len;
+	}
+}
+
+static int compare_names(const void *name1, size_t name1_len,
+			 const void *name2, size_t name2_len)
+{
+	int ret;
+
+	ret = memcmp(name1, name2, min(name1_len, name2_len));
+	if (!ret) {
+		if (name1_len < name2_len)
+			ret = -1;
+		else if (name1_len > name2_len)
+			ret = 1;
+	}
+	return ret;
+}
+
+static int hoid_compare(const struct ceph_hobject_id *lhs,
+			const struct ceph_hobject_id *rhs)
+{
+	void *effective_key1, *effective_key2;
+	size_t effective_key1_len, effective_key2_len;
+	int ret;
+
+	if (lhs->is_max < rhs->is_max)
+		return -1;
+	if (lhs->is_max > rhs->is_max)
+		return 1;
+
+	if (lhs->pool < rhs->pool)
+		return -1;
+	if (lhs->pool > rhs->pool)
+		return 1;
+
+	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
+		return -1;
+	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
+		return 1;
+
+	ret = compare_names(lhs->nspace, lhs->nspace_len,
+			    rhs->nspace, rhs->nspace_len);
+	if (ret)
+		return ret;
+
+	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
+	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
+	ret = compare_names(effective_key1, effective_key1_len,
+			    effective_key2, effective_key2_len);
+	if (ret)
+		return ret;
+
+	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
+	if (ret)
+		return ret;
+
+	if (lhs->snapid < rhs->snapid)
+		return -1;
+	if (lhs->snapid > rhs->snapid)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
+ * compat stuff here.
+ *
+ * Assumes @hoid is zero-initialized.
+ */
+static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
+				  &struct_len);
+	if (ret)
+		return ret;
+
+	if (struct_v < 4) {
+		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
+		goto e_inval;
+	}
+
+	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
+						GFP_NOIO);
+	if (IS_ERR(hoid->key)) {
+		ret = PTR_ERR(hoid->key);
+		hoid->key = NULL;
+		return ret;
+	}
+
+	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
+						GFP_NOIO);
+	if (IS_ERR(hoid->oid)) {
+		ret = PTR_ERR(hoid->oid);
+		hoid->oid = NULL;
+		return ret;
+	}
+
+	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
+	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
+	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
+
+	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
+						   GFP_NOIO);
+	if (IS_ERR(hoid->nspace)) {
+		ret = PTR_ERR(hoid->nspace);
+		hoid->nspace = NULL;
+		return ret;
+	}
+
+	ceph_decode_64_safe(p, end, hoid->pool, e_inval);
+
+	ceph_hoid_build_hash_cache(hoid);
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
+{
+	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
+	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
+}
+
+static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
+{
+	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
+	ceph_encode_string(p, end, hoid->key, hoid->key_len);
+	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
+	ceph_encode_64(p, hoid->snapid);
+	ceph_encode_32(p, hoid->hash);
+	ceph_encode_8(p, hoid->is_max);
+	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
+	ceph_encode_64(p, hoid->pool);
+}
+
+static void free_hoid(struct ceph_hobject_id *hoid)
+{
+	if (hoid) {
+		kfree(hoid->key);
+		kfree(hoid->oid);
+		kfree(hoid->nspace);
+		kfree(hoid);
+	}
+}
+
+static struct ceph_osd_backoff *alloc_backoff(void)
+{
+	struct ceph_osd_backoff *backoff;
+
+	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
+	if (!backoff)
+		return NULL;
+
+	RB_CLEAR_NODE(&backoff->spg_node);
+	RB_CLEAR_NODE(&backoff->id_node);
+	return backoff;
+}
+
+static void free_backoff(struct ceph_osd_backoff *backoff)
+{
+	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
+	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
+
+	free_hoid(backoff->begin);
+	free_hoid(backoff->end);
+	kfree(backoff);
+}
+
+/*
+ * Within a specific spgid, backoffs are managed by ->begin hoid.
+ */
+DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
+			RB_BYVAL, spg_node);
+
+static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
+					    const struct ceph_hobject_id *hoid)
+{
+	struct rb_node *n = root->rb_node;
+
+	while (n) {
+		struct ceph_osd_backoff *cur =
+		    rb_entry(n, struct ceph_osd_backoff, spg_node);
+		int cmp;
+
+		cmp = hoid_compare(hoid, cur->begin);
+		if (cmp < 0) {
+			n = n->rb_left;
+		} else if (cmp > 0) {
+			if (hoid_compare(hoid, cur->end) < 0)
+				return cur;
+
+			n = n->rb_right;
+		} else {
+			return cur;
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Each backoff has a unique id within its OSD session.
+ */
+DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
+
+static void clear_backoffs(struct ceph_osd *osd)
+{
+	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
+		struct ceph_spg_mapping *spg =
+		    rb_entry(rb_first(&osd->o_backoff_mappings),
+			     struct ceph_spg_mapping, node);
+
+		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
+			struct ceph_osd_backoff *backoff =
+			    rb_entry(rb_first(&spg->backoffs),
+				     struct ceph_osd_backoff, spg_node);
+
+			erase_backoff(&spg->backoffs, backoff);
+			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+			free_backoff(backoff);
+		}
+		erase_spg_mapping(&osd->o_backoff_mappings, spg);
+		free_spg_mapping(spg);
+	}
+}
+
+/*
+ * Set up a temporary, non-owning view into @t.
+ */
+static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
+				  const struct ceph_osd_request_target *t)
+{
+	hoid->key = NULL;
+	hoid->key_len = 0;
+	hoid->oid = t->target_oid.name;
+	hoid->oid_len = t->target_oid.name_len;
+	hoid->snapid = CEPH_NOSNAP;
+	hoid->hash = t->pgid.seed;
+	hoid->is_max = false;
+	if (t->target_oloc.pool_ns) {
+		hoid->nspace = t->target_oloc.pool_ns->str;
+		hoid->nspace_len = t->target_oloc.pool_ns->len;
+	} else {
+		hoid->nspace = NULL;
+		hoid->nspace_len = 0;
+	}
+	hoid->pool = t->target_oloc.pool;
+	ceph_hoid_build_hash_cache(hoid);
+}
+
+static bool should_plug_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd *osd = req->r_osd;
+	struct ceph_spg_mapping *spg;
+	struct ceph_osd_backoff *backoff;
+	struct ceph_hobject_id hoid;
+
+	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
+	if (!spg)
+		return false;
+
+	hoid_fill_from_target(&hoid, &req->r_t);
+	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
+	if (!backoff)
+		return false;
+
+	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
+	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
+	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
+	return true;
+}
+
 static void setup_request_data(struct ceph_osd_request *req,
 			       struct ceph_msg *msg)
 {
@@ -1707,6 +2035,10 @@ static void send_request(struct ceph_osd_request *req)
 	verify_osd_locked(osd);
 	WARN_ON(osd->o_osd != req->r_t.osd);
 
+	/* backoff? */
+	if (should_plug_request(req))
+		return;
+
 	/*
 	 * We may have a previously queued request message hanging
 	 * around.  Cancel it to avoid corrupting the msgr.
@@ -3527,6 +3859,8 @@ static void kick_osd_requests(struct ceph_osd *osd)
 {
 	struct rb_node *n;
 
+	clear_backoffs(osd);
+
 	for (n = rb_first(&osd->o_requests); n; ) {
 		struct ceph_osd_request *req =
 		    rb_entry(n, struct ceph_osd_request, r_node);
@@ -3572,6 +3906,261 @@ static void osd_fault(struct ceph_connection *con)
 	up_write(&osdc->lock);
 }
 
+struct MOSDBackoff {
+	struct ceph_spg spgid;
+	u32 map_epoch;
+	u8 op;
+	u64 id;
+	struct ceph_hobject_id *begin;
+	struct ceph_hobject_id *end;
+};
+
+static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
+{
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
+	if (ret)
+		return ret;
+
+	ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
+	ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
+	ceph_decode_8_safe(&p, end, m->op, e_inval);
+	ceph_decode_64_safe(&p, end, m->id, e_inval);
+
+	m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
+	if (!m->begin)
+		return -ENOMEM;
+
+	ret = decode_hoid(&p, end, m->begin);
+	if (ret) {
+		free_hoid(m->begin);
+		return ret;
+	}
+
+	m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
+	if (!m->end) {
+		free_hoid(m->begin);
+		return -ENOMEM;
+	}
+
+	ret = decode_hoid(&p, end, m->end);
+	if (ret) {
+		free_hoid(m->begin);
+		free_hoid(m->end);
+		return ret;
+	}
+
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+static struct ceph_msg *create_backoff_message(
+				const struct ceph_osd_backoff *backoff,
+				u32 map_epoch)
+{
+	struct ceph_msg *msg;
+	void *p, *end;
+	int msg_size;
+
+	msg_size = CEPH_ENCODING_START_BLK_LEN +
+			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
+	msg_size += 4 + 1 + 8; /* map_epoch, op, id */
+	msg_size += CEPH_ENCODING_START_BLK_LEN +
+			hoid_encoding_size(backoff->begin);
+	msg_size += CEPH_ENCODING_START_BLK_LEN +
+			hoid_encoding_size(backoff->end);
+
+	msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
+	if (!msg)
+		return NULL;
+
+	p = msg->front.iov_base;
+	end = p + msg->front_alloc_len;
+
+	encode_spgid(&p, &backoff->spgid);
+	ceph_encode_32(&p, map_epoch);
+	ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
+	ceph_encode_64(&p, backoff->id);
+	encode_hoid(&p, end, backoff->begin);
+	encode_hoid(&p, end, backoff->end);
+	BUG_ON(p != end);
+
+	msg->front.iov_len = p - msg->front.iov_base;
+	msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+	return msg;
+}
+
+static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
+{
+	struct ceph_spg_mapping *spg;
+	struct ceph_osd_backoff *backoff;
+	struct ceph_msg *msg;
+
+	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
+	if (!spg) {
+		spg = alloc_spg_mapping();
+		if (!spg) {
+			pr_err("%s failed to allocate spg\n", __func__);
+			return;
+		}
+		spg->spgid = m->spgid; /* struct */
+		insert_spg_mapping(&osd->o_backoff_mappings, spg);
+	}
+
+	backoff = alloc_backoff();
+	if (!backoff) {
+		pr_err("%s failed to allocate backoff\n", __func__);
+		return;
+	}
+	backoff->spgid = m->spgid; /* struct */
+	backoff->id = m->id;
+	backoff->begin = m->begin;
+	m->begin = NULL; /* backoff now owns this */
+	backoff->end = m->end;
+	m->end = NULL;   /* ditto */
+
+	insert_backoff(&spg->backoffs, backoff);
+	insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+
+	/*
+	 * Ack with original backoff's epoch so that the OSD can
+	 * discard this if there was a PG split.
+	 */
+	msg = create_backoff_message(backoff, m->map_epoch);
+	if (!msg) {
+		pr_err("%s failed to allocate msg\n", __func__);
+		return;
+	}
+	ceph_con_send(&osd->o_con, msg);
+}
+
+static bool target_contained_by(const struct ceph_osd_request_target *t,
+				const struct ceph_hobject_id *begin,
+				const struct ceph_hobject_id *end)
+{
+	struct ceph_hobject_id hoid;
+	int cmp;
+
+	hoid_fill_from_target(&hoid, t);
+	cmp = hoid_compare(&hoid, begin);
+	return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
+}
+
+static void handle_backoff_unblock(struct ceph_osd *osd,
+				   const struct MOSDBackoff *m)
+{
+	struct ceph_spg_mapping *spg;
+	struct ceph_osd_backoff *backoff;
+	struct rb_node *n;
+
+	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
+	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
+
+	backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
+	if (!backoff) {
+		pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
+		       __func__, osd->o_osd, m->spgid.pgid.pool,
+		       m->spgid.pgid.seed, m->spgid.shard, m->id);
+		return;
+	}
+
+	if (hoid_compare(backoff->begin, m->begin) &&
+	    hoid_compare(backoff->end, m->end)) {
+		pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
+		       __func__, osd->o_osd, m->spgid.pgid.pool,
+		       m->spgid.pgid.seed, m->spgid.shard, m->id);
+		/* unblock it anyway... */
+	}
+
+	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
+	BUG_ON(!spg);
+
+	erase_backoff(&spg->backoffs, backoff);
+	erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
+	free_backoff(backoff);
+
+	if (RB_EMPTY_ROOT(&spg->backoffs)) {
+		erase_spg_mapping(&osd->o_backoff_mappings, spg);
+		free_spg_mapping(spg);
+	}
+
+	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+
+		if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
+			/*
+			 * Match against @m, not @backoff -- the PG may
+			 * have split on the OSD.
+			 */
+			if (target_contained_by(&req->r_t, m->begin, m->end)) {
+				/*
+				 * If no other installed backoff applies,
+				 * resend.
+				 */
+				send_request(req);
+			}
+		}
+	}
+}
+
+static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
+{
+	struct ceph_osd_client *osdc = osd->o_osdc;
+	struct MOSDBackoff m;
+	int ret;
+
+	down_read(&osdc->lock);
+	if (!osd_registered(osd)) {
+		dout("%s osd%d unknown\n", __func__, osd->o_osd);
+		up_read(&osdc->lock);
+		return;
+	}
+	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+	mutex_lock(&osd->lock);
+	ret = decode_MOSDBackoff(msg, &m);
+	if (ret) {
+		pr_err("failed to decode MOSDBackoff: %d\n", ret);
+		ceph_msg_dump(msg);
+		goto out_unlock;
+	}
+
+	switch (m.op) {
+	case CEPH_OSD_BACKOFF_OP_BLOCK:
+		handle_backoff_block(osd, &m);
+		break;
+	case CEPH_OSD_BACKOFF_OP_UNBLOCK:
+		handle_backoff_unblock(osd, &m);
+		break;
+	default:
+		pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
+	}
+
+	free_hoid(m.begin);
+	free_hoid(m.end);
+
+out_unlock:
+	mutex_unlock(&osd->lock);
+	up_read(&osdc->lock);
+}
+
 /*
  * Process osd watch notifications
  */
@@ -4509,6 +5098,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 	case CEPH_MSG_OSD_OPREPLY:
 		handle_reply(osd, msg);
 		break;
+	case CEPH_MSG_OSD_BACKOFF:
+		handle_backoff(osd, msg);
+		break;
 	case CEPH_MSG_WATCH_NOTIFY:
 		handle_watch_notify(osdc, msg);
 		break;
@@ -4631,6 +5223,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	*skip = 0;
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
+	case CEPH_MSG_OSD_BACKOFF:
 	case CEPH_MSG_WATCH_NOTIFY:
 		return alloc_msg_with_page_vector(hdr);
 	case CEPH_MSG_OSD_OPREPLY:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 1e2e190a4c2a..1d87a736221b 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -418,6 +418,22 @@ int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs)
 	return 0;
 }
 
+int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs)
+{
+	int ret;
+
+	ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid);
+	if (ret)
+		return ret;
+
+	if (lhs->shard < rhs->shard)
+		return -1;
+	if (lhs->shard > rhs->shard)
+		return 1;
+
+	return 0;
+}
+
 /*
  * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
  * to a set of osds) and primary_temp (explicit primary setting)