Revert "update to use containerd seccomp package"
This reverts commit 4f8e065faf055d3f0463a92622297ca3afac07f4.
This commit is contained in:
parent
09243b740c
commit
60f032f6f5
8199 changed files with 1598219 additions and 30742 deletions
83
vendor/github.com/docker/distribution/registry/proxy/proxyauth.go
generated
vendored
Normal file
83
vendor/github.com/docker/distribution/registry/proxy/proxyauth.go
generated
vendored
Normal file
|
@ -0,0 +1,83 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/registry/client/auth"
|
||||
"github.com/docker/distribution/registry/client/auth/challenge"
|
||||
)
|
||||
|
||||
const challengeHeader = "Docker-Distribution-Api-Version"
|
||||
|
||||
type userpass struct {
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
type credentials struct {
|
||||
creds map[string]userpass
|
||||
}
|
||||
|
||||
func (c credentials) Basic(u *url.URL) (string, string) {
|
||||
up := c.creds[u.String()]
|
||||
|
||||
return up.username, up.password
|
||||
}
|
||||
|
||||
func (c credentials) RefreshToken(u *url.URL, service string) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c credentials) SetRefreshToken(u *url.URL, service, token string) {
|
||||
}
|
||||
|
||||
// configureAuth stores credentials for challenge responses
|
||||
func configureAuth(username, password, remoteURL string) (auth.CredentialStore, error) {
|
||||
creds := map[string]userpass{}
|
||||
|
||||
authURLs, err := getAuthURLs(remoteURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, url := range authURLs {
|
||||
context.GetLogger(context.Background()).Infof("Discovered token authentication URL: %s", url)
|
||||
creds[url] = userpass{
|
||||
username: username,
|
||||
password: password,
|
||||
}
|
||||
}
|
||||
|
||||
return credentials{creds: creds}, nil
|
||||
}
|
||||
|
||||
func getAuthURLs(remoteURL string) ([]string, error) {
|
||||
authURLs := []string{}
|
||||
|
||||
resp, err := http.Get(remoteURL + "/v2/")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
for _, c := range challenge.ResponseChallenges(resp) {
|
||||
if strings.EqualFold(c.Scheme, "bearer") {
|
||||
authURLs = append(authURLs, c.Parameters["realm"])
|
||||
}
|
||||
}
|
||||
|
||||
return authURLs, nil
|
||||
}
|
||||
|
||||
func ping(manager challenge.Manager, endpoint, versionHeader string) error {
|
||||
resp, err := http.Get(endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return manager.AddResponse(resp)
|
||||
}
|
225
vendor/github.com/docker/distribution/registry/proxy/proxyblobstore.go
generated
vendored
Normal file
225
vendor/github.com/docker/distribution/registry/proxy/proxyblobstore.go
generated
vendored
Normal file
|
@ -0,0 +1,225 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
dcontext "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/proxy/scheduler"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
// todo(richardscothern): from cache control header or config file
|
||||
const blobTTL = time.Duration(24 * 7 * time.Hour)
|
||||
|
||||
type proxyBlobStore struct {
|
||||
localStore distribution.BlobStore
|
||||
remoteStore distribution.BlobService
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
repositoryName reference.Named
|
||||
authChallenger authChallenger
|
||||
}
|
||||
|
||||
var _ distribution.BlobStore = &proxyBlobStore{}
|
||||
|
||||
// inflight tracks currently downloading blobs
|
||||
var inflight = make(map[digest.Digest]struct{})
|
||||
|
||||
// mu protects inflight
|
||||
var mu sync.Mutex
|
||||
|
||||
func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
|
||||
w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
|
||||
w.Header().Set("Content-Type", mediaType)
|
||||
w.Header().Set("Docker-Content-Digest", digest.String())
|
||||
w.Header().Set("Etag", digest.String())
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
|
||||
desc, err := pbs.remoteStore.Stat(ctx, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if w, ok := writer.(http.ResponseWriter); ok {
|
||||
setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
|
||||
}
|
||||
|
||||
remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
defer remoteReader.Close()
|
||||
|
||||
_, err = io.CopyN(writer, remoteReader, desc.Size)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
proxyMetrics.BlobPush(uint64(desc.Size))
|
||||
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
|
||||
localDesc, err := pbs.localStore.Stat(ctx, dgst)
|
||||
if err != nil {
|
||||
// Stat can report a zero sized file here if it's checked between creation
|
||||
// and population. Return nil error, and continue
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
proxyMetrics.BlobPush(uint64(localDesc.Size))
|
||||
return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
|
||||
defer func() {
|
||||
mu.Lock()
|
||||
delete(inflight, dgst)
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
var desc distribution.Descriptor
|
||||
var err error
|
||||
var bw distribution.BlobWriter
|
||||
|
||||
bw, err = pbs.localStore.Create(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
desc, err = pbs.copyContent(ctx, dgst, bw)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = bw.Commit(ctx, desc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||
served, err := pbs.serveLocal(ctx, w, r, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if served {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
_, ok := inflight[dgst]
|
||||
if ok {
|
||||
mu.Unlock()
|
||||
_, err := pbs.copyContent(ctx, dgst, w)
|
||||
return err
|
||||
}
|
||||
inflight[dgst] = struct{}{}
|
||||
mu.Unlock()
|
||||
|
||||
go func(dgst digest.Digest) {
|
||||
if err := pbs.storeLocal(ctx, dgst); err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
|
||||
}
|
||||
|
||||
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
pbs.scheduler.AddBlob(blobRef, repositoryTTL)
|
||||
}(dgst)
|
||||
|
||||
_, err = pbs.copyContent(ctx, dgst, w)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
desc, err := pbs.localStore.Stat(ctx, dgst)
|
||||
if err == nil {
|
||||
return desc, err
|
||||
}
|
||||
|
||||
if err != distribution.ErrBlobUnknown {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
return pbs.remoteStore.Stat(ctx, dgst)
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
||||
blob, err := pbs.localStore.Get(ctx, dgst)
|
||||
if err == nil {
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
blob, err = pbs.remoteStore.Get(ctx, dgst)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
_, err = pbs.localStore.Put(ctx, "", blob)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
// Unsupported functions
|
||||
func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
|
||||
return distribution.Descriptor{}, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||
return nil, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||
return nil, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
return distribution.Descriptor{}, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||
return nil, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
return distribution.ErrUnsupported
|
||||
}
|
416
vendor/github.com/docker/distribution/registry/proxy/proxyblobstore_test.go
generated
vendored
Normal file
416
vendor/github.com/docker/distribution/registry/proxy/proxyblobstore_test.go
generated
vendored
Normal file
|
@ -0,0 +1,416 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/proxy/scheduler"
|
||||
"github.com/docker/distribution/registry/storage"
|
||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||
"github.com/docker/distribution/registry/storage/driver/filesystem"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
var sbsMu sync.Mutex
|
||||
|
||||
type statsBlobStore struct {
|
||||
stats map[string]int
|
||||
blobs distribution.BlobStore
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["put"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Put(ctx, mediaType, p)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["get"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Get(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["create"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Create(ctx, options...)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["resume"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Resume(ctx, id)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["open"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Open(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["serveblob"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.ServeBlob(ctx, w, r, dgst)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
|
||||
sbsMu.Lock()
|
||||
sbs.stats["stat"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Stat(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["delete"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Delete(ctx, dgst)
|
||||
}
|
||||
|
||||
type testEnv struct {
|
||||
numUnique int
|
||||
inRemote []distribution.Descriptor
|
||||
store proxyBlobStore
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (te *testEnv) LocalStats() *map[string]int {
|
||||
sbsMu.Lock()
|
||||
ls := te.store.localStore.(statsBlobStore).stats
|
||||
sbsMu.Unlock()
|
||||
return &ls
|
||||
}
|
||||
|
||||
func (te *testEnv) RemoteStats() *map[string]int {
|
||||
sbsMu.Lock()
|
||||
rs := te.store.remoteStore.(statsBlobStore).stats
|
||||
sbsMu.Unlock()
|
||||
return &rs
|
||||
}
|
||||
|
||||
// Populate remote store and record the digests
|
||||
func makeTestEnv(t *testing.T, name string) *testEnv {
|
||||
nameRef, err := reference.WithName(name)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to parse reference: %s", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
truthDir, err := ioutil.TempDir("", "truth")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create tempdir: %s", err)
|
||||
}
|
||||
|
||||
cacheDir, err := ioutil.TempDir("", "cache")
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create tempdir: %s", err)
|
||||
}
|
||||
|
||||
localDriver, err := filesystem.FromParameters(map[string]interface{}{
|
||||
"rootdirectory": truthDir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create filesystem driver: %s", err)
|
||||
}
|
||||
|
||||
// todo: create a tempfile area here
|
||||
localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
localRepo, err := localRegistry.Repository(ctx, nameRef)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
|
||||
cacheDriver, err := filesystem.FromParameters(map[string]interface{}{
|
||||
"rootdirectory": cacheDir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create filesystem driver: %s", err)
|
||||
}
|
||||
|
||||
truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
truthRepo, err := truthRegistry.Repository(ctx, nameRef)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
|
||||
truthBlobs := statsBlobStore{
|
||||
stats: make(map[string]int),
|
||||
blobs: truthRepo.Blobs(ctx),
|
||||
}
|
||||
|
||||
localBlobs := statsBlobStore{
|
||||
stats: make(map[string]int),
|
||||
blobs: localRepo.Blobs(ctx),
|
||||
}
|
||||
|
||||
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
|
||||
|
||||
proxyBlobStore := proxyBlobStore{
|
||||
repositoryName: nameRef,
|
||||
remoteStore: truthBlobs,
|
||||
localStore: localBlobs,
|
||||
scheduler: s,
|
||||
authChallenger: &mockChallenger{},
|
||||
}
|
||||
|
||||
te := &testEnv{
|
||||
store: proxyBlobStore,
|
||||
ctx: ctx,
|
||||
}
|
||||
return te
|
||||
}
|
||||
|
||||
func makeBlob(size int) []byte {
|
||||
blob := make([]byte, size, size)
|
||||
for i := 0; i < size; i++ {
|
||||
blob[i] = byte('A' + rand.Int()%48)
|
||||
}
|
||||
return blob
|
||||
}
|
||||
|
||||
func init() {
|
||||
rand.Seed(42)
|
||||
}
|
||||
|
||||
func perm(m []distribution.Descriptor) []distribution.Descriptor {
|
||||
for i := 0; i < len(m); i++ {
|
||||
j := rand.Intn(i + 1)
|
||||
tmp := m[i]
|
||||
m[i] = m[j]
|
||||
m[j] = tmp
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func populate(t *testing.T, te *testEnv, blobCount, size, numUnique int) {
|
||||
var inRemote []distribution.Descriptor
|
||||
|
||||
for i := 0; i < numUnique; i++ {
|
||||
bytes := makeBlob(size)
|
||||
for j := 0; j < blobCount/numUnique; j++ {
|
||||
desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
|
||||
if err != nil {
|
||||
t.Fatalf("Put in store")
|
||||
}
|
||||
|
||||
inRemote = append(inRemote, desc)
|
||||
}
|
||||
}
|
||||
|
||||
te.inRemote = inRemote
|
||||
te.numUnique = numUnique
|
||||
}
|
||||
func TestProxyStoreGet(t *testing.T) {
|
||||
te := makeTestEnv(t, "foo/bar")
|
||||
|
||||
localStats := te.LocalStats()
|
||||
remoteStats := te.RemoteStats()
|
||||
|
||||
populate(t, te, 1, 10, 1)
|
||||
_, err := te.store.Get(te.ctx, te.inRemote[0].Digest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if (*localStats)["get"] != 1 && (*localStats)["put"] != 1 {
|
||||
t.Errorf("Unexpected local counts")
|
||||
}
|
||||
|
||||
if (*remoteStats)["get"] != 1 {
|
||||
t.Errorf("Unexpected remote get count")
|
||||
}
|
||||
|
||||
_, err = te.store.Get(te.ctx, te.inRemote[0].Digest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if (*localStats)["get"] != 2 && (*localStats)["put"] != 1 {
|
||||
t.Errorf("Unexpected local counts")
|
||||
}
|
||||
|
||||
if (*remoteStats)["get"] != 1 {
|
||||
t.Errorf("Unexpected remote get count")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestProxyStoreStat(t *testing.T) {
|
||||
te := makeTestEnv(t, "foo/bar")
|
||||
|
||||
remoteBlobCount := 1
|
||||
populate(t, te, remoteBlobCount, 10, 1)
|
||||
|
||||
localStats := te.LocalStats()
|
||||
remoteStats := te.RemoteStats()
|
||||
|
||||
// Stat - touches both stores
|
||||
for _, d := range te.inRemote {
|
||||
_, err := te.store.Stat(te.ctx, d.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("Error stating proxy store")
|
||||
}
|
||||
}
|
||||
|
||||
if (*localStats)["stat"] != remoteBlobCount {
|
||||
t.Errorf("Unexpected local stat count")
|
||||
}
|
||||
|
||||
if (*remoteStats)["stat"] != remoteBlobCount {
|
||||
t.Errorf("Unexpected remote stat count")
|
||||
}
|
||||
|
||||
if te.store.authChallenger.(*mockChallenger).count != len(te.inRemote) {
|
||||
t.Fatalf("Unexpected auth challenge count, got %#v", te.store.authChallenger)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestProxyStoreServeHighConcurrency(t *testing.T) {
|
||||
te := makeTestEnv(t, "foo/bar")
|
||||
blobSize := 200
|
||||
blobCount := 10
|
||||
numUnique := 1
|
||||
populate(t, te, blobCount, blobSize, numUnique)
|
||||
|
||||
numClients := 16
|
||||
testProxyStoreServe(t, te, numClients)
|
||||
}
|
||||
|
||||
func TestProxyStoreServeMany(t *testing.T) {
|
||||
te := makeTestEnv(t, "foo/bar")
|
||||
blobSize := 200
|
||||
blobCount := 10
|
||||
numUnique := 4
|
||||
populate(t, te, blobCount, blobSize, numUnique)
|
||||
|
||||
numClients := 4
|
||||
testProxyStoreServe(t, te, numClients)
|
||||
}
|
||||
|
||||
// todo(richardscothern): blobCount must be smaller than num clients
|
||||
func TestProxyStoreServeBig(t *testing.T) {
|
||||
te := makeTestEnv(t, "foo/bar")
|
||||
|
||||
blobSize := 2 << 20
|
||||
blobCount := 4
|
||||
numUnique := 2
|
||||
populate(t, te, blobCount, blobSize, numUnique)
|
||||
|
||||
numClients := 4
|
||||
testProxyStoreServe(t, te, numClients)
|
||||
}
|
||||
|
||||
// testProxyStoreServe will create clients to consume all blobs
|
||||
// populated in the truth store
|
||||
func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||
localStats := te.LocalStats()
|
||||
remoteStats := te.RemoteStats()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < numClients; i++ {
|
||||
// Serveblob - pulls through blobs
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for _, remoteBlob := range te.inRemote {
|
||||
w := httptest.NewRecorder()
|
||||
r, err := http.NewRequest("GET", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = te.store.ServeBlob(te.ctx, w, r, remoteBlob.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
bodyBytes := w.Body.Bytes()
|
||||
localDigest := digest.FromBytes(bodyBytes)
|
||||
if localDigest != remoteBlob.Digest {
|
||||
t.Fatalf("Mismatching blob fetch from proxy")
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
remoteBlobCount := len(te.inRemote)
|
||||
sbsMu.Lock()
|
||||
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
|
||||
sbsMu.Unlock()
|
||||
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
|
||||
}
|
||||
sbsMu.Unlock()
|
||||
|
||||
// Wait for any async storage goroutines to finish
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
sbsMu.Lock()
|
||||
remoteStatCount := (*remoteStats)["stat"]
|
||||
remoteOpenCount := (*remoteStats)["open"]
|
||||
sbsMu.Unlock()
|
||||
|
||||
// Serveblob - blobs come from local
|
||||
for _, dr := range te.inRemote {
|
||||
w := httptest.NewRecorder()
|
||||
r, err := http.NewRequest("GET", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
dl := digest.FromBytes(w.Body.Bytes())
|
||||
if dl != dr.Digest {
|
||||
t.Errorf("Mismatching blob fetch from proxy")
|
||||
}
|
||||
}
|
||||
|
||||
localStats = te.LocalStats()
|
||||
remoteStats = te.RemoteStats()
|
||||
|
||||
// Ensure remote unchanged
|
||||
sbsMu.Lock()
|
||||
defer sbsMu.Unlock()
|
||||
if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
|
||||
t.Fatalf("unexpected remote stats: %#v", remoteStats)
|
||||
}
|
||||
}
|
96
vendor/github.com/docker/distribution/registry/proxy/proxymanifeststore.go
generated
vendored
Normal file
96
vendor/github.com/docker/distribution/registry/proxy/proxymanifeststore.go
generated
vendored
Normal file
|
@ -0,0 +1,96 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
dcontext "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/proxy/scheduler"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
// todo(richardscothern): from cache control header or config
|
||||
const repositoryTTL = time.Duration(24 * 7 * time.Hour)
|
||||
|
||||
type proxyManifestStore struct {
|
||||
ctx context.Context
|
||||
localManifests distribution.ManifestService
|
||||
remoteManifests distribution.ManifestService
|
||||
repositoryName reference.Named
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
authChallenger authChallenger
|
||||
}
|
||||
|
||||
var _ distribution.ManifestService = &proxyManifestStore{}
|
||||
|
||||
func (pms proxyManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
|
||||
exists, err := pms.localManifests.Exists(ctx, dgst)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if exists {
|
||||
return true, nil
|
||||
}
|
||||
if err := pms.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return pms.remoteManifests.Exists(ctx, dgst)
|
||||
}
|
||||
|
||||
func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
||||
// At this point `dgst` was either specified explicitly, or returned by the
|
||||
// tagstore with the most recent association.
|
||||
var fromRemote bool
|
||||
manifest, err := pms.localManifests.Get(ctx, dgst, options...)
|
||||
if err != nil {
|
||||
if err := pms.authChallenger.tryEstablishChallenges(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
manifest, err = pms.remoteManifests.Get(ctx, dgst, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fromRemote = true
|
||||
}
|
||||
|
||||
_, payload, err := manifest.Payload()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
proxyMetrics.ManifestPush(uint64(len(payload)))
|
||||
if fromRemote {
|
||||
proxyMetrics.ManifestPull(uint64(len(payload)))
|
||||
|
||||
_, err = pms.localManifests.Put(ctx, manifest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Schedule the manifest blob for removal
|
||||
repoBlob, err := reference.WithDigest(pms.repositoryName, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pms.scheduler.AddManifest(repoBlob, repositoryTTL)
|
||||
// Ensure the manifest blob is cleaned up
|
||||
//pms.scheduler.AddBlob(blobRef, repositoryTTL)
|
||||
|
||||
}
|
||||
|
||||
return manifest, err
|
||||
}
|
||||
|
||||
func (pms proxyManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
|
||||
var d digest.Digest
|
||||
return d, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pms proxyManifestStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
return distribution.ErrUnsupported
|
||||
}
|
275
vendor/github.com/docker/distribution/registry/proxy/proxymanifeststore_test.go
generated
vendored
Normal file
275
vendor/github.com/docker/distribution/registry/proxy/proxymanifeststore_test.go
generated
vendored
Normal file
|
@ -0,0 +1,275 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/manifest"
|
||||
"github.com/docker/distribution/manifest/schema1"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/client/auth"
|
||||
"github.com/docker/distribution/registry/client/auth/challenge"
|
||||
"github.com/docker/distribution/registry/proxy/scheduler"
|
||||
"github.com/docker/distribution/registry/storage"
|
||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
"github.com/docker/distribution/testutil"
|
||||
"github.com/docker/libtrust"
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type statsManifest struct {
|
||||
manifests distribution.ManifestService
|
||||
stats map[string]int
|
||||
}
|
||||
|
||||
type manifestStoreTestEnv struct {
|
||||
manifestDigest digest.Digest // digest of the signed manifest in the local storage
|
||||
manifests proxyManifestStore
|
||||
}
|
||||
|
||||
func (te manifestStoreTestEnv) LocalStats() *map[string]int {
|
||||
ls := te.manifests.localManifests.(statsManifest).stats
|
||||
return &ls
|
||||
}
|
||||
|
||||
func (te manifestStoreTestEnv) RemoteStats() *map[string]int {
|
||||
rs := te.manifests.remoteManifests.(statsManifest).stats
|
||||
return &rs
|
||||
}
|
||||
|
||||
func (sm statsManifest) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
sm.stats["delete"]++
|
||||
return sm.manifests.Delete(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sm statsManifest) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
|
||||
sm.stats["exists"]++
|
||||
return sm.manifests.Exists(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sm statsManifest) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
|
||||
sm.stats["get"]++
|
||||
return sm.manifests.Get(ctx, dgst)
|
||||
}
|
||||
|
||||
func (sm statsManifest) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
|
||||
sm.stats["put"]++
|
||||
return sm.manifests.Put(ctx, manifest)
|
||||
}
|
||||
|
||||
type mockChallenger struct {
|
||||
sync.Mutex
|
||||
count int
|
||||
}
|
||||
|
||||
// Called for remote operations only
|
||||
func (m *mockChallenger) tryEstablishChallenges(context.Context) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.count++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockChallenger) credentialStore() auth.CredentialStore {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockChallenger) challengeManager() challenge.Manager {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestEnv {
|
||||
nameRef, err := reference.WithName(name)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to parse reference: %s", err)
|
||||
}
|
||||
k, err := libtrust.GenerateECP256PrivateKey()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
truthRegistry, err := storage.NewRegistry(ctx, inmemory.New(),
|
||||
storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()),
|
||||
storage.Schema1SigningKey(k))
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
truthRepo, err := truthRegistry.Repository(ctx, nameRef)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
tr, err := truthRepo.Manifests(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
truthManifests := statsManifest{
|
||||
manifests: tr,
|
||||
stats: make(map[string]int),
|
||||
}
|
||||
|
||||
manifestDigest, err := populateRepo(ctx, t, truthRepo, name, tag)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
localRegistry, err := storage.NewRegistry(ctx, inmemory.New(), storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption, storage.Schema1SigningKey(k))
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
localRepo, err := localRegistry.Repository(ctx, nameRef)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
lr, err := localRepo.Manifests(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
localManifests := statsManifest{
|
||||
manifests: lr,
|
||||
stats: make(map[string]int),
|
||||
}
|
||||
|
||||
s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
|
||||
return &manifestStoreTestEnv{
|
||||
manifestDigest: manifestDigest,
|
||||
manifests: proxyManifestStore{
|
||||
ctx: ctx,
|
||||
localManifests: localManifests,
|
||||
remoteManifests: truthManifests,
|
||||
scheduler: s,
|
||||
repositoryName: nameRef,
|
||||
authChallenger: &mockChallenger{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func populateRepo(ctx context.Context, t *testing.T, repository distribution.Repository, name, tag string) (digest.Digest, error) {
|
||||
m := schema1.Manifest{
|
||||
Versioned: manifest.Versioned{
|
||||
SchemaVersion: 1,
|
||||
},
|
||||
Name: name,
|
||||
Tag: tag,
|
||||
}
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
wr, err := repository.Blobs(ctx).Create(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating test upload: %v", err)
|
||||
}
|
||||
|
||||
rs, ts, err := testutil.CreateRandomTarFile()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error generating test layer file")
|
||||
}
|
||||
dgst := digest.Digest(ts)
|
||||
if _, err := io.Copy(wr, rs); err != nil {
|
||||
t.Fatalf("unexpected error copying to upload: %v", err)
|
||||
}
|
||||
|
||||
if _, err := wr.Commit(ctx, distribution.Descriptor{Digest: dgst}); err != nil {
|
||||
t.Fatalf("unexpected error finishing upload: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
pk, err := libtrust.GenerateECP256PrivateKey()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error generating private key: %v", err)
|
||||
}
|
||||
|
||||
sm, err := schema1.Sign(&m, pk)
|
||||
if err != nil {
|
||||
t.Fatalf("error signing manifest: %v", err)
|
||||
}
|
||||
|
||||
ms, err := repository.Manifests(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
dgst, err := ms.Put(ctx, sm)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected errors putting manifest: %v", err)
|
||||
}
|
||||
|
||||
return dgst, nil
|
||||
}
|
||||
|
||||
// TestProxyManifests contains basic acceptance tests
|
||||
// for the pull-through behavior
|
||||
func TestProxyManifests(t *testing.T) {
|
||||
name := "foo/bar"
|
||||
env := newManifestStoreTestEnv(t, name, "latest")
|
||||
|
||||
localStats := env.LocalStats()
|
||||
remoteStats := env.RemoteStats()
|
||||
|
||||
ctx := context.Background()
|
||||
// Stat - must check local and remote
|
||||
exists, err := env.manifests.Exists(ctx, env.manifestDigest)
|
||||
if err != nil {
|
||||
t.Fatalf("Error checking existence")
|
||||
}
|
||||
if !exists {
|
||||
t.Errorf("Unexpected non-existant manifest")
|
||||
}
|
||||
|
||||
if (*localStats)["exists"] != 1 && (*remoteStats)["exists"] != 1 {
|
||||
t.Errorf("Unexpected exists count : \n%v \n%v", localStats, remoteStats)
|
||||
}
|
||||
|
||||
if env.manifests.authChallenger.(*mockChallenger).count != 1 {
|
||||
t.Fatalf("Expected 1 auth challenge, got %#v", env.manifests.authChallenger)
|
||||
}
|
||||
|
||||
// Get - should succeed and pull manifest into local
|
||||
_, err = env.manifests.Get(ctx, env.manifestDigest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if (*localStats)["get"] != 1 && (*remoteStats)["get"] != 1 {
|
||||
t.Errorf("Unexpected get count")
|
||||
}
|
||||
|
||||
if (*localStats)["put"] != 1 {
|
||||
t.Errorf("Expected local put")
|
||||
}
|
||||
|
||||
if env.manifests.authChallenger.(*mockChallenger).count != 2 {
|
||||
t.Fatalf("Expected 2 auth challenges, got %#v", env.manifests.authChallenger)
|
||||
}
|
||||
|
||||
// Stat - should only go to local
|
||||
exists, err = env.manifests.Exists(ctx, env.manifestDigest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !exists {
|
||||
t.Errorf("Unexpected non-existant manifest")
|
||||
}
|
||||
|
||||
if (*localStats)["exists"] != 2 && (*remoteStats)["exists"] != 1 {
|
||||
t.Errorf("Unexpected exists count")
|
||||
}
|
||||
|
||||
if env.manifests.authChallenger.(*mockChallenger).count != 2 {
|
||||
t.Fatalf("Expected 2 auth challenges, got %#v", env.manifests.authChallenger)
|
||||
}
|
||||
|
||||
// Get proxied - won't require another authchallenge
|
||||
_, err = env.manifests.Get(ctx, env.manifestDigest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if env.manifests.authChallenger.(*mockChallenger).count != 2 {
|
||||
t.Fatalf("Expected 2 auth challenges, got %#v", env.manifests.authChallenger)
|
||||
}
|
||||
|
||||
}
|
74
vendor/github.com/docker/distribution/registry/proxy/proxymetrics.go
generated
vendored
Normal file
74
vendor/github.com/docker/distribution/registry/proxy/proxymetrics.go
generated
vendored
Normal file
|
@ -0,0 +1,74 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Metrics is used to hold metric counters
|
||||
// related to the proxy
|
||||
type Metrics struct {
|
||||
Requests uint64
|
||||
Hits uint64
|
||||
Misses uint64
|
||||
BytesPulled uint64
|
||||
BytesPushed uint64
|
||||
}
|
||||
|
||||
type proxyMetricsCollector struct {
|
||||
blobMetrics Metrics
|
||||
manifestMetrics Metrics
|
||||
}
|
||||
|
||||
// BlobPull tracks metrics about blobs pulled into the cache
|
||||
func (pmc *proxyMetricsCollector) BlobPull(bytesPulled uint64) {
|
||||
atomic.AddUint64(&pmc.blobMetrics.Misses, 1)
|
||||
atomic.AddUint64(&pmc.blobMetrics.BytesPulled, bytesPulled)
|
||||
}
|
||||
|
||||
// BlobPush tracks metrics about blobs pushed to clients
|
||||
func (pmc *proxyMetricsCollector) BlobPush(bytesPushed uint64) {
|
||||
atomic.AddUint64(&pmc.blobMetrics.Requests, 1)
|
||||
atomic.AddUint64(&pmc.blobMetrics.Hits, 1)
|
||||
atomic.AddUint64(&pmc.blobMetrics.BytesPushed, bytesPushed)
|
||||
}
|
||||
|
||||
// ManifestPull tracks metrics related to Manifests pulled into the cache
|
||||
func (pmc *proxyMetricsCollector) ManifestPull(bytesPulled uint64) {
|
||||
atomic.AddUint64(&pmc.manifestMetrics.Misses, 1)
|
||||
atomic.AddUint64(&pmc.manifestMetrics.BytesPulled, bytesPulled)
|
||||
}
|
||||
|
||||
// ManifestPush tracks metrics about manifests pushed to clients
|
||||
func (pmc *proxyMetricsCollector) ManifestPush(bytesPushed uint64) {
|
||||
atomic.AddUint64(&pmc.manifestMetrics.Requests, 1)
|
||||
atomic.AddUint64(&pmc.manifestMetrics.Hits, 1)
|
||||
atomic.AddUint64(&pmc.manifestMetrics.BytesPushed, bytesPushed)
|
||||
}
|
||||
|
||||
// proxyMetrics tracks metrics about the proxy cache. This is
|
||||
// kept globally and made available via expvar.
|
||||
var proxyMetrics = &proxyMetricsCollector{}
|
||||
|
||||
func init() {
|
||||
registry := expvar.Get("registry")
|
||||
if registry == nil {
|
||||
registry = expvar.NewMap("registry")
|
||||
}
|
||||
|
||||
pm := registry.(*expvar.Map).Get("proxy")
|
||||
if pm == nil {
|
||||
pm = &expvar.Map{}
|
||||
pm.(*expvar.Map).Init()
|
||||
registry.(*expvar.Map).Set("proxy", pm)
|
||||
}
|
||||
|
||||
pm.(*expvar.Map).Set("blobs", expvar.Func(func() interface{} {
|
||||
return proxyMetrics.blobMetrics
|
||||
}))
|
||||
|
||||
pm.(*expvar.Map).Set("manifests", expvar.Func(func() interface{} {
|
||||
return proxyMetrics.manifestMetrics
|
||||
}))
|
||||
|
||||
}
|
263
vendor/github.com/docker/distribution/registry/proxy/proxyregistry.go
generated
vendored
Normal file
263
vendor/github.com/docker/distribution/registry/proxy/proxyregistry.go
generated
vendored
Normal file
|
@ -0,0 +1,263 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
"github.com/docker/distribution/configuration"
|
||||
dcontext "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/client"
|
||||
"github.com/docker/distribution/registry/client/auth"
|
||||
"github.com/docker/distribution/registry/client/auth/challenge"
|
||||
"github.com/docker/distribution/registry/client/transport"
|
||||
"github.com/docker/distribution/registry/proxy/scheduler"
|
||||
"github.com/docker/distribution/registry/storage"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
// proxyingRegistry fetches content from a remote registry and caches it locally
|
||||
type proxyingRegistry struct {
|
||||
embedded distribution.Namespace // provides local registry functionality
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
remoteURL url.URL
|
||||
authChallenger authChallenger
|
||||
}
|
||||
|
||||
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
|
||||
func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
|
||||
remoteURL, err := url.Parse(config.RemoteURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v := storage.NewVacuum(ctx, driver)
|
||||
s := scheduler.New(ctx, driver, "/scheduler-state.json")
|
||||
s.OnBlobExpire(func(ref reference.Reference) error {
|
||||
var r reference.Canonical
|
||||
var ok bool
|
||||
if r, ok = ref.(reference.Canonical); !ok {
|
||||
return fmt.Errorf("unexpected reference type : %T", ref)
|
||||
}
|
||||
|
||||
repo, err := registry.Repository(ctx, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blobs := repo.Blobs(ctx)
|
||||
|
||||
// Clear the repository reference and descriptor caches
|
||||
err = blobs.Delete(ctx, r.Digest())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = v.RemoveBlob(r.Digest().String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
s.OnManifestExpire(func(ref reference.Reference) error {
|
||||
var r reference.Canonical
|
||||
var ok bool
|
||||
if r, ok = ref.(reference.Canonical); !ok {
|
||||
return fmt.Errorf("unexpected reference type : %T", ref)
|
||||
}
|
||||
|
||||
repo, err := registry.Repository(ctx, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
manifests, err := repo.Manifests(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = manifests.Delete(ctx, r.Digest())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err = s.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proxyingRegistry{
|
||||
embedded: registry,
|
||||
scheduler: s,
|
||||
remoteURL: *remoteURL,
|
||||
authChallenger: &remoteAuthChallenger{
|
||||
remoteURL: *remoteURL,
|
||||
cm: challenge.NewSimpleManager(),
|
||||
cs: cs,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pr *proxyingRegistry) Scope() distribution.Scope {
|
||||
return distribution.GlobalScope
|
||||
}
|
||||
|
||||
func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
|
||||
return pr.embedded.Repositories(ctx, repos, last)
|
||||
}
|
||||
|
||||
func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
|
||||
c := pr.authChallenger
|
||||
|
||||
tkopts := auth.TokenHandlerOptions{
|
||||
Transport: http.DefaultTransport,
|
||||
Credentials: c.credentialStore(),
|
||||
Scopes: []auth.Scope{
|
||||
auth.RepositoryScope{
|
||||
Repository: name.Name(),
|
||||
Actions: []string{"pull"},
|
||||
},
|
||||
},
|
||||
Logger: dcontext.GetLogger(ctx),
|
||||
}
|
||||
|
||||
tr := transport.NewTransport(http.DefaultTransport,
|
||||
auth.NewAuthorizer(c.challengeManager(),
|
||||
auth.NewTokenHandlerWithOptions(tkopts)))
|
||||
|
||||
localRepo, err := pr.embedded.Repository(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remoteRepo, err := client.NewRepository(name, pr.remoteURL.String(), tr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
remoteManifests, err := remoteRepo.Manifests(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proxiedRepository{
|
||||
blobStore: &proxyBlobStore{
|
||||
localStore: localRepo.Blobs(ctx),
|
||||
remoteStore: remoteRepo.Blobs(ctx),
|
||||
scheduler: pr.scheduler,
|
||||
repositoryName: name,
|
||||
authChallenger: pr.authChallenger,
|
||||
},
|
||||
manifests: &proxyManifestStore{
|
||||
repositoryName: name,
|
||||
localManifests: localManifests, // Options?
|
||||
remoteManifests: remoteManifests,
|
||||
ctx: ctx,
|
||||
scheduler: pr.scheduler,
|
||||
authChallenger: pr.authChallenger,
|
||||
},
|
||||
name: name,
|
||||
tags: &proxyTagService{
|
||||
localTags: localRepo.Tags(ctx),
|
||||
remoteTags: remoteRepo.Tags(ctx),
|
||||
authChallenger: pr.authChallenger,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
|
||||
return pr.embedded.Blobs()
|
||||
}
|
||||
|
||||
func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
|
||||
return pr.embedded.BlobStatter()
|
||||
}
|
||||
|
||||
// authChallenger encapsulates a request to the upstream to establish credential challenges
|
||||
type authChallenger interface {
|
||||
tryEstablishChallenges(context.Context) error
|
||||
challengeManager() challenge.Manager
|
||||
credentialStore() auth.CredentialStore
|
||||
}
|
||||
|
||||
type remoteAuthChallenger struct {
|
||||
remoteURL url.URL
|
||||
sync.Mutex
|
||||
cm challenge.Manager
|
||||
cs auth.CredentialStore
|
||||
}
|
||||
|
||||
func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
|
||||
return r.cs
|
||||
}
|
||||
|
||||
func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
|
||||
return r.cm
|
||||
}
|
||||
|
||||
// tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
|
||||
func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
remoteURL := r.remoteURL
|
||||
remoteURL.Path = "/v2/"
|
||||
challenges, err := r.cm.GetChallenges(remoteURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(challenges) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// establish challenge type with upstream
|
||||
if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
|
||||
return nil
|
||||
}
|
||||
|
||||
// proxiedRepository uses proxying blob and manifest services to serve content
|
||||
// locally, or pulling it through from a remote and caching it locally if it doesn't
|
||||
// already exist
|
||||
type proxiedRepository struct {
|
||||
blobStore distribution.BlobStore
|
||||
manifests distribution.ManifestService
|
||||
name reference.Named
|
||||
tags distribution.TagService
|
||||
}
|
||||
|
||||
func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
|
||||
return pr.manifests, nil
|
||||
}
|
||||
|
||||
func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
|
||||
return pr.blobStore
|
||||
}
|
||||
|
||||
func (pr *proxiedRepository) Named() reference.Named {
|
||||
return pr.name
|
||||
}
|
||||
|
||||
func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
|
||||
return pr.tags
|
||||
}
|
66
vendor/github.com/docker/distribution/registry/proxy/proxytagservice.go
generated
vendored
Normal file
66
vendor/github.com/docker/distribution/registry/proxy/proxytagservice.go
generated
vendored
Normal file
|
@ -0,0 +1,66 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
)
|
||||
|
||||
// proxyTagService supports local and remote lookup of tags.
|
||||
type proxyTagService struct {
|
||||
localTags distribution.TagService
|
||||
remoteTags distribution.TagService
|
||||
authChallenger authChallenger
|
||||
}
|
||||
|
||||
var _ distribution.TagService = proxyTagService{}
|
||||
|
||||
// Get attempts to get the most recent digest for the tag by checking the remote
|
||||
// tag service first and then caching it locally. If the remote is unavailable
|
||||
// the local association is returned
|
||||
func (pt proxyTagService) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
|
||||
err := pt.authChallenger.tryEstablishChallenges(ctx)
|
||||
if err == nil {
|
||||
desc, err := pt.remoteTags.Get(ctx, tag)
|
||||
if err == nil {
|
||||
err := pt.localTags.Tag(ctx, tag, desc)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
return desc, nil
|
||||
}
|
||||
}
|
||||
|
||||
desc, err := pt.localTags.Get(ctx, tag)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
func (pt proxyTagService) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
|
||||
return distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pt proxyTagService) Untag(ctx context.Context, tag string) error {
|
||||
err := pt.localTags.Untag(ctx, tag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pt proxyTagService) All(ctx context.Context) ([]string, error) {
|
||||
err := pt.authChallenger.tryEstablishChallenges(ctx)
|
||||
if err == nil {
|
||||
tags, err := pt.remoteTags.All(ctx)
|
||||
if err == nil {
|
||||
return tags, err
|
||||
}
|
||||
}
|
||||
return pt.localTags.All(ctx)
|
||||
}
|
||||
|
||||
func (pt proxyTagService) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) {
|
||||
return []string{}, distribution.ErrUnsupported
|
||||
}
|
182
vendor/github.com/docker/distribution/registry/proxy/proxytagservice_test.go
generated
vendored
Normal file
182
vendor/github.com/docker/distribution/registry/proxy/proxytagservice_test.go
generated
vendored
Normal file
|
@ -0,0 +1,182 @@
|
|||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
)
|
||||
|
||||
type mockTagStore struct {
|
||||
mapping map[string]distribution.Descriptor
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
var _ distribution.TagService = &mockTagStore{}
|
||||
|
||||
func (m *mockTagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if d, ok := m.mapping[tag]; ok {
|
||||
return d, nil
|
||||
}
|
||||
return distribution.Descriptor{}, distribution.ErrTagUnknown{}
|
||||
}
|
||||
|
||||
func (m *mockTagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.mapping[tag] = desc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockTagStore) Untag(ctx context.Context, tag string) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if _, ok := m.mapping[tag]; ok {
|
||||
delete(m.mapping, tag)
|
||||
return nil
|
||||
}
|
||||
return distribution.ErrTagUnknown{}
|
||||
}
|
||||
|
||||
func (m *mockTagStore) All(ctx context.Context) ([]string, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
var tags []string
|
||||
for tag := range m.mapping {
|
||||
tags = append(tags, tag)
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
func (m *mockTagStore) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func testProxyTagService(local, remote map[string]distribution.Descriptor) *proxyTagService {
|
||||
if local == nil {
|
||||
local = make(map[string]distribution.Descriptor)
|
||||
}
|
||||
if remote == nil {
|
||||
remote = make(map[string]distribution.Descriptor)
|
||||
}
|
||||
return &proxyTagService{
|
||||
localTags: &mockTagStore{mapping: local},
|
||||
remoteTags: &mockTagStore{mapping: remote},
|
||||
authChallenger: &mockChallenger{},
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
remoteDesc := distribution.Descriptor{Size: 42}
|
||||
remoteTag := "remote"
|
||||
proxyTags := testProxyTagService(map[string]distribution.Descriptor{remoteTag: remoteDesc}, nil)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Get pre-loaded tag
|
||||
d, err := proxyTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if proxyTags.authChallenger.(*mockChallenger).count != 1 {
|
||||
t.Fatalf("Expected 1 auth challenge call, got %#v", proxyTags.authChallenger)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(d, remoteDesc) {
|
||||
t.Fatal("unable to get put tag")
|
||||
}
|
||||
|
||||
local, err := proxyTags.localTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal("remote tag not pulled into store")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(local, remoteDesc) {
|
||||
t.Fatalf("unexpected descriptor pulled through")
|
||||
}
|
||||
|
||||
// Manually overwrite remote tag
|
||||
newRemoteDesc := distribution.Descriptor{Size: 43}
|
||||
err = proxyTags.remoteTags.Tag(ctx, remoteTag, newRemoteDesc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
d, err = proxyTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if proxyTags.authChallenger.(*mockChallenger).count != 2 {
|
||||
t.Fatalf("Expected 2 auth challenge calls, got %#v", proxyTags.authChallenger)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(d, newRemoteDesc) {
|
||||
t.Fatal("unable to get put tag")
|
||||
}
|
||||
|
||||
_, err = proxyTags.localTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal("remote tag not pulled into store")
|
||||
}
|
||||
|
||||
// untag, ensure it's removed locally, but present in remote
|
||||
err = proxyTags.Untag(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = proxyTags.localTags.Get(ctx, remoteTag)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected error getting Untag'd tag")
|
||||
}
|
||||
|
||||
_, err = proxyTags.remoteTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatalf("remote tag should not be untagged with proxyTag.Untag")
|
||||
}
|
||||
|
||||
_, err = proxyTags.Get(ctx, remoteTag)
|
||||
if err != nil {
|
||||
t.Fatal("untagged tag should be pulled through")
|
||||
}
|
||||
|
||||
if proxyTags.authChallenger.(*mockChallenger).count != 3 {
|
||||
t.Fatalf("Expected 3 auth challenge calls, got %#v", proxyTags.authChallenger)
|
||||
}
|
||||
|
||||
// Add another tag. Ensure both tags appear in 'All'
|
||||
err = proxyTags.remoteTags.Tag(ctx, "funtag", distribution.Descriptor{Size: 42})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
all, err := proxyTags.All(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(all) != 2 {
|
||||
t.Fatalf("Unexpected tag length returned from All() : %d ", len(all))
|
||||
}
|
||||
|
||||
sort.Strings(all)
|
||||
if all[0] != "funtag" && all[1] != "remote" {
|
||||
t.Fatalf("Unexpected tags returned from All() : %v ", all)
|
||||
}
|
||||
|
||||
if proxyTags.authChallenger.(*mockChallenger).count != 4 {
|
||||
t.Fatalf("Expected 4 auth challenge calls, got %#v", proxyTags.authChallenger)
|
||||
}
|
||||
}
|
260
vendor/github.com/docker/distribution/registry/proxy/scheduler/scheduler.go
generated
vendored
Normal file
260
vendor/github.com/docker/distribution/registry/proxy/scheduler/scheduler.go
generated
vendored
Normal file
|
@ -0,0 +1,260 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
dcontext "github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
)
|
||||
|
||||
// onTTLExpiryFunc is called when a repository's TTL expires
|
||||
type expiryFunc func(reference.Reference) error
|
||||
|
||||
const (
|
||||
entryTypeBlob = iota
|
||||
entryTypeManifest
|
||||
indexSaveFrequency = 5 * time.Second
|
||||
)
|
||||
|
||||
// schedulerEntry represents an entry in the scheduler
|
||||
// fields are exported for serialization
|
||||
type schedulerEntry struct {
|
||||
Key string `json:"Key"`
|
||||
Expiry time.Time `json:"ExpiryData"`
|
||||
EntryType int `json:"EntryType"`
|
||||
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// New returns a new instance of the scheduler
|
||||
func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler {
|
||||
return &TTLExpirationScheduler{
|
||||
entries: make(map[string]*schedulerEntry),
|
||||
driver: driver,
|
||||
pathToStateFile: path,
|
||||
ctx: ctx,
|
||||
stopped: true,
|
||||
doneChan: make(chan struct{}),
|
||||
saveTimer: time.NewTicker(indexSaveFrequency),
|
||||
}
|
||||
}
|
||||
|
||||
// TTLExpirationScheduler is a scheduler used to perform actions
|
||||
// when TTLs expire
|
||||
type TTLExpirationScheduler struct {
|
||||
sync.Mutex
|
||||
|
||||
entries map[string]*schedulerEntry
|
||||
|
||||
driver driver.StorageDriver
|
||||
ctx context.Context
|
||||
pathToStateFile string
|
||||
|
||||
stopped bool
|
||||
|
||||
onBlobExpire expiryFunc
|
||||
onManifestExpire expiryFunc
|
||||
|
||||
indexDirty bool
|
||||
saveTimer *time.Ticker
|
||||
doneChan chan struct{}
|
||||
}
|
||||
|
||||
// OnBlobExpire is called when a scheduled blob's TTL expires
|
||||
func (ttles *TTLExpirationScheduler) OnBlobExpire(f expiryFunc) {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
ttles.onBlobExpire = f
|
||||
}
|
||||
|
||||
// OnManifestExpire is called when a scheduled manifest's TTL expires
|
||||
func (ttles *TTLExpirationScheduler) OnManifestExpire(f expiryFunc) {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
ttles.onManifestExpire = f
|
||||
}
|
||||
|
||||
// AddBlob schedules a blob cleanup after ttl expires
|
||||
func (ttles *TTLExpirationScheduler) AddBlob(blobRef reference.Canonical, ttl time.Duration) error {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if ttles.stopped {
|
||||
return fmt.Errorf("scheduler not started")
|
||||
}
|
||||
|
||||
ttles.add(blobRef, ttl, entryTypeBlob)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddManifest schedules a manifest cleanup after ttl expires
|
||||
func (ttles *TTLExpirationScheduler) AddManifest(manifestRef reference.Canonical, ttl time.Duration) error {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if ttles.stopped {
|
||||
return fmt.Errorf("scheduler not started")
|
||||
}
|
||||
|
||||
ttles.add(manifestRef, ttl, entryTypeManifest)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start starts the scheduler
|
||||
func (ttles *TTLExpirationScheduler) Start() error {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
err := ttles.readState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !ttles.stopped {
|
||||
return fmt.Errorf("Scheduler already started")
|
||||
}
|
||||
|
||||
dcontext.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...")
|
||||
ttles.stopped = false
|
||||
|
||||
// Start timer for each deserialized entry
|
||||
for _, entry := range ttles.entries {
|
||||
entry.timer = ttles.startTimer(entry, entry.Expiry.Sub(time.Now()))
|
||||
}
|
||||
|
||||
// Start a ticker to periodically save the entries index
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ttles.saveTimer.C:
|
||||
ttles.Lock()
|
||||
if !ttles.indexDirty {
|
||||
ttles.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
err := ttles.writeState()
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
} else {
|
||||
ttles.indexDirty = false
|
||||
}
|
||||
ttles.Unlock()
|
||||
|
||||
case <-ttles.doneChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
|
||||
entry := &schedulerEntry{
|
||||
Key: r.String(),
|
||||
Expiry: time.Now().Add(ttl),
|
||||
EntryType: eType,
|
||||
}
|
||||
dcontext.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, entry.Expiry.Sub(time.Now()))
|
||||
if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
|
||||
oldEntry.timer.Stop()
|
||||
}
|
||||
ttles.entries[entry.Key] = entry
|
||||
entry.timer = ttles.startTimer(entry, ttl)
|
||||
ttles.indexDirty = true
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.Duration) *time.Timer {
|
||||
return time.AfterFunc(ttl, func() {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
var f expiryFunc
|
||||
|
||||
switch entry.EntryType {
|
||||
case entryTypeBlob:
|
||||
f = ttles.onBlobExpire
|
||||
case entryTypeManifest:
|
||||
f = ttles.onManifestExpire
|
||||
default:
|
||||
f = func(reference.Reference) error {
|
||||
return fmt.Errorf("scheduler entry type")
|
||||
}
|
||||
}
|
||||
|
||||
ref, err := reference.Parse(entry.Key)
|
||||
if err == nil {
|
||||
if err := f(ref); err != nil {
|
||||
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
|
||||
}
|
||||
} else {
|
||||
dcontext.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
|
||||
}
|
||||
|
||||
delete(ttles.entries, entry.Key)
|
||||
ttles.indexDirty = true
|
||||
})
|
||||
}
|
||||
|
||||
// Stop stops the scheduler.
|
||||
func (ttles *TTLExpirationScheduler) Stop() {
|
||||
ttles.Lock()
|
||||
defer ttles.Unlock()
|
||||
|
||||
if err := ttles.writeState(); err != nil {
|
||||
dcontext.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||
}
|
||||
|
||||
for _, entry := range ttles.entries {
|
||||
entry.timer.Stop()
|
||||
}
|
||||
|
||||
close(ttles.doneChan)
|
||||
ttles.saveTimer.Stop()
|
||||
ttles.stopped = true
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) writeState() error {
|
||||
jsonBytes, err := json.Marshal(ttles.entries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ttles.driver.PutContent(ttles.ctx, ttles.pathToStateFile, jsonBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ttles *TTLExpirationScheduler) readState() error {
|
||||
if _, err := ttles.driver.Stat(ttles.ctx, ttles.pathToStateFile); err != nil {
|
||||
switch err := err.(type) {
|
||||
case driver.PathNotFoundError:
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
bytes, err := ttles.driver.GetContent(ttles.ctx, ttles.pathToStateFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(bytes, &ttles.entries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
211
vendor/github.com/docker/distribution/registry/proxy/scheduler/scheduler_test.go
generated
vendored
Normal file
211
vendor/github.com/docker/distribution/registry/proxy/scheduler/scheduler_test.go
generated
vendored
Normal file
|
@ -0,0 +1,211 @@
|
|||
package scheduler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution/context"
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||
)
|
||||
|
||||
func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
|
||||
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse reference: %v", err)
|
||||
}
|
||||
|
||||
ref2, err := reference.Parse("testrepo@sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse reference: %v", err)
|
||||
}
|
||||
|
||||
ref3, err := reference.Parse("testrepo@sha256:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc")
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse reference: %v", err)
|
||||
}
|
||||
|
||||
return ref1, ref2, ref3
|
||||
}
|
||||
|
||||
func TestSchedule(t *testing.T) {
|
||||
ref1, ref2, ref3 := testRefs(t)
|
||||
timeUnit := time.Millisecond
|
||||
remainingRepos := map[string]bool{
|
||||
ref1.String(): true,
|
||||
ref2.String(): true,
|
||||
ref3.String(): true,
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
s := New(context.Background(), inmemory.New(), "/ttl")
|
||||
deleteFunc := func(repoName reference.Reference) error {
|
||||
if len(remainingRepos) == 0 {
|
||||
t.Fatalf("Incorrect expiry count")
|
||||
}
|
||||
_, ok := remainingRepos[repoName.String()]
|
||||
if !ok {
|
||||
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
|
||||
}
|
||||
t.Log("removing", repoName)
|
||||
mu.Lock()
|
||||
delete(remainingRepos, repoName.String())
|
||||
mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
s.onBlobExpire = deleteFunc
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||
}
|
||||
|
||||
s.add(ref1, 3*timeUnit, entryTypeBlob)
|
||||
s.add(ref2, 1*timeUnit, entryTypeBlob)
|
||||
|
||||
func() {
|
||||
s.Lock()
|
||||
s.add(ref3, 1*timeUnit, entryTypeBlob)
|
||||
s.Unlock()
|
||||
|
||||
}()
|
||||
|
||||
// Ensure all repos are deleted
|
||||
<-time.After(50 * timeUnit)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if len(remainingRepos) != 0 {
|
||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestoreOld(t *testing.T) {
|
||||
ref1, ref2, _ := testRefs(t)
|
||||
remainingRepos := map[string]bool{
|
||||
ref1.String(): true,
|
||||
ref2.String(): true,
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(remainingRepos))
|
||||
var mu sync.Mutex
|
||||
deleteFunc := func(r reference.Reference) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if r.String() == ref1.String() && len(remainingRepos) == 2 {
|
||||
t.Errorf("ref1 should not be removed first")
|
||||
}
|
||||
_, ok := remainingRepos[r.String()]
|
||||
if !ok {
|
||||
t.Fatalf("Trying to remove nonexistent repo: %s", r)
|
||||
}
|
||||
delete(remainingRepos, r.String())
|
||||
wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
timeUnit := time.Millisecond
|
||||
serialized, err := json.Marshal(&map[string]schedulerEntry{
|
||||
ref1.String(): {
|
||||
Expiry: time.Now().Add(10 * timeUnit),
|
||||
Key: ref1.String(),
|
||||
EntryType: 0,
|
||||
},
|
||||
ref2.String(): {
|
||||
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
|
||||
Key: ref2.String(),
|
||||
EntryType: 0,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Error serializing test data: %s", err.Error())
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pathToStatFile := "/ttl"
|
||||
fs := inmemory.New()
|
||||
err = fs.PutContent(ctx, pathToStatFile, serialized)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to write serialized data to fs")
|
||||
}
|
||||
s := New(context.Background(), fs, "/ttl")
|
||||
s.OnBlobExpire(deleteFunc)
|
||||
err = s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||
}
|
||||
defer s.Stop()
|
||||
|
||||
wg.Wait()
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if len(remainingRepos) != 0 {
|
||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStopRestore(t *testing.T) {
|
||||
ref1, ref2, _ := testRefs(t)
|
||||
|
||||
timeUnit := time.Millisecond
|
||||
remainingRepos := map[string]bool{
|
||||
ref1.String(): true,
|
||||
ref2.String(): true,
|
||||
}
|
||||
|
||||
var mu sync.Mutex
|
||||
deleteFunc := func(r reference.Reference) error {
|
||||
mu.Lock()
|
||||
delete(remainingRepos, r.String())
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
fs := inmemory.New()
|
||||
pathToStateFile := "/ttl"
|
||||
s := New(context.Background(), fs, pathToStateFile)
|
||||
s.onBlobExpire = deleteFunc
|
||||
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
s.add(ref1, 300*timeUnit, entryTypeBlob)
|
||||
s.add(ref2, 100*timeUnit, entryTypeBlob)
|
||||
|
||||
// Start and stop before all operations complete
|
||||
// state will be written to fs
|
||||
s.Stop()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// v2 will restore state from fs
|
||||
s2 := New(context.Background(), fs, pathToStateFile)
|
||||
s2.onBlobExpire = deleteFunc
|
||||
err = s2.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Error starting v2: %s", err.Error())
|
||||
}
|
||||
|
||||
<-time.After(500 * timeUnit)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if len(remainingRepos) != 0 {
|
||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestDoubleStart(t *testing.T) {
|
||||
s := New(context.Background(), inmemory.New(), "/ttl")
|
||||
err := s.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to start scheduler")
|
||||
}
|
||||
err = s.Start()
|
||||
if err == nil {
|
||||
t.Fatalf("Scheduler started twice without error")
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue