Blob Blame History Raw
From: Jeff Layton <jlayton@kernel.org>
Date: Tue, 31 Aug 2021 13:39:13 -0400
Subject: ceph: shut down access to inode when async create fails
Git-commit: 5d6451b1489ad1781a0778cc876bf26a21910413
Patch-mainline: v5.16-rc1
References: jsc#SES-1880

Add proper error handling for when an async create fails. The inode
never existed, so any dirty caps or data are now toast. We already
d_drop the dentry in that case, but the now-stale inode may still be
around. We want to shut down access to these inodes, and ensure that
they can't harbor any more dirty data, which can cause problems at
umount time.

When this occurs, flag such inodes as being SHUTDOWN, and trash any caps
and cap flushes that may be in flight for them, and invalidate the
pagecache for the inode. Add a new helper that can check whether an
inode or an entire mount is now shut down, and call it instead of
accessing the mount_state directly in places where we test that now.

URL: https://tracker.ceph.com/issues/51279
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/addr.c   |   16 +++++++++++-----
 fs/ceph/caps.c   |   12 ++++++------
 fs/ceph/export.c |   12 +++++++++++-
 fs/ceph/file.c   |   10 +++++++++-
 fs/ceph/inode.c  |   33 +++++++++++++++++++++++++++++++--
 fs/ceph/locks.c  |    6 ++++++
 fs/ceph/super.h  |   11 +++++++++++
 7 files changed, 85 insertions(+), 15 deletions(-)

