Blob Blame History Raw
From: Ilya Dryomov <idryomov@gmail.com>
Date: Tue, 15 May 2018 15:47:58 +0200
Subject: libceph: introduce ceph_osdc_abort_requests()
Git-commit: 66850df58529eefc61cb96b895991508547503bf
Patch-mainline: v4.18-rc1
References: FATE#324714

This will be used by the filesystem for "umount -f".

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 include/linux/ceph/osd_client.h |    2 +
 net/ceph/osd_client.c           |   67 +++++++++++++++++++++++++++++++++++++---
 2 files changed, 64 insertions(+), 5 deletions(-)

--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -341,6 +341,7 @@ struct ceph_osd_client {
 	struct rb_root         linger_map_checks;
 	atomic_t               num_requests;
 	atomic_t               num_homeless;
+	int                    abort_err;
 	struct delayed_work    timeout_work;
 	struct delayed_work    osds_timeout_work;
 #ifdef CONFIG_DEBUG_FS
@@ -372,6 +373,7 @@ extern void ceph_osdc_handle_reply(struc
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
+void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err);
 
 extern void osd_req_op_init(struct ceph_osd_request *osd_req,
 			    unsigned int which, u16 opcode, u32 flags);
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -992,6 +992,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
 
+/*
+ * Call @fn on each OSD request as long as @fn returns 0.
+ */
+static void for_each_request(struct ceph_osd_client *osdc,
+			int (*fn)(struct ceph_osd_request *req, void *arg),
+			void *arg)
+{
+	struct rb_node *n, *p;
+
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		for (p = rb_first(&osd->o_requests); p; ) {
+			struct ceph_osd_request *req =
+			    rb_entry(p, struct ceph_osd_request, r_node);
+
+			p = rb_next(p);
+			if (fn(req, arg))
+				return;
+		}
+	}
+
+	for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
+		struct ceph_osd_request *req =
+		    rb_entry(p, struct ceph_osd_request, r_node);
+
+		p = rb_next(p);
+		if (fn(req, arg))
+			return;
+	}
+}
+
 static bool osd_homeless(struct ceph_osd *osd)
 {
 	return osd->o_osd == CEPH_HOMELESS_OSD;
@@ -2099,9 +2131,9 @@ static void __submit_request(struct ceph
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_osd *osd;
 	enum calc_target_result ct_res;
+	int err = 0;
 	bool need_send = false;
 	bool promoted = false;
-	bool need_abort = false;
 
 	WARN_ON(req->r_tid);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -2117,7 +2149,10 @@ again:
 		goto promote;
 	}
 
-	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+	if (osdc->abort_err) {
+		dout("req %p abort_err %d\n", req, osdc->abort_err);
+		err = osdc->abort_err;
+	} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
 		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
 		     osdc->epoch_barrier);
 		req->r_t.paused = true;
@@ -2142,7 +2177,7 @@ again:
 		req->r_t.paused = true;
 		maybe_request_map(osdc);
 		if (req->r_abort_on_full)
-			need_abort = true;
+			err = -ENOSPC;
 	} else if (!osd_homeless(osd)) {
 		need_send = true;
 	} else {
@@ -2159,8 +2194,8 @@ again:
 	link_request(osd, req);
 	if (need_send)
 		send_request(req);
-	else if (need_abort)
-		complete_request(req, -ENOSPC);
+	else if (err)
+		complete_request(req, err);
 	mutex_unlock(&osd->lock);
 
 	if (ct_res == CALC_TARGET_POOL_DNE)
@@ -2274,6 +2309,28 @@ static void abort_request(struct ceph_os
 	complete_request(req, err);
 }
 
+static int abort_fn(struct ceph_osd_request *req, void *arg)
+{
+	int err = *(int *)arg;
+
+	abort_request(req, err);
+	return 0; /* continue iteration */
+}
+
+/*
+ * Abort all in-flight requests with @err and arrange for all future
+ * requests to be failed immediately.
+ */
+void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
+{
+	dout("%s osdc %p err %d\n", __func__, osdc, err);
+	down_write(&osdc->lock);
+	for_each_request(osdc, abort_fn, &err);
+	osdc->abort_err = err;
+	up_write(&osdc->lock);
+}
+EXPORT_SYMBOL(ceph_osdc_abort_requests);
+
 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
 {
 	if (likely(eb > osdc->epoch_barrier)) {