Blob Blame History Raw
From: David Howells <dhowells@redhat.com>
Date: Wed, 15 Dec 2021 23:48:33 +0000
Subject: ceph: uninline the data on a file opened for writing
Git-commit: 083db6fd3e73fc4f9ee74d2da1012a94216d6a60
Patch-mainline: v5.18-rc1
References: jsc#SES-1880

If a ceph file is made up of inline data, uninline that in the ceph_open()
rather than in ceph_page_mkwrite(), ceph_write_iter(), ceph_fallocate() or
ceph_write_begin().

This makes it easier to convert to using the netfs library for VM write
hooks.

Should this also take the inode lock for the duration on uninlining to
prevent a race with truncation?

[ jlayton: fix up folio locking, update i_inline_version after write ]

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/addr.c  |  148 +++++++++++++++++---------------------------------------
 fs/ceph/file.c  |   32 ++++++------
 fs/ceph/super.h |    2 
 3 files changed, 63 insertions(+), 119 deletions(-)

--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1315,41 +1315,11 @@ static int ceph_write_begin(struct file
 			    struct page **pagep, void **fsdata)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct page *page = NULL;
-	pgoff_t index = pos >> PAGE_SHIFT;
 	int r;
 
-	/*
-	 * Uninlining should have already been done and everything updated, EXCEPT
-	 * for inline_version sent to the MDS.
-	 */
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		page = grab_cache_page_write_begin(mapping, index, flags);
-		if (!page)
-			return -ENOMEM;
-
-		/*
-		 * The inline_version on a new inode is set to 1. If that's the
-		 * case, then the page is brand new and isn't yet Uptodate.
-		 */
-		r = 0;
-		if (index == 0 && ci->i_inline_version != 1) {
-			if (!PageUptodate(page)) {
-				WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
-					  ci->i_inline_version);
-				r = -EINVAL;
-			}
-			goto out;
-		}
-		zero_user_segment(page, 0, thp_size(page));
-		SetPageUptodate(page);
-		goto out;
-	}
-
 	r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
 			      &ceph_netfs_read_ops, NULL);