--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -724,7 +724,7 @@ static int ceph_writepages_start(struct
 	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+	if (ceph_inode_is_shutdown(inode)) {
 		if (ci->i_wrbuffer_ref > 0) {
 			pr_warn_ratelimited(
 				"writepage_start %p %lld forced umount\n",
@@ -1145,12 +1145,12 @@ static struct ceph_snap_context *
 ceph_find_incompatible(struct page *page)
 {
 	struct inode *inode = page->mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
-	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
-		dout(" page %p forced umount\n", page);
-		return ERR_PTR(-EIO);
+	if (ceph_inode_is_shutdown(inode)) {
+		dout(" page %p %llx:%llx is shutdown\n", page,
+		     ceph_vinop(inode));
+		return ERR_PTR(-ESTALE);
 	}
 
 	for (;;) {
@@ -1345,6 +1345,9 @@ static vm_fault_t ceph_filemap_fault(str
 	sigset_t oldset;
 	vm_fault_t ret = VM_FAULT_SIGBUS;
 
+	if (ceph_inode_is_shutdown(inode))
+		return ret;
+
 	ceph_block_sigs(&oldset);
 
 	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
@@ -1436,6 +1439,9 @@ static vm_fault_t ceph_page_mkwrite(stru
 	sigset_t oldset;
 	vm_fault_t ret = VM_FAULT_SIGBUS;
 
+	if (ceph_inode_is_shutdown(inode))
+		return ret;
+
 	prealloc_cf = ceph_alloc_cap_flush();
 	if (!prealloc_cf)
 		return VM_FAULT_OOM;
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1188,11 +1188,11 @@ void ceph_remove_cap(struct ceph_cap *ca
 
 	lockdep_assert_held(&ci->i_ceph_lock);
 
-	fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+	fsc = ceph_inode_to_client(&ci->vfs_inode);
 	WARN_ON_ONCE(ci->i_auth_cap == cap &&
 		     !list_empty(&ci->i_dirty_item) &&
 		     !fsc->blocklisted &&
-		     READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
+		     !ceph_inode_is_shutdown(&ci->vfs_inode));
 
 	__ceph_remove_cap(cap, queue_release);
 }
@@ -2750,9 +2750,9 @@ again:
 			goto out_unlock;
 		}
 
-		if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
-			dout("get_cap_refs %p forced umount\n", inode);
-			ret = -EIO;
+		if (ceph_inode_is_shutdown(inode)) {
+			dout("get_cap_refs %p inode is shutdown\n", inode);
+			ret = -ESTALE;
 			goto out_unlock;
 		}
 		mds_wanted = __ceph_caps_mds_wanted(ci, false);
@@ -4604,7 +4604,7 @@ int ceph_purge_inode_cap(struct inode *i
 	if (is_auth) {
 		struct ceph_cap_flush *cf;
 
-		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+		if (ceph_inode_is_shutdown(inode)) {
 			if (inode->i_data.nrpages > 0)
 				*invalidate = true;
 			if (ci->i_wrbuffer_ref > 0)
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -157,6 +157,11 @@ static struct inode *__lookup_inode(stru
 		ceph_mdsc_put_request(req);
 		if (!inode)
 			return err < 0 ? ERR_PTR(err) : ERR_PTR(-ESTALE);
+	} else {
+		if (ceph_inode_is_shutdown(inode)) {
+			iput(inode);
+			return ERR_PTR(-ESTALE);
+		}
 	}
 	return inode;
 }
@@ -223,8 +228,13 @@ static struct dentry *__snapfh_to_dentry
 		return ERR_PTR(-ESTALE);
 
 	inode = ceph_find_inode(sb, vino);
-	if (inode)
+	if (inode) {
+		if (ceph_inode_is_shutdown(inode)) {
+			iput(inode);
+			return ERR_PTR(-ESTALE);
+		}
 		return d_obtain_alias(inode);
+	}
 
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
 				       USE_ANY_MDS);
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -525,6 +525,7 @@ static void ceph_async_create_cb(struct
 
 	if (result) {
 		struct dentry *dentry = req->r_dentry;
+		struct inode *inode = d_inode(dentry);
 		int pathlen = 0;
 		u64 base = 0;
 		char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
@@ -534,7 +535,8 @@ static void ceph_async_create_cb(struct
 		if (!d_unhashed(dentry))
 			d_drop(dentry);
 
-		/* FIXME: start returning I/O errors on all accesses? */
+		ceph_inode_shutdown(inode);
+
 		pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
 			base, IS_ERR(path) ? "<<bad>>" : path, result);
 		ceph_mdsc_free_path(path, pathlen);
@@ -1526,6 +1528,9 @@ again:
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
 	     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	if (direct_lock)
 		ceph_start_io_direct(inode);
 	else
@@ -1678,6 +1683,9 @@ static ssize_t ceph_write_iter(struct ki
 	loff_t pos;
 	loff_t limit = max(i_size_read(inode), fsc->max_file_size);
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1841,13 +1841,12 @@ void ceph_queue_inode_work(struct inode
 static void ceph_do_invalidate_pages(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	u32 orig_gen;
 	int check = 0;
 
 	mutex_lock(&ci->i_truncate_mutex);
 
-	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
+	if (ceph_inode_is_shutdown(inode)) {
 		pr_warn_ratelimited("%s: inode %llx.%llx is shut down\n",
 				    __func__, ceph_vinop(inode));
 		mapping_set_error(inode->i_mapping, -EIO);
@@ -2218,6 +2217,9 @@ int ceph_setattr(struct user_namespace *
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	err = setattr_prepare(&init_user_ns, dentry, attr);
 	if (err != 0)
 		return err;
@@ -2348,6 +2350,9 @@ int ceph_getattr(struct user_namespace *
 	u32 valid_mask = STATX_BASIC_STATS;
 	int err = 0;
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	/* Skip the getattr altogether if we're asked not to sync */
 	if (!(flags & AT_STATX_DONT_SYNC)) {
 		err = ceph_do_getattr(inode,
@@ -2395,3 +2400,27 @@ int ceph_getattr(struct user_namespace *
 	stat->result_mask = request_mask & valid_mask;
 	return err;
 }
+
+void ceph_inode_shutdown(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct rb_node *p;
+	int iputs = 0;
+	bool invalidate = false;
+
+	spin_lock(&ci->i_ceph_lock);
+	ci->i_ceph_flags |= CEPH_I_SHUTDOWN;
+	p = rb_first(&ci->i_caps);
+	while (p) {
+		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
+
+		p = rb_next(p);
+		iputs += ceph_purge_inode_cap(inode, cap, &invalidate);
+	}
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (invalidate)
+		ceph_queue_invalidate(inode);
+	while (iputs--)
+		iput(inode);
+}
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -244,6 +244,9 @@ int ceph_lock(struct file *file, int cmd
 	if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
 		return -ENOLCK;
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	dout("ceph_lock, fl_owner: %p\n", fl->fl_owner);
 
 	/* set wait bit as appropriate, then make command as Ceph expects it*/
@@ -309,6 +312,9 @@ int ceph_flock(struct file *file, int cm
 	if (fl->fl_type & LOCK_MAND)
 		return -EOPNOTSUPP;
 
+	if (ceph_inode_is_shutdown(inode))
+		return -ESTALE;
+
 	dout("ceph_flock, fl_file: %p\n", fl->fl_file);
 
 	spin_lock(&ci->i_ceph_lock);
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -581,6 +581,7 @@ static inline struct inode *ceph_find_in
 #define CEPH_I_ODIRECT		(1 << 11) /* inode in direct I/O mode */
 #define CEPH_ASYNC_CREATE_BIT	(12)	  /* async create in flight for this */
 #define CEPH_I_ASYNC_CREATE	(1 << CEPH_ASYNC_CREATE_BIT)
+#define CEPH_I_SHUTDOWN		(1 << 13) /* inode is no longer usable */
 
 /*
  * Masks of ceph inode work.
@@ -1028,6 +1029,16 @@ extern int ceph_setattr(struct user_name
 extern int ceph_getattr(struct user_namespace *mnt_userns,
 			const struct path *path, struct kstat *stat,
 			u32 request_mask, unsigned int flags);
+void ceph_inode_shutdown(struct inode *inode);
+
+static inline bool ceph_inode_is_shutdown(struct inode *inode)
+{
+	unsigned long flags = READ_ONCE(ceph_inode(inode)->i_ceph_flags);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	int state = READ_ONCE(fsc->mount_state);
+
+	return (flags & CEPH_I_SHUTDOWN) || state >= CEPH_MOUNT_SHUTDOWN;
+}
 
 /* xattr.c */
 int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);