libceph: support sparse reads on msgr2 secure codepath

Add a new init_sgs_pages helper that populates the scatterlist from
an arbitrary point in an array of pages.

Change setup_message_sgs to take an optional pointer to an array of
pages. If that's set, then the scatterlist will be set using that
array instead of the cursor.

When given a sparse read on a secure connection, decrypt the data
in-place rather than into the final destination, by passing it the
in_enc_pages array.

After decrypting, run the sparse_read state machine in a loop, copying
data from the decrypted pages until it's complete.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Reviewed-by: Xiubo Li <xiubli@redhat.com>
Reviewed-and-tested-by: Luís Henriques <lhenriques@suse.de>
Reviewed-by: Milind Changire <mchangir@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Jeff Layton 2022-03-23 12:17:15 -04:00 committed by Ilya Dryomov
parent ec3bc567ea
commit f36217e35c
1 changed files with 110 additions and 10 deletions

View File

@ -969,12 +969,48 @@ static void init_sgs_cursor(struct scatterlist **sg,
}
}
/**
* init_sgs_pages: set up scatterlist on an array of page pointers
* @sg: scatterlist to populate
* @pages: pointer to page array
* @dpos: position in the array to start (bytes)
* @dlen: len to add to sg (bytes)
* @pad: pointer to pad destination (if any)
*
* Populate the scatterlist from the page array, starting at an arbitrary
* byte in the array and running for a specified length.
*/
static void init_sgs_pages(struct scatterlist **sg, struct page **pages,
int dpos, int dlen, u8 *pad)
{
int idx = dpos >> PAGE_SHIFT;
int off = offset_in_page(dpos);
int resid = dlen;
do {
int len = min(resid, (int)PAGE_SIZE - off);
sg_set_page(*sg, pages[idx], len, off);
*sg = sg_next(*sg);
off = 0;
++idx;
resid -= len;
} while (resid);
if (need_padding(dlen)) {
sg_set_buf(*sg, pad, padding_len(dlen));
*sg = sg_next(*sg);
}
}
static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
u8 *front_pad, u8 *middle_pad, u8 *data_pad,
void *epilogue, bool add_tag)
void *epilogue, struct page **pages, int dpos,
bool add_tag)
{
struct ceph_msg_data_cursor cursor;
struct scatterlist *cur_sg;
int dlen = data_len(msg);
int sg_cnt;
int ret;
@ -988,9 +1024,15 @@ static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
if (middle_len(msg))
sg_cnt += calc_sg_cnt(msg->middle->vec.iov_base,
middle_len(msg));
if (data_len(msg)) {
ceph_msg_data_cursor_init(&cursor, msg, data_len(msg));
sg_cnt += calc_sg_cnt_cursor(&cursor);
if (dlen) {
if (pages) {
sg_cnt += calc_pages_for(dpos, dlen);
if (need_padding(dlen))
sg_cnt++;
} else {
ceph_msg_data_cursor_init(&cursor, msg, dlen);
sg_cnt += calc_sg_cnt_cursor(&cursor);
}
}
ret = sg_alloc_table(sgt, sg_cnt, GFP_NOIO);
@ -1004,9 +1046,13 @@ static int setup_message_sgs(struct sg_table *sgt, struct ceph_msg *msg,
if (middle_len(msg))
init_sgs(&cur_sg, msg->middle->vec.iov_base, middle_len(msg),
middle_pad);
if (data_len(msg)) {
ceph_msg_data_cursor_init(&cursor, msg, data_len(msg));
init_sgs_cursor(&cur_sg, &cursor, data_pad);
if (dlen) {
if (pages) {
init_sgs_pages(&cur_sg, pages, dpos, dlen, data_pad);
} else {
ceph_msg_data_cursor_init(&cursor, msg, dlen);
init_sgs_cursor(&cur_sg, &cursor, data_pad);
}
}
WARN_ON(!sg_is_last(cur_sg));
@ -1041,10 +1087,53 @@ static int decrypt_control_remainder(struct ceph_connection *con)
padded_len(rem_len) + CEPH_GCM_TAG_LEN);
}
/* Process sparse read data that lives in a buffer */
static int process_v2_sparse_read(struct ceph_connection *con,
struct page **pages, int spos)
{
struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
int ret;
for (;;) {
char *buf = NULL;
ret = con->ops->sparse_read(con, cursor, &buf);
if (ret <= 0)
return ret;
dout("%s: sparse_read return %x buf %p\n", __func__, ret, buf);
do {
int idx = spos >> PAGE_SHIFT;
int soff = offset_in_page(spos);
struct page *spage = con->v2.in_enc_pages[idx];
int len = min_t(int, ret, PAGE_SIZE - soff);
if (buf) {
memcpy_from_page(buf, spage, soff, len);
buf += len;
} else {
struct bio_vec bv;
get_bvec_at(cursor, &bv);
len = min_t(int, len, bv.bv_len);
memcpy_page(bv.bv_page, bv.bv_offset,
spage, soff, len);
ceph_msg_data_advance(cursor, len);
}
spos += len;
ret -= len;
} while (ret);
}
}
static int decrypt_tail(struct ceph_connection *con)
{
struct sg_table enc_sgt = {};
struct sg_table sgt = {};
struct page **pages = NULL;
bool sparse = con->in_msg->sparse_read;
int dpos = 0;
int tail_len;
int ret;
@ -1055,9 +1144,14 @@ static int decrypt_tail(struct ceph_connection *con)
if (ret)
goto out;
if (sparse) {
dpos = padded_len(front_len(con->in_msg) + padded_len(middle_len(con->in_msg)));
pages = con->v2.in_enc_pages;
}
ret = setup_message_sgs(&sgt, con->in_msg, FRONT_PAD(con->v2.in_buf),
MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf),
con->v2.in_buf, true);
MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf),
con->v2.in_buf, pages, dpos, true);
if (ret)
goto out;
@ -1067,6 +1161,12 @@ static int decrypt_tail(struct ceph_connection *con)
if (ret)
goto out;
if (sparse && data_len(con->in_msg)) {
ret = process_v2_sparse_read(con, con->v2.in_enc_pages, dpos);
if (ret)
goto out;
}
WARN_ON(!con->v2.in_enc_page_cnt);
ceph_release_page_vector(con->v2.in_enc_pages,
con->v2.in_enc_page_cnt);
@ -1590,7 +1690,7 @@ static int prepare_message_secure(struct ceph_connection *con)
encode_epilogue_secure(con, false);
ret = setup_message_sgs(&sgt, con->out_msg, zerop, zerop, zerop,
&con->v2.out_epil, false);
&con->v2.out_epil, NULL, 0, false);
if (ret)
goto out;