-out:
 	if (r == 0)
 		wait_on_page_fscache(page);
 	if (r < 0) {
@@ -1544,19 +1514,6 @@ static vm_fault_t ceph_page_mkwrite(stru
 	sb_start_pagefault(inode->i_sb);
 	ceph_block_sigs(&oldset);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		struct page *locked_page = NULL;
-		if (off == 0) {
-			lock_page(page);
-			locked_page = page;
-		}
-		err = ceph_uninline_data(vma->vm_file, locked_page);
-		if (locked_page)
-			unlock_page(locked_page);
-		if (err < 0)
-			goto out_free;
-	}
-
 	if (off + thp_size(page) <= size)
 		len = thp_size(page);
 	else
@@ -1613,11 +1570,9 @@ static vm_fault_t ceph_page_mkwrite(stru
 		ceph_put_snap_context(snapc);
 	} while (err == 0);
 
-	if (ret == VM_FAULT_LOCKED ||
-	    ci->i_inline_version != CEPH_INLINE_NONE) {
+	if (ret == VM_FAULT_LOCKED) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
 					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
@@ -1681,16 +1636,29 @@ void ceph_fill_inline_data(struct inode
 	}
 }
 
-int ceph_uninline_data(struct file *filp, struct page *locked_page)
+int ceph_uninline_data(struct file *file)
 {
-	struct inode *inode = file_inode(filp);
+	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_osd_request *req;
+	struct ceph_cap_flush *prealloc_cf;
 	struct page *page = NULL;
+	struct page *pages[1];
 	u64 len, inline_version;
 	int err = 0;
-	bool from_pagecache = false;
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	page = read_mapping_page(inode->i_mapping, 0, file);
+	if (IS_ERR(page)) {
+		err = PTR_ERR(page);
+		goto out;
+	}
+
+	lock_page(page);
 
 	spin_lock(&ci->i_ceph_lock);
 	inline_version = ci->i_inline_version;
@@ -1701,45 +1669,11 @@ int ceph_uninline_data(struct file *filp
 
 	if (inline_version == 1 || /* initial version, no data */
 	    inline_version == CEPH_INLINE_NONE)
-		goto out;
+		goto out_unlock;
 
-	if (locked_page) {
-		page = locked_page;
-		WARN_ON(!PageUptodate(page));
-	} else if (ceph_caps_issued(ci) &
-		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
-		page = find_get_page(inode->i_mapping, 0);
-		if (page) {
-			if (PageUptodate(page)) {
-				from_pagecache = true;
-				lock_page(page);
-			} else {
-				put_page(page);
-				page = NULL;
-			}
-		}
-	}
-
-	if (page) {
-		len = i_size_read(inode);
-		if (len > PAGE_SIZE)
-			len = PAGE_SIZE;
-	} else {
-		page = __page_cache_alloc(GFP_NOFS);
-		if (!page) {
-			err = -ENOMEM;
-			goto out;
-		}
-		err = __ceph_do_getattr(inode, page,
-					CEPH_STAT_CAP_INLINE_DATA, true);
-		if (err < 0) {
-			/* no inline data */
-			if (err == -ENODATA)
-				err = 0;
-			goto out;
-		}
-		len = err;
-	}
+	len = i_size_read(inode);
+	if (len > page_size(page))
+		len = page_size(page);
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 				    ceph_vino(inode), 0, &len, 0, 1,
@@ -1747,7 +1681,7 @@ int ceph_uninline_data(struct file *filp
 				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
-		goto out;
+		goto out_unlock;
 	}
 
 	req->r_mtime = inode->i_mtime;
@@ -1756,7 +1690,7 @@ int ceph_uninline_data(struct file *filp
 		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
 	ceph_osdc_put_request(req);
 	if (err < 0)
-		goto out;
+		goto out_unlock;
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 				    ceph_vino(inode), 0, &len, 1, 3,
@@ -1765,10 +1699,11 @@ int ceph_uninline_data(struct file *filp
 				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
-		goto out;
+		goto out_unlock;
 	}
 
-	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+	pages[0] = page;
+	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);
 
 	{
 		__le64 xattr_buf = cpu_to_le64(inline_version);
@@ -1778,7 +1713,7 @@ int ceph_uninline_data(struct file *filp
 					    CEPH_OSD_CMPXATTR_OP_GT,
 					    CEPH_OSD_CMPXATTR_MODE_U64);
 		if (err)
-			goto out_put;
+			goto out_put_req;
 	}
 
 	{
@@ -1789,7 +1724,7 @@ int ceph_uninline_data(struct file *filp
 					    "inline_version",
 					    xattr_buf, xattr_len, 0, 0);
 		if (err)
-			goto out_put;
+			goto out_put_req;
 	}
 
 	req->r_mtime = inode->i_mtime;
@@ -1800,19 +1735,28 @@ int ceph_uninline_data(struct file *filp
 	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
 				  req->r_end_latency, len, err);
 
-out_put:
+	if (!err) {
+		int dirty;
+
+		/* Set to CAP_INLINE_NONE and dirty the caps */
+		down_read(&fsc->mdsc->snap_rwsem);
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+		spin_unlock(&ci->i_ceph_lock);
+		up_read(&fsc->mdsc->snap_rwsem);
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+	}
+out_put_req:
 	ceph_osdc_put_request(req);
 	if (err == -ECANCELED)
 		err = 0;
+out_unlock:
+	unlock_page(page);
+	put_page(page);
 out:
-	if (page && page != locked_page) {
-		if (from_pagecache) {
-			unlock_page(page);
-			put_page(page);
-		} else
-			__free_pages(page, 0);
-	}
-
+	ceph_free_cap_flush(prealloc_cf);
 	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
 	     inode, ceph_vinop(inode), inline_version, err);
 	return err;
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -207,6 +207,7 @@ static int ceph_init_file_info(struct in
 	struct ceph_mount_options *opt =
 		ceph_inode_to_client(&ci->vfs_inode)->mount_options;
 	struct ceph_file_info *fi;
+	int ret;
 
 	dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
 			inode->i_mode, isdir ? "dir" : "regular");
@@ -240,7 +241,22 @@ static int ceph_init_file_info(struct in
 	INIT_LIST_HEAD(&fi->rw_contexts);
 	fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
 
+	if ((file->f_mode & FMODE_WRITE) &&
+	    ci->i_inline_version != CEPH_INLINE_NONE) {
+		ret = ceph_uninline_data(file);
+		if (ret < 0)
+			goto error;
+	}
+
 	return 0;
+
+error:
+	ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE);
+	ceph_put_fmode(ci, fi->fmode, 1);
+	kmem_cache_free(ceph_file_cachep, fi);
+	/* wake up anyone waiting for caps on this inode */
+	wake_up_all(&ci->i_cap_wq);
+	return ret;
 }
 
 /*
@@ -1041,7 +1057,6 @@ static void ceph_aio_complete(struct ino
 		}
 
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
 					       &aio_req->prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
@@ -1778,12 +1793,6 @@ retry_snap:
 	if (err)
 		goto out;
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		err = ceph_uninline_data(file, NULL);
-		if (err < 0)
-			goto out;
-	}
-
 	dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
 	     inode, ceph_vinop(inode), pos, count, i_size_read(inode));
 	if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
@@ -1855,7 +1864,6 @@ retry_snap:
 		int dirty;
 
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
 					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
@@ -2109,12 +2117,6 @@ static long ceph_fallocate(struct file *
 		goto unlock;
 	}
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		ret = ceph_uninline_data(file, NULL);
-		if (ret < 0)
-			goto unlock;
-	}
-
 	size = i_size_read(inode);
 
 	/* Are we punching a hole beyond EOF? */
@@ -2139,7 +2141,6 @@ static long ceph_fallocate(struct file *
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
 					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
@@ -2532,7 +2533,6 @@ static ssize_t __ceph_copy_file_range(st
 	}
 	/* Mark Fw dirty */
 	spin_lock(&dst_ci->i_ceph_lock);
-	dst_ci->i_inline_version = CEPH_INLINE_NONE;
 	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
 	spin_unlock(&dst_ci->i_ceph_lock);
 	if (dirty)
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1213,7 +1213,7 @@ extern void __ceph_touch_fmode(struct ce
 /* addr.c */
 extern const struct address_space_operations ceph_aops;
 extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
-extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
+extern int ceph_uninline_data(struct file *file);
 extern int ceph_pool_perm_check(struct inode *inode, int need);
 extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
 int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate);