ceph: uninline the data on a file opened for writing

If a ceph file is made up of inline data, uninline that in the ceph_open()
rather than in ceph_page_mkwrite(), ceph_write_iter(), ceph_fallocate() or
ceph_write_begin().

This makes it easier to convert to using the netfs library for VM write
hooks.

Should this also take the inode lock for the duration on uninlining to
prevent a race with truncation?

[ jlayton: fix up folio locking, update i_inline_version after write ]

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: Jeff Layton <jlayton@kernel.org>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
David Howells 2021-12-15 23:48:33 +00:00 committed by Ilya Dryomov
parent 5b19f1eba4
commit 083db6fd3e
3 changed files with 64 additions and 124 deletions

View file

@ -1317,45 +1317,11 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
struct page **pagep, void **fsdata)
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct folio *folio = NULL;
pgoff_t index = pos >> PAGE_SHIFT;
int r;
/*
* Uninlining should have already been done and everything updated, EXCEPT
* for inline_version sent to the MDS.
*/
if (ci->i_inline_version != CEPH_INLINE_NONE) {
unsigned int fgp_flags = FGP_LOCK | FGP_WRITE | FGP_CREAT | FGP_STABLE;
if (aop_flags & AOP_FLAG_NOFS)
fgp_flags |= FGP_NOFS;
folio = __filemap_get_folio(mapping, index, fgp_flags,
mapping_gfp_mask(mapping));
if (!folio)
return -ENOMEM;
/*
* The inline_version on a new inode is set to 1. If that's the
* case, then the folio is brand new and isn't yet Uptodate.
*/
r = 0;
if (index == 0 && ci->i_inline_version != 1) {
if (!folio_test_uptodate(folio)) {
WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
ci->i_inline_version);
r = -EINVAL;
}
goto out;
}
zero_user_segment(&folio->page, 0, folio_size(folio));
folio_mark_uptodate(folio);
goto out;
}
r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &folio, NULL,
&ceph_netfs_read_ops, NULL);
out:
if (r == 0)
folio_wait_fscache(folio);
if (r < 0) {
@ -1551,19 +1517,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
sb_start_pagefault(inode->i_sb);
ceph_block_sigs(&oldset);
if (ci->i_inline_version != CEPH_INLINE_NONE) {
struct page *locked_page = NULL;
if (off == 0) {
lock_page(page);
locked_page = page;
}
err = ceph_uninline_data(vma->vm_file, locked_page);
if (locked_page)
unlock_page(locked_page);
if (err < 0)
goto out_free;
}
if (off + thp_size(page) <= size)
len = thp_size(page);
else
@ -1620,11 +1573,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
ceph_put_snap_context(snapc);
} while (err == 0);
if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) {
if (ret == VM_FAULT_LOCKED) {
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
@ -1688,16 +1639,29 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
}
}
int ceph_uninline_data(struct file *filp, struct page *locked_page)
int ceph_uninline_data(struct file *file)
{
struct inode *inode = file_inode(filp);
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req;
struct page *page = NULL;
struct ceph_cap_flush *prealloc_cf;
struct folio *folio = NULL;
struct page *pages[1];
u64 len, inline_version;
int err = 0;
bool from_pagecache = false;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
return -ENOMEM;
folio = read_mapping_folio(inode->i_mapping, 0, file);
if (IS_ERR(folio)) {
err = PTR_ERR(folio);
goto out;
}
folio_lock(folio);
spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
@ -1708,45 +1672,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
if (inline_version == 1 || /* initial version, no data */
inline_version == CEPH_INLINE_NONE)
goto out;
goto out_unlock;
if (locked_page) {
page = locked_page;
WARN_ON(!PageUptodate(page));
} else if (ceph_caps_issued(ci) &
(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
from_pagecache = true;
lock_page(page);
} else {
put_page(page);
page = NULL;
}
}
}
if (page) {
len = i_size_read(inode);
if (len > PAGE_SIZE)
len = PAGE_SIZE;
} else {
page = __page_cache_alloc(GFP_NOFS);
if (!page) {
err = -ENOMEM;
goto out;
}
err = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (err < 0) {
/* no inline data */
if (err == -ENODATA)
err = 0;
goto out;
}
len = err;
}
len = i_size_read(inode);
if (len > folio_size(folio))
len = folio_size(folio);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 0, 1,
@ -1754,7 +1684,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
NULL, 0, 0, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
goto out_unlock;
}
req->r_mtime = inode->i_mtime;
@ -1763,7 +1693,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
ceph_osdc_put_request(req);
if (err < 0)
goto out;
goto out_unlock;
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode), 0, &len, 1, 3,
@ -1772,10 +1702,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ci->i_truncate_size, false);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
goto out_unlock;
}
osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
pages[0] = folio_page(folio, 0);
osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);
{
__le64 xattr_buf = cpu_to_le64(inline_version);
@ -1785,7 +1716,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
CEPH_OSD_CMPXATTR_OP_GT,
CEPH_OSD_CMPXATTR_MODE_U64);
if (err)
goto out_put;
goto out_put_req;
}
{
@ -1796,7 +1727,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
"inline_version",
xattr_buf, xattr_len, 0, 0);
if (err)
goto out_put;
goto out_put_req;
}
req->r_mtime = inode->i_mtime;
@ -1807,19 +1738,28 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
req->r_end_latency, len, err);
out_put:
if (!err) {
int dirty;
/* Set to CAP_INLINE_NONE and dirty the caps */
down_read(&fsc->mdsc->snap_rwsem);
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
up_read(&fsc->mdsc->snap_rwsem);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
out_put_req:
ceph_osdc_put_request(req);
if (err == -ECANCELED)
err = 0;
out_unlock:
folio_unlock(folio);
folio_put(folio);
out:
if (page && page != locked_page) {
if (from_pagecache) {
unlock_page(page);
put_page(page);
} else
__free_pages(page, 0);
}
ceph_free_cap_flush(prealloc_cf);
dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
inode, ceph_vinop(inode), inline_version, err);
return err;

