diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cbc89fa9a677..4515b128d0b4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -42,6 +42,7 @@ #include #include #include +#include #include "rbd_types.h" @@ -332,7 +333,10 @@ struct rbd_device { char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */ + struct list_head rq_queue; /* incoming rq queue */ spinlock_t lock; /* queue, flags, open_count */ + struct workqueue_struct *rq_wq; + struct work_struct rq_work; struct rbd_image_header header; unsigned long flags; /* possibly lock protected */ @@ -3176,21 +3180,116 @@ out: return ret; } +static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq) +{ + struct rbd_img_request *img_request; + u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT; + u64 length = blk_rq_bytes(rq); + bool wr = rq_data_dir(rq) == WRITE; + int result; + + /* Ignore/skip any zero-length requests */ + + if (!length) { + dout("%s: zero-length request\n", __func__); + result = 0; + goto err_rq; + } + + /* Disallow writes to a read-only device */ + + if (wr) { + if (rbd_dev->mapping.read_only) { + result = -EROFS; + goto err_rq; + } + rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); + } + + /* + * Quit early if the mapped snapshot no longer exists. It's + * still possible the snapshot will have disappeared by the + * time our request arrives at the osd, but there's no sense in + * sending it if we already know. + */ + if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { + dout("request for non-existent snapshot"); + rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); + result = -ENXIO; + goto err_rq; + } + + if (offset && length > U64_MAX - offset + 1) { + rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset, + length); + result = -EINVAL; + goto err_rq; /* Shouldn't happen */ + } + + if (offset + length > rbd_dev->mapping.size) { + rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset, + length, rbd_dev->mapping.size); + result = -EIO; + goto err_rq; + } + + img_request = rbd_img_request_create(rbd_dev, offset, length, wr); + if (!img_request) { + result = -ENOMEM; + goto err_rq; + } + img_request->rq = rq; + + result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio); + if (result) + goto err_img_request; + + result = rbd_img_request_submit(img_request); + if (result) + goto err_img_request; + + return; + +err_img_request: + rbd_img_request_put(img_request); +err_rq: + if (result) + rbd_warn(rbd_dev, "%s %llx at %llx result %d", + wr ? "write" : "read", length, offset, result); + blk_end_request_all(rq, result); +} + +static void rbd_request_workfn(struct work_struct *work) +{ + struct rbd_device *rbd_dev = + container_of(work, struct rbd_device, rq_work); + struct request *rq, *next; + LIST_HEAD(requests); + + spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */ + list_splice_init(&rbd_dev->rq_queue, &requests); + spin_unlock_irq(&rbd_dev->lock); + + list_for_each_entry_safe(rq, next, &requests, queuelist) { + list_del_init(&rq->queuelist); + rbd_handle_request(rbd_dev, rq); + } +} + +/* + * Called with q->queue_lock held and interrupts disabled, possibly on + * the way to schedule(). Do not sleep here! + */ static void rbd_request_fn(struct request_queue *q) - __releases(q->queue_lock) __acquires(q->queue_lock) { struct rbd_device *rbd_dev = q->queuedata; struct request *rq; - int result; + int queued = 0; + + rbd_assert(rbd_dev); while ((rq = blk_fetch_request(q))) { - bool write_request = rq_data_dir(rq) == WRITE; - struct rbd_img_request *img_request; - u64 offset; - u64 length; - /* Ignore any non-FS requests that filter through. */ - if (rq->cmd_type != REQ_TYPE_FS) { dout("%s: non-fs request type %d\n", __func__, (int) rq->cmd_type); @@ -3198,80 +3297,12 @@ static void rbd_request_fn(struct request_queue *q) continue; } - /* Ignore/skip any zero-length requests */ - - offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; - length = (u64) blk_rq_bytes(rq); - - if (!length) { - dout("%s: zero-length request\n", __func__); - __blk_end_request_all(rq, 0); - continue; - } - - spin_unlock_irq(q->queue_lock); - - /* Disallow writes to a read-only device */ - - if (write_request) { - result = -EROFS; - if (rbd_dev->mapping.read_only) - goto end_request; - rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); - } - - /* - * Quit early if the mapped snapshot no longer - * exists. It's still possible the snapshot will - * have disappeared by the time our request arrives - * at the osd, but there's no sense in sending it if - * we already know. - */ - if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { - dout("request for non-existent snapshot"); - rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); - result = -ENXIO; - goto end_request; - } - - result = -EINVAL; - if (offset && length > U64_MAX - offset + 1) { - rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", - offset, length); - goto end_request; /* Shouldn't happen */ - } - - result = -EIO; - if (offset + length > rbd_dev->mapping.size) { - rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n", - offset, length, rbd_dev->mapping.size); - goto end_request; - } - - result = -ENOMEM; - img_request = rbd_img_request_create(rbd_dev, offset, length, - write_request); - if (!img_request) - goto end_request; - - img_request->rq = rq; - - result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, - rq->bio); - if (!result) - result = rbd_img_request_submit(img_request); - if (result) - rbd_img_request_put(img_request); -end_request: - spin_lock_irq(q->queue_lock); - if (result < 0) { - rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", - write_request ? "write" : "read", - length, offset, result); - - __blk_end_request_all(rq, result); - } + list_add_tail(&rq->queuelist, &rbd_dev->rq_queue); + queued++; } + + if (queued) + queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work); } /* @@ -3847,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, return NULL; spin_lock_init(&rbd_dev->lock); + INIT_LIST_HEAD(&rbd_dev->rq_queue); + INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn); rbd_dev->flags = 0; atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); @@ -5051,12 +5084,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) ret = rbd_dev_mapping_set(rbd_dev); if (ret) goto err_out_disk; + set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only); + rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0); + if (!rbd_dev->rq_wq) + goto err_out_mapping; + ret = rbd_bus_add_dev(rbd_dev); if (ret) - goto err_out_mapping; + goto err_out_workqueue; /* Everything's ready. Announce the disk to the world. */ @@ -5068,6 +5106,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) return ret; +err_out_workqueue: + destroy_workqueue(rbd_dev->rq_wq); + rbd_dev->rq_wq = NULL; err_out_mapping: rbd_dev_mapping_clear(rbd_dev); err_out_disk: @@ -5314,6 +5355,7 @@ static void rbd_dev_device_release(struct device *dev) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + destroy_workqueue(rbd_dev->rq_wq); rbd_free_disk(rbd_dev); clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); rbd_dev_mapping_clear(rbd_dev);