Move parent bucket to same level as snapshot bucket

Adds a zero byte separator on composite key.
Stores child key in back reference to allow following.
Add parent bucket to get/create bucket functions.

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
Derek McGowan 2017-03-16 15:38:28 -07:00
parent 397499c288
commit 8383519d96
No known key found for this signature in database
GPG key ID: F58C5D0A4405ACDB

View file

@ -70,16 +70,16 @@ func (ms *boltMetastore) TransactionContext(ctx context.Context, writable bool)
return ctx, t, nil
}
func (ms *boltMetastore) withBucket(ctx context.Context, fn func(ctx context.Context, tx *bolt.Bucket) error) error {
func (ms *boltMetastore) withBucket(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
t, ok := ctx.Value(transactionKey{}).(*boltFileTransactor)
if !ok {
return errors.Errorf("no transaction in context")
}
bkt := t.tx.Bucket(bucketKeyStorageVersion).Bucket(bucketKeySnapshot)
return fn(ctx, bkt)
bkt := t.tx.Bucket(bucketKeyStorageVersion)
return fn(ctx, bkt.Bucket(bucketKeySnapshot), bkt.Bucket(bucketKeyParents))
}
func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(ctx context.Context, tx *bolt.Bucket) error) error {
func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
t, ok := ctx.Value(transactionKey{}).(*boltFileTransactor)
if !ok {
return errors.Errorf("no transaction in context")
@ -89,11 +89,15 @@ func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(ct
if err != nil {
return errors.Wrap(err, "failed to create version bucket")
}
bkt, err = bkt.CreateBucketIfNotExists(bucketKeySnapshot)
sbkt, err := bkt.CreateBucketIfNotExists(bucketKeySnapshot)
if err != nil {
return errors.Wrap(err, "failed to create snapshots bucket")
}
return fn(ctx, bkt)
pbkt, err := bkt.CreateBucketIfNotExists(bucketKeyParents)
if err != nil {
return errors.Wrap(err, "failed to create snapshots bucket")
}
return fn(ctx, sbkt, pbkt)
}
func fromProtoKind(k Kind) snapshot.Kind {
@ -114,21 +118,33 @@ func toProtoKind(k snapshot.Kind) Kind {
}
}
// parentKey returns a composite key of the parent and child identifiers. The
// parts of the key are separated by a zero byte.
func parentKey(parent, child uint64) []byte {
b := make([]byte, binary.Size([]uint64{parent, child}))
b := make([]byte, binary.Size([]uint64{parent, child})+1)
i := binary.PutUvarint(b, parent)
j := binary.PutUvarint(b[i:], child)
return b[0 : i+j]
j := binary.PutUvarint(b[i+1:], child)
return b[0 : i+j+1]
}
func getParent(b []byte) uint64 {
// parentPrefixKey returns the parent part of the composite key with the
// zero byte separator.
func parentPrefixKey(parent uint64) []byte {
b := make([]byte, binary.Size(parent)+1)
i := binary.PutUvarint(b, parent)
return b[0 : i+1]
}
// getParentPrefix returns the first part of the composite key which
// represents the parent identifier.
func getParentPrefix(b []byte) uint64 {
parent, _ := binary.Uvarint(b)
return parent
}
func (ms *boltMetastore) Stat(ctx context.Context, key string) (snapshot.Info, error) {
var ss Snapshot
err := ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
err := ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
return getSnapshot(bkt, key, &ss)
})
if err != nil {
@ -144,7 +160,7 @@ func (ms *boltMetastore) Stat(ctx context.Context, key string) (snapshot.Info, e
}
func (ms *boltMetastore) Walk(ctx context.Context, fn func(context.Context, snapshot.Info) error) error {
return ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
return ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
return bkt.ForEach(func(k, v []byte) error {
// skip nested buckets
if v == nil {
@ -167,7 +183,7 @@ func (ms *boltMetastore) Walk(ctx context.Context, fn func(context.Context, snap
}
func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, readonly bool) (a storage.Active, err error) {
err = ms.createBucketIfNotExists(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
err = ms.createBucketIfNotExists(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
var (
parentS *Snapshot
)
@ -202,11 +218,9 @@ func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, r
}
if parentS != nil {
pbkt, err := bkt.CreateBucketIfNotExists(bucketKeyParents)
if err != nil {
return errors.Wrap(err, "failed to create parent bucket")
}
if err := pbkt.Put(parentKey(parentS.ID, ss.ID), nil); err != nil {
// Store a backlink from the key to the parent. Store the snapshot name
// as the value to allow following the backlink to the snapshot value.
if err := pbkt.Put(parentKey(parentS.ID, ss.ID), []byte(key)); err != nil {
return errors.Wrap(err, "failed to write parent link")
}
@ -229,7 +243,7 @@ func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, r
}
func (ms *boltMetastore) GetActive(ctx context.Context, key string) (a storage.Active, err error) {
err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
b := bkt.Get([]byte(key))
if len(b) == 0 {
return errors.Errorf("active not found")
@ -285,7 +299,7 @@ func (ms *boltMetastore) parents(bkt *bolt.Bucket, parent *Snapshot) (parents []
}
func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k snapshot.Kind, err error) {
err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
var ss Snapshot
b := bkt.Get([]byte(key))
if len(b) == 0 {
@ -296,10 +310,9 @@ func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k s
return errors.Wrap(err, "failed to unmarshal snapshot")
}
pbkt := bkt.Bucket(bucketKeyParents)
if pbkt != nil {
k, _ := pbkt.Cursor().Seek(parentKey(ss.ID, 0))
if getParent(k) == ss.ID {
k, _ := pbkt.Cursor().Seek(parentPrefixKey(ss.ID))
if getParentPrefix(k) == ss.ID {
return errors.Errorf("cannot remove snapshot with child")
}
@ -329,7 +342,7 @@ func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k s
}
func (ms *boltMetastore) Commit(ctx context.Context, key, name string) (id string, err error) {
err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error {
err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
b := bkt.Get([]byte(name))
if len(b) != 0 {
return errors.Errorf("key already exists")