View file

@ -207,6 +207,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
struct ceph_mount_options *opt =
ceph_inode_to_client(&ci->vfs_inode)->mount_options;
struct ceph_file_info *fi;
int ret;
dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
inode->i_mode, isdir ? "dir" : "regular");
@ -240,7 +241,22 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
INIT_LIST_HEAD(&fi->rw_contexts);
fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
if ((file->f_mode & FMODE_WRITE) &&
ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file);
if (ret < 0)
goto error;
}
return 0;
error:
ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE);
ceph_put_fmode(ci, fi->fmode, 1);
kmem_cache_free(ceph_file_cachep, fi);
/* wake up anyone waiting for caps on this inode */
wake_up_all(&ci->i_cap_wq);
return ret;
}
/*
@ -1041,7 +1057,6 @@ static void ceph_aio_complete(struct inode *inode,
}
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&aio_req->prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
@ -1778,12 +1793,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err)
goto out;
if (ci->i_inline_version != CEPH_INLINE_NONE) {
err = ceph_uninline_data(file, NULL);
if (err < 0)
goto out;
}
dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
inode, ceph_vinop(inode), pos, count, i_size_read(inode));
if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
@ -1855,7 +1864,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
int dirty;
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
@ -2109,12 +2117,6 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
if (ci->i_inline_version != CEPH_INLINE_NONE) {
ret = ceph_uninline_data(file, NULL);
if (ret < 0)
goto unlock;
}
size = i_size_read(inode);
/* Are we punching a hole beyond EOF? */
@ -2139,7 +2141,6 @@ static long ceph_fallocate(struct file *file, int mode,
if (!ret) {
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
@ -2532,7 +2533,6 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
}
/* Mark Fw dirty */
spin_lock(&dst_ci->i_ceph_lock);
dst_ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
spin_unlock(&dst_ci->i_ceph_lock);
if (dirty)

View file

@ -1213,7 +1213,7 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
/* addr.c */
extern const struct address_space_operations ceph_aops;
extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
extern int ceph_uninline_data(struct file *file);
extern int ceph_pool_perm_check(struct inode *inode, int need);
extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate);