bcachefs: Run btree updates after write out of write_point

In the write path, after the write to the block device(s) complete we
have to punt to process context to do the btree update.

Instead of using the work item embedded in op->cl, this patch switches
to a per write-point work item. This helps with two different issues:

 - lock contention: btree updates to the same writepoint will (usually)
   be updating the same alloc keys
 - context switch overhead: when we're bottlenecked on btree updates,
   having a thread (running out of a work item) checking the write point
   for completed ops is cheaper than queueing up a new work item and
   waking up a kworker.

In an arbitrary benchmark, 4k random writes with fio running inside a
VM, this patch resulted in a 10% improvement in total iops.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
This commit is contained in:
Kent Overstreet 2022-10-31 16:13:05 -04:00
parent 5f41739403
commit b17d3cec14
11 changed files with 248 additions and 139 deletions

View file

@ -762,7 +762,7 @@ static struct write_point *writepoint_find(struct bch_fs *c,
/*
* Get us an open_bucket we can allocate from, return with it locked:
*/
struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
int bch2_alloc_sectors_start(struct bch_fs *c,
unsigned target,
unsigned erasure_code,
struct write_point_specifier write_point,
@ -771,7 +771,8 @@ struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
unsigned nr_replicas_required,
enum alloc_reserve reserve,
unsigned flags,
struct closure *cl)
struct closure *cl,
struct write_point **wp_ret)
{
struct write_point *wp;
struct open_bucket *ob;
@ -792,7 +793,7 @@ struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
write_points_nr = c->write_points_nr;
have_cache = false;
wp = writepoint_find(c, write_point.v);
*wp_ret = wp = writepoint_find(c, write_point.v);
if (wp->data_type == BCH_DATA_user)
ob_flags |= BUCKET_MAY_ALLOC_PARTIAL;
@ -848,7 +849,7 @@ struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
BUG_ON(!wp->sectors_free || wp->sectors_free == UINT_MAX);
return wp;
return 0;
err:
open_bucket_for_each(c, &wp->ptrs, ob, i)
if (ptrs.nr < ARRAY_SIZE(ptrs.v))
@ -866,9 +867,9 @@ struct write_point *bch2_alloc_sectors_start(struct bch_fs *c,
switch (ret) {
case -OPEN_BUCKETS_EMPTY:
case -FREELIST_EMPTY:
return cl ? ERR_PTR(-EAGAIN) : ERR_PTR(-ENOSPC);
return cl ? -EAGAIN : -ENOSPC;
case -INSUFFICIENT_DEVICES:
return ERR_PTR(-EROFS);
return -EROFS;
default:
BUG();
}
@ -895,13 +896,13 @@ struct bch_extent_ptr bch2_ob_ptr(struct bch_fs *c, struct open_bucket *ob)
void bch2_alloc_sectors_append_ptrs(struct bch_fs *c, struct write_point *wp,
struct bkey_i *k, unsigned sectors,
bool cached)
{
struct open_bucket *ob;
unsigned i;
BUG_ON(sectors > wp->sectors_free);
wp->sectors_free -= sectors;
wp->sectors_allocated += sectors;
open_bucket_for_each(c, &wp->ptrs, ob, i) {
struct bch_dev *ca = bch_dev_bkey_exists(c, ob->dev);
@ -942,6 +943,10 @@ static inline void writepoint_init(struct write_point *wp,
{
mutex_init(&wp->lock);
wp->data_type = type;
INIT_WORK(&wp->index_update_work, bch2_write_point_do_index_updates);
INIT_LIST_HEAD(&wp->writes);
spin_lock_init(&wp->writes_lock);
}
void bch2_fs_allocator_foreground_init(struct bch_fs *c)
@ -997,3 +1002,33 @@ void bch2_open_buckets_to_text(struct printbuf *out, struct bch_fs *c)
}
}
static const char * const bch2_write_point_states[] = {
#define x(n) #n,
WRITE_POINT_STATES()
#undef x
NULL
};
void bch2_write_points_to_text(struct printbuf *out, struct bch_fs *c)
{
struct write_point *wp;
unsigned i;
for (wp = c->write_points;
wp < c->write_points + ARRAY_SIZE(c->write_points);
wp++) {
pr_buf(out, "%lu: ", wp->write_point);
bch2_hprint(out, wp->sectors_allocated);
pr_buf(out, " last wrote: ");
bch2_pr_time_units(out, sched_clock() - wp->last_used);
for (i = 0; i < WRITE_POINT_STATE_NR; i++) {
pr_buf(out, " %s: ", bch2_write_point_states[i]);
bch2_pr_time_units(out, wp->time[i]);
}
pr_newline(out);
}
}

View file

@ -122,14 +122,15 @@ int bch2_bucket_alloc_set(struct bch_fs *, struct open_buckets *,
unsigned, unsigned *, bool *, enum alloc_reserve,
unsigned, struct closure *);
struct write_point *bch2_alloc_sectors_start(struct bch_fs *,
int bch2_alloc_sectors_start(struct bch_fs *,
unsigned, unsigned,
struct write_point_specifier,
struct bch_devs_list *,
unsigned, unsigned,
enum alloc_reserve,
unsigned,
struct closure *);
struct closure *,
struct write_point **);
struct bch_extent_ptr bch2_ob_ptr(struct bch_fs *, struct open_bucket *);
void bch2_alloc_sectors_append_ptrs(struct bch_fs *, struct write_point *,
@ -156,4 +157,6 @@ void bch2_fs_allocator_foreground_init(struct bch_fs *);
void bch2_open_buckets_to_text(struct printbuf *, struct bch_fs *);
void bch2_write_points_to_text(struct printbuf *, struct bch_fs *);
#endif /* _BCACHEFS_ALLOC_FOREGROUND_H */

View file

@ -81,7 +81,21 @@ struct dev_stripe_state {
u64 next_alloc[BCH_SB_MEMBERS_MAX];
};
#define WRITE_POINT_STATES() \
x(stopped) \
x(waiting_io) \
x(waiting_work) \
x(running)
enum write_point_state {
#define x(n) WRITE_POINT_##n,
WRITE_POINT_STATES()
#undef x
WRITE_POINT_STATE_NR
};
struct write_point {
struct {
struct hlist_node node;
struct mutex lock;
u64 last_used;
@ -93,6 +107,20 @@ struct write_point {
struct open_buckets ptrs;
struct dev_stripe_state stripe;
u64 sectors_allocated;
} __attribute__((__aligned__(SMP_CACHE_BYTES)));
struct {
struct work_struct index_update_work;
struct list_head writes;
spinlock_t writes_lock;
enum write_point_state state;
u64 last_state_change;
u64 time[WRITE_POINT_STATE_NR];
} __attribute__((__aligned__(SMP_CACHE_BYTES)));
};
struct write_point_specifier {

View file

@ -191,6 +191,7 @@ static struct btree *__bch2_btree_node_alloc(struct bch_fs *c,
struct bch_devs_list devs_have = (struct bch_devs_list) { 0 };
unsigned nr_reserve;
enum alloc_reserve alloc_reserve;
int ret;
if (flags & BTREE_INSERT_USE_RESERVE) {
nr_reserve = 0;
@ -213,7 +214,7 @@ static struct btree *__bch2_btree_node_alloc(struct bch_fs *c,
mutex_unlock(&c->btree_reserve_cache_lock);
retry:
wp = bch2_alloc_sectors_start(c,
ret = bch2_alloc_sectors_start(c,
c->opts.metadata_target ?:
c->opts.foreground_target,
0,
@ -221,9 +222,9 @@ static struct btree *__bch2_btree_node_alloc(struct bch_fs *c,
&devs_have,
res->nr_replicas,
c->opts.metadata_replicas_required,
alloc_reserve, 0, cl);
if (IS_ERR(wp))
return ERR_CAST(wp);
alloc_reserve, 0, cl, &wp);
if (unlikely(ret))
return ERR_PTR(ret);
if (wp->sectors_free < btree_sectors(c)) {
struct open_bucket *ob;

View file

@ -589,7 +589,7 @@ void bch2_submit_wbio_replicas(struct bch_write_bio *wbio, struct bch_fs *c,
}
}
static void __bch2_write(struct closure *);
static void __bch2_write(struct bch_write_op *);
static void bch2_write_done(struct closure *cl)
{
@ -686,22 +686,86 @@ static void __bch2_write_index(struct bch_write_op *op)
goto out;
}
static inline void __wp_update_state(struct write_point *wp, enum write_point_state state)
{
if (state != wp->state) {
u64 now = ktime_get_ns();
if (wp->last_state_change &&
time_after64(now, wp->last_state_change))
wp->time[wp->state] += now - wp->last_state_change;
wp->state = state;
wp->last_state_change = now;
}
}
static inline void wp_update_state(struct write_point *wp, bool running)
{
enum write_point_state state;
state = running ? WRITE_POINT_running :
!list_empty(&wp->writes) ? WRITE_POINT_waiting_io
: WRITE_POINT_stopped;
__wp_update_state(wp, state);
}
static void bch2_write_index(struct closure *cl)
{
struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
struct bch_fs *c = op->c;
struct write_point *wp = op->wp;
struct workqueue_struct *wq = index_update_wq(op);
barrier();
/*
* We're not using wp->writes_lock here, so this is racey: that's ok,
* because this is just for diagnostic purposes, and we're running out
* of interrupt context here so if we were to take the log we'd have to
* switch to spin_lock_irq()/irqsave(), which is not free:
*/
if (wp->state == WRITE_POINT_waiting_io)
__wp_update_state(wp, WRITE_POINT_waiting_work);
op->btree_update_ready = true;
queue_work(wq, &wp->index_update_work);
}
void bch2_write_point_do_index_updates(struct work_struct *work)
{
struct write_point *wp =
container_of(work, struct write_point, index_update_work);
struct bch_write_op *op;
while (1) {
spin_lock(&wp->writes_lock);
list_for_each_entry(op, &wp->writes, wp_list)
if (op->btree_update_ready) {
list_del(&op->wp_list);
goto unlock;
}
op = NULL;
unlock:
wp_update_state(wp, op != NULL);
spin_unlock(&wp->writes_lock);
if (!op)
break;
op->flags |= BCH_WRITE_IN_WORKER;
__bch2_write_index(op);
if (!(op->flags & BCH_WRITE_DONE)) {
continue_at(cl, __bch2_write, index_update_wq(op));
__bch2_write(op);
} else if (!op->error && (op->flags & BCH_WRITE_FLUSH)) {
bch2_journal_flush_seq_async(&c->journal,
bch2_journal_flush_seq_async(&op->c->journal,
*op_journal_seq(op),
cl);
continue_at(cl, bch2_write_done, index_update_wq(op));
&op->cl);
continue_at(&op->cl, bch2_write_done, index_update_wq(op));
} else {
continue_at_nobarrier(cl, bch2_write_done, NULL);
bch2_write_done(&op->cl);
}
}
}
@ -734,10 +798,8 @@ static void bch2_write_endio(struct bio *bio)
if (parent)
bio_endio(&parent->bio);
else if (!(op->flags & BCH_WRITE_SKIP_CLOSURE_PUT))
closure_put(cl);
else
continue_at_nobarrier(cl, bch2_write_index, index_update_wq(op));
closure_put(cl);
}
static void init_append_extent(struct bch_write_op *op,
@ -1136,19 +1198,18 @@ static int bch2_write_extent(struct bch_write_op *op, struct write_point *wp,
return ret;
}
static void __bch2_write(struct closure *cl)
static void __bch2_write(struct bch_write_op *op)
{
struct bch_write_op *op = container_of(cl, struct bch_write_op, cl);
struct bch_fs *c = op->c;
struct write_point *wp;
struct write_point *wp = NULL;
struct bio *bio = NULL;
bool skip_put = true;
unsigned nofs_flags;
int ret;
nofs_flags = memalloc_nofs_save();
again:
memset(&op->failed, 0, sizeof(op->failed));
op->btree_update_ready = false;
do {
struct bkey_i *key_to_write;
@ -1158,13 +1219,13 @@ static void __bch2_write(struct closure *cl)
/* +1 for possible cache device: */
if (op->open_buckets.nr + op->nr_replicas + 1 >
ARRAY_SIZE(op->open_buckets.v))
goto flush_io;
break;
if (bch2_keylist_realloc(&op->insert_keys,
op->inline_keys,
ARRAY_SIZE(op->inline_keys),
BKEY_EXTENT_U64s_MAX))
goto flush_io;
break;
if ((op->flags & BCH_WRITE_FROM_INTERNAL) &&
percpu_ref_is_dying(&c->writes)) {
@ -1177,7 +1238,7 @@ static void __bch2_write(struct closure *cl)
* freeing up space on specific disks, which means that
* allocations for specific disks may hang arbitrarily long:
*/
wp = bch2_alloc_sectors_start(c,
ret = bch2_alloc_sectors_start(c,
op->target,
op->opts.erasure_code && !(op->flags & BCH_WRITE_CACHED),
op->write_point,
@ -1187,53 +1248,34 @@ static void __bch2_write(struct closure *cl)
op->alloc_reserve,
op->flags,
(op->flags & (BCH_WRITE_ALLOC_NOWAIT|
BCH_WRITE_ONLY_SPECIFIED_DEVS)) ? NULL : cl);
EBUG_ON(!wp);
if (unlikely(IS_ERR(wp))) {
if (unlikely(PTR_ERR(wp) != -EAGAIN)) {
ret = PTR_ERR(wp);
BCH_WRITE_ONLY_SPECIFIED_DEVS))
? NULL : &op->cl,
&wp);
if (unlikely(ret)) {
if (unlikely(ret != -EAGAIN))
goto err;
break;
}
goto flush_io;
}
/*
* It's possible for the allocator to fail, put us on the
* freelist waitlist, and then succeed in one of various retry
* paths: if that happens, we need to disable the skip_put
* optimization because otherwise there won't necessarily be a
* barrier before we free the bch_write_op:
*/
if (atomic_read(&cl->remaining) & CLOSURE_WAITING)
skip_put = false;
EBUG_ON(!wp);
bch2_open_bucket_get(c, wp, &op->open_buckets);
ret = bch2_write_extent(op, wp, &bio);
bch2_alloc_sectors_done(c, wp);
if (ret < 0)
goto err;
if (ret) {
skip_put = false;
} else {
/*
* for the skip_put optimization this has to be set
* before we submit the bio:
*/
if (!ret)
op->flags |= BCH_WRITE_DONE;
}
bio->bi_end_io = bch2_write_endio;
bio->bi_private = &op->cl;
bio->bi_opf |= REQ_OP_WRITE;
if (!skip_put)
closure_get(bio->bi_private);
else
op->flags |= BCH_WRITE_SKIP_CLOSURE_PUT;
key_to_write = (void *) (op->insert_keys.keys_p +
key_to_write_offset);
@ -1241,55 +1283,49 @@ static void __bch2_write(struct closure *cl)
bch2_submit_wbio_replicas(to_wbio(bio), c, BCH_DATA_user,
key_to_write);
} while (ret);
if (!skip_put)
continue_at(cl, bch2_write_index, index_update_wq(op));
out:
/*
* If the write can't all be submitted at once, we generally want to
* block synchronously as that signals backpressure to the caller.
*/
if (!(op->flags & BCH_WRITE_DONE) &&
!(op->flags & BCH_WRITE_IN_WORKER)) {
closure_sync(&op->cl);
__bch2_write_index(op);
if (!(op->flags & BCH_WRITE_DONE))
goto again;
bch2_write_done(&op->cl);
} else {
spin_lock(&wp->writes_lock);
op->wp = wp;
list_add_tail(&op->wp_list, &wp->writes);
if (wp->state == WRITE_POINT_stopped)
__wp_update_state(wp, WRITE_POINT_waiting_io);
spin_unlock(&wp->writes_lock);
continue_at(&op->cl, bch2_write_index, NULL);
}
memalloc_nofs_restore(nofs_flags);
return;
err:
op->error = ret;
op->flags |= BCH_WRITE_DONE;
continue_at(cl, bch2_write_index, index_update_wq(op));
goto out;
flush_io:
/*
* If the write can't all be submitted at once, we generally want to
* block synchronously as that signals backpressure to the caller.
*
* However, if we're running out of a workqueue, we can't block here
* because we'll be blocking other work items from completing:
*/
if (current->flags & PF_WQ_WORKER) {
continue_at(cl, bch2_write_index, index_update_wq(op));
goto out;
}
closure_sync(cl);
if (!bch2_keylist_empty(&op->insert_keys)) {
__bch2_write_index(op);
if (op->error) {
op->flags |= BCH_WRITE_DONE;
continue_at_nobarrier(cl, bch2_write_done, NULL);
goto out;
}
}
goto again;
}
static void bch2_write_data_inline(struct bch_write_op *op, unsigned data_len)
{
struct closure *cl = &op->cl;
struct bio *bio = &op->wbio.bio;
struct bvec_iter iter;
struct bkey_i_inline_data *id;
unsigned sectors;
int ret;
op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
op->flags |= BCH_WRITE_DONE;
bch2_check_set_feature(op->c, BCH_FEATURE_inline_data);
ret = bch2_keylist_realloc(&op->insert_keys, op->inline_keys,
@ -1317,11 +1353,7 @@ static void bch2_write_data_inline(struct bch_write_op *op, unsigned data_len)
set_bkey_val_bytes(&id->k, data_len);
bch2_keylist_push(&op->insert_keys);
op->flags |= BCH_WRITE_WROTE_DATA_INLINE;
op->flags |= BCH_WRITE_DONE;
continue_at_nobarrier(cl, bch2_write_index, NULL);
return;
__bch2_write_index(op);
err:
bch2_write_done(&op->cl);
}
@ -1349,6 +1381,7 @@ void bch2_write(struct closure *cl)
struct bch_fs *c = op->c;
unsigned data_len;
EBUG_ON(op->cl.parent);
BUG_ON(!op->nr_replicas);
BUG_ON(!op->write_point.v);
BUG_ON(!bkey_cmp(op->pos, POS_MAX));
@ -1381,18 +1414,14 @@ void bch2_write(struct closure *cl)
return;
}
continue_at_nobarrier(cl, __bch2_write, NULL);
__bch2_write(op);
return;
err:
bch2_disk_reservation_put(c, &op->res);
if (op->end_io) {
EBUG_ON(cl->parent);
closure_debug_destroy(cl);
closure_debug_destroy(&op->cl);
if (op->end_io)
op->end_io(op);
} else {
closure_return(cl);
}
}
/* Cache promotion on read */

View file

@ -41,7 +41,7 @@ enum bch_write_flags {
__BCH_WRITE_CHECK_ENOSPC,
__BCH_WRITE_MOVE,
__BCH_WRITE_JOURNAL_SEQ_PTR,
__BCH_WRITE_SKIP_CLOSURE_PUT,
__BCH_WRITE_IN_WORKER,
__BCH_WRITE_DONE,
};
@ -59,7 +59,7 @@ enum bch_write_flags {
/* Internal: */
#define BCH_WRITE_JOURNAL_SEQ_PTR (1U << __BCH_WRITE_JOURNAL_SEQ_PTR)
#define BCH_WRITE_SKIP_CLOSURE_PUT (1U << __BCH_WRITE_SKIP_CLOSURE_PUT)
#define BCH_WRITE_IN_WORKER (1U << __BCH_WRITE_IN_WORKER)
#define BCH_WRITE_DONE (1U << __BCH_WRITE_DONE)
static inline u64 *op_journal_seq(struct bch_write_op *op)
@ -115,6 +115,8 @@ static inline void bch2_write_op_init(struct bch_write_op *op, struct bch_fs *c,
void bch2_write(struct closure *);
void bch2_write_point_do_index_updates(struct work_struct *);
static inline struct bch_write_bio *wbio_init(struct bio *bio)
{
struct bch_write_bio *wbio = to_wbio(bio);

View file

@ -119,6 +119,7 @@ struct bch_write_op {
unsigned nr_replicas_required:4;
unsigned alloc_reserve:3;
unsigned incompressible:1;
unsigned btree_update_ready:1;
struct bch_devs_list devs_have;
u16 target;
@ -134,6 +135,9 @@ struct bch_write_op {
struct write_point_specifier write_point;
struct write_point *wp;
struct list_head wp_list;
struct disk_reservation res;
struct open_buckets open_buckets;

View file

@ -791,7 +791,7 @@ static struct bch_fs *bch2_fs_alloc(struct bch_sb *sb, struct bch_opts opts)
c->inode_shard_bits = ilog2(roundup_pow_of_two(num_possible_cpus()));
if (!(c->btree_update_wq = alloc_workqueue("bcachefs",
WQ_FREEZABLE|WQ_MEM_RECLAIM, 1)) ||
WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512)) ||
!(c->btree_io_complete_wq = alloc_workqueue("bcachefs_btree_io",
WQ_FREEZABLE|WQ_MEM_RECLAIM, 1)) ||
!(c->copygc_wq = alloc_workqueue("bcachefs_copygc",

View file

@ -180,6 +180,7 @@ read_attribute(btree_key_cache);
read_attribute(btree_transactions);
read_attribute(stripes_heap);
read_attribute(open_buckets);
read_attribute(write_points);
read_attribute(internal_uuid);
@ -418,6 +419,9 @@ SHOW(bch2_fs)
if (attr == &sysfs_open_buckets)
bch2_open_buckets_to_text(out, c);
if (attr == &sysfs_write_points)
bch2_write_points_to_text(out, c);
if (attr == &sysfs_compression_stats)
bch2_compression_stats_to_text(out, c);
@ -563,6 +567,7 @@ struct attribute *bch2_fs_internal_files[] = {
&sysfs_new_stripes,
&sysfs_stripes_heap,
&sysfs_open_buckets,
&sysfs_write_points,
&sysfs_io_timers_read,
&sysfs_io_timers_write,

View file

@ -390,7 +390,7 @@ static const struct time_unit *pick_time_units(u64 ns)
return u;
}
static void pr_time_units(struct printbuf *out, u64 ns)
void bch2_pr_time_units(struct printbuf *out, u64 ns)
{
const struct time_unit *u = pick_time_units(ns);
@ -410,13 +410,13 @@ void bch2_time_stats_to_text(struct printbuf *out, struct bch2_time_stats *stats
freq ? div64_u64(NSEC_PER_SEC, freq) : 0);
pr_buf(out, "frequency:\t");
pr_time_units(out, freq);
bch2_pr_time_units(out, freq);
pr_buf(out, "\navg duration:\t");
pr_time_units(out, stats->average_duration);
bch2_pr_time_units(out, stats->average_duration);
pr_buf(out, "\nmax duration:\t");
pr_time_units(out, stats->max_duration);
bch2_pr_time_units(out, stats->max_duration);
i = eytzinger0_first(NR_QUANTILES);
u = pick_time_units(stats->quantiles.entries[i].m);

View file

@ -352,6 +352,8 @@ static inline void pr_sectors(struct printbuf *out, u64 v)
bch2_pr_units(out, v, v << 9);
}
void bch2_pr_time_units(struct printbuf *, u64);
#ifdef __KERNEL__
static inline void pr_time(struct printbuf *out, u64 time)
{