content: add Walk method to content store
Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
		
							parent
							
								
									8a36e1c6d1
								
							
						
					
					
						commit
						47f8b25d25
					
				
					 3 changed files with 126 additions and 9 deletions
				
			
		|  | @ -7,6 +7,7 @@ import ( | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/docker/containerd/log" | ||||||
| 	"github.com/docker/distribution/digest" | 	"github.com/docker/distribution/digest" | ||||||
| 	"github.com/nightlyone/lockfile" | 	"github.com/nightlyone/lockfile" | ||||||
| 	"github.com/pkg/errors" | 	"github.com/pkg/errors" | ||||||
|  | @ -99,6 +100,44 @@ func (cs *ContentStore) Active() ([]Status, error) { | ||||||
| 
 | 
 | ||||||
| // TODO(stevvooe): Allow querying the set of blobs in the blob store. | // TODO(stevvooe): Allow querying the set of blobs in the blob store. | ||||||
| 
 | 
 | ||||||
|  | func (cs *ContentStore) Walk(fn func(path string, dgst digest.Digest) error) error { | ||||||
|  | 	root := filepath.Join(cs.root, "blobs") | ||||||
|  | 	var alg digest.Algorithm | ||||||
|  | 	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { | ||||||
|  | 		if !fi.IsDir() && !alg.Available() { | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// TODO(stevvooe): There are few more cases with subdirs that should be | ||||||
|  | 		// handled in case the layout gets corrupted. This isn't strict enough | ||||||
|  | 		// an may spew bad data. | ||||||
|  | 
 | ||||||
|  | 		if path == root { | ||||||
|  | 			return nil | ||||||
|  | 		} else if filepath.Dir(path) == root { | ||||||
|  | 			alg = digest.Algorithm(filepath.Base(path)) | ||||||
|  | 
 | ||||||
|  | 			if !alg.Available() { | ||||||
|  | 				alg = "" | ||||||
|  | 				return filepath.SkipDir | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			// descending into a hash directory | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path)) | ||||||
|  | 		if err := dgst.Validate(); err != nil { | ||||||
|  | 			// log error but don't report | ||||||
|  | 			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path") | ||||||
|  | 			// if we see this, it could mean some sort of corruption of the | ||||||
|  | 			// store or extra paths not expected previously. | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return fn(path, dgst) | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { | func (cs *ContentStore) GetPath(dgst digest.Digest) (string, error) { | ||||||
| 	p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) | 	p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) | ||||||
| 	if _, err := os.Stat(p); err != nil { | 	if _, err := os.Stat(p); err != nil { | ||||||
|  |  | ||||||
|  | @ -8,21 +8,19 @@ import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"io/ioutil" | 	"io/ioutil" | ||||||
|  | 	mrand "math/rand" | ||||||
| 	"os" | 	"os" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"runtime" | ||||||
| 	"testing" | 	"testing" | ||||||
| 
 | 
 | ||||||
| 	"github.com/docker/distribution/digest" | 	"github.com/docker/distribution/digest" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func TestContentWriter(t *testing.T) { | func TestContentWriter(t *testing.T) { | ||||||
| 	tmpdir, err := ioutil.TempDir("", "TestContentWriter-") | 	tmpdir, cs, cleanup := contentStoreEnv(t) | ||||||
| 
 | 	defer cleanup() | ||||||
| 	cs, err := OpenContentStore(tmpdir) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatal(err) |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	if _, err := os.Stat(filepath.Join(tmpdir, "ingest")); os.IsNotExist(err) { | 	if _, err := os.Stat(filepath.Join(tmpdir, "ingest")); os.IsNotExist(err) { | ||||||
| 		t.Fatal("ingest dir should be created", err) | 		t.Fatal("ingest dir should be created", err) | ||||||
|  | @ -112,7 +110,76 @@ func TestContentWriter(t *testing.T) { | ||||||
| 	dumpDir(tmpdir) | 	dumpDir(tmpdir) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func checkCopy(t *testing.T, size int64, dst io.Writer, src io.Reader) { | func TestWalkBlobs(t *testing.T) { | ||||||
|  | 	_, cs, cleanup := contentStoreEnv(t) | ||||||
|  | 	defer cleanup() | ||||||
|  | 
 | ||||||
|  | 	const ( | ||||||
|  | 		nblobs  = 4 << 10 | ||||||
|  | 		maxsize = 4 << 10 | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	var ( | ||||||
|  | 		blobs    = map[digest.Digest][]byte{} | ||||||
|  | 		expected = map[digest.Digest]struct{}{} | ||||||
|  | 		found    = map[digest.Digest]struct{}{} | ||||||
|  | 	) | ||||||
|  | 
 | ||||||
|  | 	for i := 0; i < nblobs; i++ { | ||||||
|  | 		p := make([]byte, mrand.Intn(maxsize)) | ||||||
|  | 
 | ||||||
|  | 		if _, err := rand.Read(p); err != nil { | ||||||
|  | 			t.Fatal(err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		dgst := checkWrite(t, cs, p) | ||||||
|  | 		blobs[dgst] = p | ||||||
|  | 		expected[dgst] = struct{}{} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if err := cs.Walk(func(path string, dgst digest.Digest) error { | ||||||
|  | 		found[dgst] = struct{}{} | ||||||
|  | 		if checked := checkBlobPath(t, cs, dgst); checked != path { | ||||||
|  | 			t.Fatalf("blob path did not match: %v != %v", path, checked) | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}); err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if !reflect.DeepEqual(expected, found) { | ||||||
|  | 		t.Fatalf("expected did not match found: %v != %v", found, expected) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func contentStoreEnv(t interface { | ||||||
|  | 	Fatal(args ...interface{}) | ||||||
|  | }) (string, *ContentStore, func()) { | ||||||
|  | 	pc, _, _, ok := runtime.Caller(1) | ||||||
|  | 	if !ok { | ||||||
|  | 		t.Fatal("failed to resolve caller") | ||||||
|  | 	} | ||||||
|  | 	fn := runtime.FuncForPC(pc) | ||||||
|  | 
 | ||||||
|  | 	tmpdir, err := ioutil.TempDir("", filepath.Base(fn.Name())+"-") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	cs, err := OpenContentStore(tmpdir) | ||||||
|  | 	if err != nil { | ||||||
|  | 		os.RemoveAll(tmpdir) | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return tmpdir, cs, func() { | ||||||
|  | 		os.RemoveAll(tmpdir) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func checkCopy(t interface { | ||||||
|  | 	Fatal(args ...interface{}) | ||||||
|  | }, size int64, dst io.Writer, src io.Reader) { | ||||||
| 	nn, err := io.Copy(dst, src) | 	nn, err := io.Copy(dst, src) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
|  | @ -126,7 +193,7 @@ func checkCopy(t *testing.T, size int64, dst io.Writer, src io.Reader) { | ||||||
| func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string { | func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string { | ||||||
| 	path, err := cs.GetPath(dgst) | 	path, err := cs.GetPath(dgst) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err, dgst) | ||||||
| 	} | 	} | ||||||
| 	if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) { | 	if path != filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) { | ||||||
| 		t.Fatalf("unxpected path: %q", path) | 		t.Fatalf("unxpected path: %q", path) | ||||||
|  | @ -144,6 +211,15 @@ func checkBlobPath(t *testing.T, cs *ContentStore, dgst digest.Digest) string { | ||||||
| 	return path | 	return path | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func checkWrite(t *testing.T, cs *ContentStore, p []byte) digest.Digest { | ||||||
|  | 	dgst := digest.FromBytes(p) | ||||||
|  | 	if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return dgst | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func dumpDir(root string) error { | func dumpDir(root string) error { | ||||||
| 	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { | 	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  |  | ||||||
|  | @ -60,7 +60,9 @@ func (cw *ContentWriter) Commit(size int64, expected digest.Digest) error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	dgst := cw.digester.Digest() | 	dgst := cw.digester.Digest() | ||||||
| 	if expected != dgst { | 	// TODO(stevvooe): Correctly handle missing expected digest or allow no | ||||||
|  | 	// expected digest at commit time. | ||||||
|  | 	if expected != "" && expected != dgst { | ||||||
| 		return errors.Errorf("unexpected digest: %v != %v", dgst, expected) | 		return errors.Errorf("unexpected digest: %v != %v", dgst, expected) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue