Merge pull request #364 from ncdc/resumable-digest-optional
Use a build flag to disable resumable digests
This commit is contained in:
commit
75983a4a7f
6 changed files with 133 additions and 61 deletions
|
@ -2,12 +2,7 @@ package digest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
|
||||||
"hash"
|
"hash"
|
||||||
|
|
||||||
"github.com/jlhawn/go-crypto" // For ResumableHash
|
|
||||||
_ "github.com/jlhawn/go-crypto/sha256" // For Resumable SHA256
|
|
||||||
_ "github.com/jlhawn/go-crypto/sha512" // For Resumable SHA384, SHA512
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Digester calculates the digest of written data. It is functionally
|
// Digester calculates the digest of written data. It is functionally
|
||||||
|
@ -38,43 +33,22 @@ func (d *Digester) Digest() Digest {
|
||||||
return NewDigest(d.alg, d.Hash)
|
return NewDigest(d.alg, d.Hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResumableHash is the common interface implemented by all resumable hash
|
||||||
|
// functions.
|
||||||
|
type ResumableHash interface {
|
||||||
|
// ResumableHash is a superset of hash.Hash
|
||||||
|
hash.Hash
|
||||||
|
// Len returns the number of bytes written to the Hash so far.
|
||||||
|
Len() uint64
|
||||||
|
// State returns a snapshot of the state of the Hash.
|
||||||
|
State() ([]byte, error)
|
||||||
|
// Restore resets the Hash to the given state.
|
||||||
|
Restore(state []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
// ResumableDigester is a digester that can export its internal state and be
|
// ResumableDigester is a digester that can export its internal state and be
|
||||||
// restored from saved state.
|
// restored from saved state.
|
||||||
type ResumableDigester struct {
|
type ResumableDigester interface {
|
||||||
alg string
|
ResumableHash
|
||||||
crypto.ResumableHash
|
Digest() Digest
|
||||||
}
|
|
||||||
|
|
||||||
var resumableHashAlgs = map[string]crypto.Hash{
|
|
||||||
"sha256": crypto.SHA256,
|
|
||||||
"sha384": crypto.SHA384,
|
|
||||||
"sha512": crypto.SHA512,
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewResumableDigester creates a new ResumableDigester with the given hashing
|
|
||||||
// algorithm.
|
|
||||||
func NewResumableDigester(alg string) (ResumableDigester, error) {
|
|
||||||
hash, supported := resumableHashAlgs[alg]
|
|
||||||
if !supported {
|
|
||||||
return ResumableDigester{}, fmt.Errorf("unsupported resumable hash algorithm: %s", alg)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ResumableDigester{
|
|
||||||
alg: alg,
|
|
||||||
ResumableHash: hash.New(),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewCanonicalResumableDigester creates a ResumableDigester using the default
|
|
||||||
// digest algorithm.
|
|
||||||
func NewCanonicalResumableDigester() ResumableDigester {
|
|
||||||
return ResumableDigester{
|
|
||||||
alg: "sha256",
|
|
||||||
ResumableHash: crypto.SHA256.New(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Digest returns the current digest for this resumable digester.
|
|
||||||
func (d ResumableDigester) Digest() Digest {
|
|
||||||
return NewDigest(d.alg, d.ResumableHash)
|
|
||||||
}
|
}
|
||||||
|
|
52
digest/digester_resumable.go
Normal file
52
digest/digester_resumable.go
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
// +build !noresumabledigest
|
||||||
|
|
||||||
|
package digest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/jlhawn/go-crypto"
|
||||||
|
// For ResumableHash
|
||||||
|
_ "github.com/jlhawn/go-crypto/sha256" // For Resumable SHA256
|
||||||
|
_ "github.com/jlhawn/go-crypto/sha512" // For Resumable SHA384, SHA512
|
||||||
|
)
|
||||||
|
|
||||||
|
// resumableDigester implements ResumableDigester.
|
||||||
|
type resumableDigester struct {
|
||||||
|
alg string
|
||||||
|
crypto.ResumableHash
|
||||||
|
}
|
||||||
|
|
||||||
|
var resumableHashAlgs = map[string]crypto.Hash{
|
||||||
|
"sha256": crypto.SHA256,
|
||||||
|
"sha384": crypto.SHA384,
|
||||||
|
"sha512": crypto.SHA512,
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewResumableDigester creates a new ResumableDigester with the given hashing
|
||||||
|
// algorithm.
|
||||||
|
func NewResumableDigester(alg string) (ResumableDigester, error) {
|
||||||
|
hash, supported := resumableHashAlgs[alg]
|
||||||
|
if !supported {
|
||||||
|
return resumableDigester{}, fmt.Errorf("unsupported resumable hash algorithm: %s", alg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resumableDigester{
|
||||||
|
alg: alg,
|
||||||
|
ResumableHash: hash.New(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCanonicalResumableDigester creates a ResumableDigester using the default
|
||||||
|
// digest algorithm.
|
||||||
|
func NewCanonicalResumableDigester() ResumableDigester {
|
||||||
|
return resumableDigester{
|
||||||
|
alg: "sha256",
|
||||||
|
ResumableHash: crypto.SHA256.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Digest returns the current digest for this resumable digester.
|
||||||
|
func (d resumableDigester) Digest() Digest {
|
||||||
|
return NewDigest(d.alg, d.ResumableHash)
|
||||||
|
}
|
|
@ -138,13 +138,16 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &layerWriter{
|
lw := &layerWriter{
|
||||||
layerStore: ls,
|
layerStore: ls,
|
||||||
uuid: uuid,
|
uuid: uuid,
|
||||||
startedAt: startedAt,
|
startedAt: startedAt,
|
||||||
resumableDigester: digest.NewCanonicalResumableDigester(),
|
|
||||||
bufferedFileWriter: *fw,
|
bufferedFileWriter: *fw,
|
||||||
}, nil
|
}
|
||||||
|
|
||||||
|
lw.setupResumableDigester()
|
||||||
|
|
||||||
|
return lw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ls *layerStore) path(dgst digest.Digest) (string, error) {
|
func (ls *layerStore) path(dgst digest.Digest) (string, error) {
|
||||||
|
|
|
@ -87,6 +87,10 @@ func (lw *layerWriter) Cancel() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lw *layerWriter) Write(p []byte) (int, error) {
|
func (lw *layerWriter) Write(p []byte) (int, error) {
|
||||||
|
if lw.resumableDigester == nil {
|
||||||
|
return lw.bufferedFileWriter.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the current write offset matches how many bytes have been
|
// Ensure that the current write offset matches how many bytes have been
|
||||||
// written to the digester. If not, we need to update the digest state to
|
// written to the digester. If not, we need to update the digest state to
|
||||||
// match the current write position.
|
// match the current write position.
|
||||||
|
@ -98,6 +102,10 @@ func (lw *layerWriter) Write(p []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
|
if lw.resumableDigester == nil {
|
||||||
|
return lw.bufferedFileWriter.ReadFrom(r)
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure that the current write offset matches how many bytes have been
|
// Ensure that the current write offset matches how many bytes have been
|
||||||
// written to the digester. If not, we need to update the digest state to
|
// written to the digester. If not, we need to update the digest state to
|
||||||
// match the current write position.
|
// match the current write position.
|
||||||
|
@ -113,8 +121,10 @@ func (lw *layerWriter) Close() error {
|
||||||
return lw.err
|
return lw.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := lw.storeHashState(); err != nil {
|
if lw.resumableDigester != nil {
|
||||||
return err
|
if err := lw.storeHashState(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return lw.bufferedFileWriter.Close()
|
return lw.bufferedFileWriter.Close()
|
||||||
|
@ -261,22 +271,37 @@ func (lw *layerWriter) storeHashState() error {
|
||||||
// validateLayer checks the layer data against the digest, returning an error
|
// validateLayer checks the layer data against the digest, returning an error
|
||||||
// if it does not match. The canonical digest is returned.
|
// if it does not match. The canonical digest is returned.
|
||||||
func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
|
func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
|
||||||
// Restore the hasher state to the end of the upload.
|
var (
|
||||||
if err := lw.resumeHashAt(lw.size); err != nil {
|
verified, fullHash bool
|
||||||
return "", err
|
canonical digest.Digest
|
||||||
|
)
|
||||||
|
|
||||||
|
if lw.resumableDigester != nil {
|
||||||
|
// Restore the hasher state to the end of the upload.
|
||||||
|
if err := lw.resumeHashAt(lw.size); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
canonical = lw.resumableDigester.Digest()
|
||||||
|
|
||||||
|
if canonical.Algorithm() == dgst.Algorithm() {
|
||||||
|
// Common case: client and server prefer the same canonical digest
|
||||||
|
// algorithm - currently SHA256.
|
||||||
|
verified = dgst == canonical
|
||||||
|
} else {
|
||||||
|
// The client wants to use a different digest algorithm. They'll just
|
||||||
|
// have to be patient and wait for us to download and re-hash the
|
||||||
|
// uploaded content using that digest algorithm.
|
||||||
|
fullHash = true
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Not using resumable digests, so we need to hash the entire layer.
|
||||||
|
fullHash = true
|
||||||
}
|
}
|
||||||
|
|
||||||
var verified bool
|
if fullHash {
|
||||||
canonical := lw.resumableDigester.Digest()
|
digester := digest.NewCanonicalDigester()
|
||||||
|
|
||||||
if canonical.Algorithm() == dgst.Algorithm() {
|
|
||||||
// Common case: client and server prefer the same canonical digest
|
|
||||||
// algorithm - currently SHA256.
|
|
||||||
verified = dgst == canonical
|
|
||||||
} else {
|
|
||||||
// The client wants to use a different digest algorithm. They'll just
|
|
||||||
// have to be patient and wait for us to download and re-hash the
|
|
||||||
// uploaded content using that digest algorithm.
|
|
||||||
digestVerifier, err := digest.NewDigestVerifier(dgst)
|
digestVerifier, err := digest.NewDigestVerifier(dgst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -288,10 +313,13 @@ func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = io.Copy(digestVerifier, fr); err != nil {
|
tr := io.TeeReader(fr, digester)
|
||||||
|
|
||||||
|
if _, err = io.Copy(digestVerifier, tr); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
canonical = digester.Digest()
|
||||||
verified = digestVerifier.Verified()
|
verified = digestVerifier.Verified()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
6
registry/storage/layerwriter_nonresumable.go
Normal file
6
registry/storage/layerwriter_nonresumable.go
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
// +build noresumabledigest
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
func (lw *layerWriter) setupResumableDigester() {
|
||||||
|
}
|
9
registry/storage/layerwriter_resumable.go
Normal file
9
registry/storage/layerwriter_resumable.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
// +build !noresumabledigest
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import "github.com/docker/distribution/digest"
|
||||||
|
|
||||||
|
func (lw *layerWriter) setupResumableDigester() {
|
||||||
|
lw.resumableDigester = digest.NewCanonicalResumableDigester()
|
||||||
|
}
|
Loading…
Reference in a new issue