Merge pull request #1925 from dmcgowan/reenable-race-detector
Re-enable race detector in circleci
This commit is contained in:
commit
49da29ee46
8 changed files with 126 additions and 30 deletions
|
@ -77,13 +77,16 @@ test:
|
||||||
timeout: 1000
|
timeout: 1000
|
||||||
pwd: $BASE_STABLE
|
pwd: $BASE_STABLE
|
||||||
|
|
||||||
|
# Test stable with race
|
||||||
|
- gvm use stable; export ROOT_PACKAGE=$(go list .); go list -tags "$DOCKER_BUILDTAGS" ./... | grep -v "/vendor/" | grep -v "registry/handlers" | grep -v "registry/storage/driver" | xargs -L 1 -I{} bash -c 'export PACKAGE={}; godep go test -race -tags "$DOCKER_BUILDTAGS" -test.short $PACKAGE':
|
||||||
|
timeout: 1000
|
||||||
|
pwd: $BASE_STABLE
|
||||||
post:
|
post:
|
||||||
# Report to codecov
|
# Report to codecov
|
||||||
- bash <(curl -s https://codecov.io/bash):
|
- bash <(curl -s https://codecov.io/bash):
|
||||||
pwd: $BASE_STABLE
|
pwd: $BASE_STABLE
|
||||||
|
|
||||||
## Notes
|
## Notes
|
||||||
# Disabled the -race detector due to massive memory usage.
|
|
||||||
# Do we want these as well?
|
# Do we want these as well?
|
||||||
# - go get code.google.com/p/go.tools/cmd/goimports
|
# - go get code.google.com/p/go.tools/cmd/goimports
|
||||||
# - test -z "$(goimports -l -w ./... | tee /dev/stderr)"
|
# - test -z "$(goimports -l -w ./... | tee /dev/stderr)"
|
||||||
|
|
|
@ -43,6 +43,7 @@ var headerConfig = http.Header{
|
||||||
// 200 OK response.
|
// 200 OK response.
|
||||||
func TestCheckAPI(t *testing.T) {
|
func TestCheckAPI(t *testing.T) {
|
||||||
env := newTestEnv(t, false)
|
env := newTestEnv(t, false)
|
||||||
|
defer env.Shutdown()
|
||||||
baseURL, err := env.builder.BuildBaseURL()
|
baseURL, err := env.builder.BuildBaseURL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error building base url: %v", err)
|
t.Fatalf("unexpected error building base url: %v", err)
|
||||||
|
@ -74,6 +75,7 @@ func TestCheckAPI(t *testing.T) {
|
||||||
func TestCatalogAPI(t *testing.T) {
|
func TestCatalogAPI(t *testing.T) {
|
||||||
chunkLen := 2
|
chunkLen := 2
|
||||||
env := newTestEnv(t, false)
|
env := newTestEnv(t, false)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
values := url.Values{
|
values := url.Values{
|
||||||
"last": []string{""},
|
"last": []string{""},
|
||||||
|
@ -220,12 +222,16 @@ func TestURLPrefix(t *testing.T) {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
config.HTTP.Prefix = "/test/"
|
config.HTTP.Prefix = "/test/"
|
||||||
config.HTTP.Headers = headerConfig
|
config.HTTP.Headers = headerConfig
|
||||||
|
|
||||||
env := newTestEnvWithConfig(t, &config)
|
env := newTestEnvWithConfig(t, &config)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
baseURL, err := env.builder.BuildBaseURL()
|
baseURL, err := env.builder.BuildBaseURL()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -273,20 +279,23 @@ func makeBlobArgs(t *testing.T) blobArgs {
|
||||||
// TestBlobAPI conducts a full test of the of the blob api.
|
// TestBlobAPI conducts a full test of the of the blob api.
|
||||||
func TestBlobAPI(t *testing.T) {
|
func TestBlobAPI(t *testing.T) {
|
||||||
deleteEnabled := false
|
deleteEnabled := false
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env1 := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env1.Shutdown()
|
||||||
args := makeBlobArgs(t)
|
args := makeBlobArgs(t)
|
||||||
testBlobAPI(t, env, args)
|
testBlobAPI(t, env1, args)
|
||||||
|
|
||||||
deleteEnabled = true
|
deleteEnabled = true
|
||||||
env = newTestEnv(t, deleteEnabled)
|
env2 := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env2.Shutdown()
|
||||||
args = makeBlobArgs(t)
|
args = makeBlobArgs(t)
|
||||||
testBlobAPI(t, env, args)
|
testBlobAPI(t, env2, args)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBlobDelete(t *testing.T) {
|
func TestBlobDelete(t *testing.T) {
|
||||||
deleteEnabled := true
|
deleteEnabled := true
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
args := makeBlobArgs(t)
|
args := makeBlobArgs(t)
|
||||||
env = testBlobAPI(t, env, args)
|
env = testBlobAPI(t, env, args)
|
||||||
|
@ -297,11 +306,15 @@ func TestRelativeURL(t *testing.T) {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
config.HTTP.Headers = headerConfig
|
config.HTTP.Headers = headerConfig
|
||||||
config.HTTP.RelativeURLs = false
|
config.HTTP.RelativeURLs = false
|
||||||
env := newTestEnvWithConfig(t, &config)
|
env := newTestEnvWithConfig(t, &config)
|
||||||
|
defer env.Shutdown()
|
||||||
ref, _ := reference.WithName("foo/bar")
|
ref, _ := reference.WithName("foo/bar")
|
||||||
uploadURLBaseAbs, _ := startPushLayer(t, env, ref)
|
uploadURLBaseAbs, _ := startPushLayer(t, env, ref)
|
||||||
|
|
||||||
|
@ -369,6 +382,7 @@ func TestRelativeURL(t *testing.T) {
|
||||||
func TestBlobDeleteDisabled(t *testing.T) {
|
func TestBlobDeleteDisabled(t *testing.T) {
|
||||||
deleteEnabled := false
|
deleteEnabled := false
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env.Shutdown()
|
||||||
args := makeBlobArgs(t)
|
args := makeBlobArgs(t)
|
||||||
|
|
||||||
imageName := args.imageName
|
imageName := args.imageName
|
||||||
|
@ -684,6 +698,7 @@ func testBlobDelete(t *testing.T, env *testEnv, args blobArgs) {
|
||||||
|
|
||||||
func TestDeleteDisabled(t *testing.T) {
|
func TestDeleteDisabled(t *testing.T) {
|
||||||
env := newTestEnv(t, false)
|
env := newTestEnv(t, false)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
// "build" our layer file
|
// "build" our layer file
|
||||||
|
@ -710,6 +725,7 @@ func TestDeleteDisabled(t *testing.T) {
|
||||||
|
|
||||||
func TestDeleteReadOnly(t *testing.T) {
|
func TestDeleteReadOnly(t *testing.T) {
|
||||||
env := newTestEnv(t, true)
|
env := newTestEnv(t, true)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
// "build" our layer file
|
// "build" our layer file
|
||||||
|
@ -738,6 +754,7 @@ func TestDeleteReadOnly(t *testing.T) {
|
||||||
|
|
||||||
func TestStartPushReadOnly(t *testing.T) {
|
func TestStartPushReadOnly(t *testing.T) {
|
||||||
env := newTestEnv(t, true)
|
env := newTestEnv(t, true)
|
||||||
|
defer env.Shutdown()
|
||||||
env.app.readOnly = true
|
env.app.readOnly = true
|
||||||
|
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
|
@ -782,16 +799,18 @@ func TestManifestAPI(t *testing.T) {
|
||||||
schema2Repo, _ := reference.ParseNamed("foo/schema2")
|
schema2Repo, _ := reference.ParseNamed("foo/schema2")
|
||||||
|
|
||||||
deleteEnabled := false
|
deleteEnabled := false
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env1 := newTestEnv(t, deleteEnabled)
|
||||||
testManifestAPISchema1(t, env, schema1Repo)
|
defer env1.Shutdown()
|
||||||
schema2Args := testManifestAPISchema2(t, env, schema2Repo)
|
testManifestAPISchema1(t, env1, schema1Repo)
|
||||||
testManifestAPIManifestList(t, env, schema2Args)
|
schema2Args := testManifestAPISchema2(t, env1, schema2Repo)
|
||||||
|
testManifestAPIManifestList(t, env1, schema2Args)
|
||||||
|
|
||||||
deleteEnabled = true
|
deleteEnabled = true
|
||||||
env = newTestEnv(t, deleteEnabled)
|
env2 := newTestEnv(t, deleteEnabled)
|
||||||
testManifestAPISchema1(t, env, schema1Repo)
|
defer env2.Shutdown()
|
||||||
schema2Args = testManifestAPISchema2(t, env, schema2Repo)
|
testManifestAPISchema1(t, env2, schema1Repo)
|
||||||
testManifestAPIManifestList(t, env, schema2Args)
|
schema2Args = testManifestAPISchema2(t, env2, schema2Repo)
|
||||||
|
testManifestAPIManifestList(t, env2, schema2Args)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestManifestDelete(t *testing.T) {
|
func TestManifestDelete(t *testing.T) {
|
||||||
|
@ -800,6 +819,7 @@ func TestManifestDelete(t *testing.T) {
|
||||||
|
|
||||||
deleteEnabled := true
|
deleteEnabled := true
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env.Shutdown()
|
||||||
schema1Args := testManifestAPISchema1(t, env, schema1Repo)
|
schema1Args := testManifestAPISchema1(t, env, schema1Repo)
|
||||||
testManifestDelete(t, env, schema1Args)
|
testManifestDelete(t, env, schema1Args)
|
||||||
schema2Args := testManifestAPISchema2(t, env, schema2Repo)
|
schema2Args := testManifestAPISchema2(t, env, schema2Repo)
|
||||||
|
@ -810,6 +830,7 @@ func TestManifestDeleteDisabled(t *testing.T) {
|
||||||
schema1Repo, _ := reference.ParseNamed("foo/schema1")
|
schema1Repo, _ := reference.ParseNamed("foo/schema1")
|
||||||
deleteEnabled := false
|
deleteEnabled := false
|
||||||
env := newTestEnv(t, deleteEnabled)
|
env := newTestEnv(t, deleteEnabled)
|
||||||
|
defer env.Shutdown()
|
||||||
testManifestDeleteDisabled(t, env, schema1Repo)
|
testManifestDeleteDisabled(t, env, schema1Repo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1886,6 +1907,9 @@ func newTestEnvMirror(t *testing.T, deleteEnabled bool) *testEnv {
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
Proxy: configuration.Proxy{
|
Proxy: configuration.Proxy{
|
||||||
RemoteURL: "http://example.com",
|
RemoteURL: "http://example.com",
|
||||||
|
@ -1901,6 +1925,9 @@ func newTestEnv(t *testing.T, deleteEnabled bool) *testEnv {
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
"delete": configuration.Parameters{"enabled": deleteEnabled},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1935,6 +1962,11 @@ func newTestEnvWithConfig(t *testing.T, config *configuration.Configuration) *te
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *testEnv) Shutdown() {
|
||||||
|
t.server.CloseClientConnections()
|
||||||
|
t.server.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func putManifest(t *testing.T, msg, url, contentType string, v interface{}) *http.Response {
|
func putManifest(t *testing.T, msg, url, contentType string, v interface{}) *http.Response {
|
||||||
var body []byte
|
var body []byte
|
||||||
|
|
||||||
|
@ -2328,6 +2360,7 @@ func createRepository(env *testEnv, t *testing.T, imageName string, tag string)
|
||||||
func TestRegistryAsCacheMutationAPIs(t *testing.T) {
|
func TestRegistryAsCacheMutationAPIs(t *testing.T) {
|
||||||
deleteEnabled := true
|
deleteEnabled := true
|
||||||
env := newTestEnvMirror(t, deleteEnabled)
|
env := newTestEnvMirror(t, deleteEnabled)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
imageName, _ := reference.ParseNamed("foo/bar")
|
imageName, _ := reference.ParseNamed("foo/bar")
|
||||||
tag := "latest"
|
tag := "latest"
|
||||||
|
@ -2386,6 +2419,7 @@ func TestRegistryAsCacheMutationAPIs(t *testing.T) {
|
||||||
// that implements http.ContextNotifier.
|
// that implements http.ContextNotifier.
|
||||||
func TestCheckContextNotifier(t *testing.T) {
|
func TestCheckContextNotifier(t *testing.T) {
|
||||||
env := newTestEnv(t, false)
|
env := newTestEnv(t, false)
|
||||||
|
defer env.Shutdown()
|
||||||
|
|
||||||
// Register a new endpoint for testing
|
// Register a new endpoint for testing
|
||||||
env.app.router.Handle("/unittest/{name}/", env.app.dispatcher(func(ctx *Context, r *http.Request) http.Handler {
|
env.app.router.Handle("/unittest/{name}/", env.app.dispatcher(func(ctx *Context, r *http.Request) http.Handler {
|
||||||
|
@ -2414,6 +2448,9 @@ func TestProxyManifestGetByTag(t *testing.T) {
|
||||||
truthConfig := configuration.Configuration{
|
truthConfig := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": configuration.Parameters{},
|
"testdriver": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
truthConfig.HTTP.Headers = headerConfig
|
truthConfig.HTTP.Headers = headerConfig
|
||||||
|
@ -2422,6 +2459,7 @@ func TestProxyManifestGetByTag(t *testing.T) {
|
||||||
tag := "latest"
|
tag := "latest"
|
||||||
|
|
||||||
truthEnv := newTestEnvWithConfig(t, &truthConfig)
|
truthEnv := newTestEnvWithConfig(t, &truthConfig)
|
||||||
|
defer truthEnv.Shutdown()
|
||||||
// create a repository in the truth registry
|
// create a repository in the truth registry
|
||||||
dgst := createRepository(truthEnv, t, imageName.Name(), tag)
|
dgst := createRepository(truthEnv, t, imageName.Name(), tag)
|
||||||
|
|
||||||
|
@ -2436,6 +2474,7 @@ func TestProxyManifestGetByTag(t *testing.T) {
|
||||||
proxyConfig.HTTP.Headers = headerConfig
|
proxyConfig.HTTP.Headers = headerConfig
|
||||||
|
|
||||||
proxyEnv := newTestEnvWithConfig(t, &proxyConfig)
|
proxyEnv := newTestEnvWithConfig(t, &proxyConfig)
|
||||||
|
defer proxyEnv.Shutdown()
|
||||||
|
|
||||||
digestRef, _ := reference.WithDigest(imageName, dgst)
|
digestRef, _ := reference.WithDigest(imageName, dgst)
|
||||||
manifestDigestURL, err := proxyEnv.builder.BuildManifestURL(digestRef)
|
manifestDigestURL, err := proxyEnv.builder.BuildManifestURL(digestRef)
|
||||||
|
|
|
@ -38,6 +38,7 @@ func TestAppDispatcher(t *testing.T) {
|
||||||
registry: registry,
|
registry: registry,
|
||||||
}
|
}
|
||||||
server := httptest.NewServer(app)
|
server := httptest.NewServer(app)
|
||||||
|
defer server.Close()
|
||||||
router := v2.Router()
|
router := v2.Router()
|
||||||
|
|
||||||
serverURL, err := url.Parse(server.URL)
|
serverURL, err := url.Parse(server.URL)
|
||||||
|
@ -143,6 +144,9 @@ func TestNewApp(t *testing.T) {
|
||||||
config := configuration.Configuration{
|
config := configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"testdriver": nil,
|
"testdriver": nil,
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
Auth: configuration.Auth{
|
Auth: configuration.Auth{
|
||||||
// For now, we simply test that new auth results in a viable
|
// For now, we simply test that new auth results in a viable
|
||||||
|
@ -160,6 +164,7 @@ func TestNewApp(t *testing.T) {
|
||||||
app := NewApp(ctx, &config)
|
app := NewApp(ctx, &config)
|
||||||
|
|
||||||
server := httptest.NewServer(app)
|
server := httptest.NewServer(app)
|
||||||
|
defer server.Close()
|
||||||
builder, err := v2.NewURLBuilderFromString(server.URL, false)
|
builder, err := v2.NewURLBuilderFromString(server.URL, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating urlbuilder: %v", err)
|
t.Fatalf("error creating urlbuilder: %v", err)
|
||||||
|
|
|
@ -26,6 +26,9 @@ func TestFileHealthCheck(t *testing.T) {
|
||||||
config := &configuration.Configuration{
|
config := &configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"inmemory": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
Health: configuration.Health{
|
Health: configuration.Health{
|
||||||
FileCheckers: []configuration.FileChecker{
|
FileCheckers: []configuration.FileChecker{
|
||||||
|
@ -86,6 +89,9 @@ func TestTCPHealthCheck(t *testing.T) {
|
||||||
config := &configuration.Configuration{
|
config := &configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"inmemory": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
Health: configuration.Health{
|
Health: configuration.Health{
|
||||||
TCPCheckers: []configuration.TCPChecker{
|
TCPCheckers: []configuration.TCPChecker{
|
||||||
|
@ -145,6 +151,9 @@ func TestHTTPHealthCheck(t *testing.T) {
|
||||||
config := &configuration.Configuration{
|
config := &configuration.Configuration{
|
||||||
Storage: configuration.Storage{
|
Storage: configuration.Storage{
|
||||||
"inmemory": configuration.Parameters{},
|
"inmemory": configuration.Parameters{},
|
||||||
|
"maintenance": configuration.Parameters{"uploadpurging": map[interface{}]interface{}{
|
||||||
|
"enabled": false,
|
||||||
|
}},
|
||||||
},
|
},
|
||||||
Health: configuration.Health{
|
Health: configuration.Health{
|
||||||
HTTPCheckers: []configuration.HTTPChecker{
|
HTTPCheckers: []configuration.HTTPChecker{
|
||||||
|
|
|
@ -370,15 +370,20 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
remoteBlobCount := len(te.inRemote)
|
remoteBlobCount := len(te.inRemote)
|
||||||
|
sbsMu.Lock()
|
||||||
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
|
if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
|
||||||
|
sbsMu.Unlock()
|
||||||
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
|
t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
|
||||||
}
|
}
|
||||||
|
sbsMu.Unlock()
|
||||||
|
|
||||||
// Wait for any async storage goroutines to finish
|
// Wait for any async storage goroutines to finish
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
sbsMu.Lock()
|
||||||
remoteStatCount := (*remoteStats)["stat"]
|
remoteStatCount := (*remoteStats)["stat"]
|
||||||
remoteOpenCount := (*remoteStats)["open"]
|
remoteOpenCount := (*remoteStats)["open"]
|
||||||
|
sbsMu.Unlock()
|
||||||
|
|
||||||
// Serveblob - blobs come from local
|
// Serveblob - blobs come from local
|
||||||
for _, dr := range te.inRemote {
|
for _, dr := range te.inRemote {
|
||||||
|
@ -403,6 +408,8 @@ func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
|
||||||
remoteStats = te.RemoteStats()
|
remoteStats = te.RemoteStats()
|
||||||
|
|
||||||
// Ensure remote unchanged
|
// Ensure remote unchanged
|
||||||
|
sbsMu.Lock()
|
||||||
|
defer sbsMu.Unlock()
|
||||||
if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
|
if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
|
||||||
t.Fatalf("unexpected remote stats: %#v", remoteStats)
|
t.Fatalf("unexpected remote stats: %#v", remoteStats)
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,11 +134,12 @@ func (ttles *TTLExpirationScheduler) Start() error {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ttles.saveTimer.C:
|
case <-ttles.saveTimer.C:
|
||||||
|
ttles.Lock()
|
||||||
if !ttles.indexDirty {
|
if !ttles.indexDirty {
|
||||||
|
ttles.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ttles.Lock()
|
|
||||||
err := ttles.writeState()
|
err := ttles.writeState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
context.GetLogger(ttles.ctx).Errorf("Error writing scheduler state: %s", err)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -38,6 +39,7 @@ func TestSchedule(t *testing.T) {
|
||||||
ref3.String(): true,
|
ref3.String(): true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
s := New(context.Background(), inmemory.New(), "/ttl")
|
s := New(context.Background(), inmemory.New(), "/ttl")
|
||||||
deleteFunc := func(repoName reference.Reference) error {
|
deleteFunc := func(repoName reference.Reference) error {
|
||||||
if len(remainingRepos) == 0 {
|
if len(remainingRepos) == 0 {
|
||||||
|
@ -48,7 +50,9 @@ func TestSchedule(t *testing.T) {
|
||||||
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
|
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
|
||||||
}
|
}
|
||||||
t.Log("removing", repoName)
|
t.Log("removing", repoName)
|
||||||
|
mu.Lock()
|
||||||
delete(remainingRepos, repoName.String())
|
delete(remainingRepos, repoName.String())
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -62,12 +66,17 @@ func TestSchedule(t *testing.T) {
|
||||||
s.add(ref2, 1*timeUnit, entryTypeBlob)
|
s.add(ref2, 1*timeUnit, entryTypeBlob)
|
||||||
|
|
||||||
func() {
|
func() {
|
||||||
|
s.Lock()
|
||||||
s.add(ref3, 1*timeUnit, entryTypeBlob)
|
s.add(ref3, 1*timeUnit, entryTypeBlob)
|
||||||
|
s.Unlock()
|
||||||
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Ensure all repos are deleted
|
// Ensure all repos are deleted
|
||||||
<-time.After(50 * timeUnit)
|
<-time.After(50 * timeUnit)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
if len(remainingRepos) != 0 {
|
if len(remainingRepos) != 0 {
|
||||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||||
}
|
}
|
||||||
|
@ -80,22 +89,28 @@ func TestRestoreOld(t *testing.T) {
|
||||||
ref2.String(): true,
|
ref2.String(): true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(remainingRepos))
|
||||||
|
var mu sync.Mutex
|
||||||
deleteFunc := func(r reference.Reference) error {
|
deleteFunc := func(r reference.Reference) error {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
if r.String() == ref1.String() && len(remainingRepos) == 2 {
|
if r.String() == ref1.String() && len(remainingRepos) == 2 {
|
||||||
t.Errorf("ref1 should be removed first")
|
t.Errorf("ref1 should not be removed first")
|
||||||
}
|
}
|
||||||
_, ok := remainingRepos[r.String()]
|
_, ok := remainingRepos[r.String()]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Trying to remove nonexistent repo: %s", r)
|
t.Fatalf("Trying to remove nonexistent repo: %s", r)
|
||||||
}
|
}
|
||||||
delete(remainingRepos, r.String())
|
delete(remainingRepos, r.String())
|
||||||
|
wg.Done()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
timeUnit := time.Millisecond
|
timeUnit := time.Millisecond
|
||||||
serialized, err := json.Marshal(&map[string]schedulerEntry{
|
serialized, err := json.Marshal(&map[string]schedulerEntry{
|
||||||
ref1.String(): {
|
ref1.String(): {
|
||||||
Expiry: time.Now().Add(1 * timeUnit),
|
Expiry: time.Now().Add(10 * timeUnit),
|
||||||
Key: ref1.String(),
|
Key: ref1.String(),
|
||||||
EntryType: 0,
|
EntryType: 0,
|
||||||
},
|
},
|
||||||
|
@ -117,13 +132,16 @@ func TestRestoreOld(t *testing.T) {
|
||||||
t.Fatal("Unable to write serialized data to fs")
|
t.Fatal("Unable to write serialized data to fs")
|
||||||
}
|
}
|
||||||
s := New(context.Background(), fs, "/ttl")
|
s := New(context.Background(), fs, "/ttl")
|
||||||
s.onBlobExpire = deleteFunc
|
s.OnBlobExpire(deleteFunc)
|
||||||
err = s.Start()
|
err = s.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
|
||||||
}
|
}
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
<-time.After(50 * timeUnit)
|
wg.Wait()
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
if len(remainingRepos) != 0 {
|
if len(remainingRepos) != 0 {
|
||||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||||
}
|
}
|
||||||
|
@ -138,8 +156,11 @@ func TestStopRestore(t *testing.T) {
|
||||||
ref2.String(): true,
|
ref2.String(): true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
deleteFunc := func(r reference.Reference) error {
|
deleteFunc := func(r reference.Reference) error {
|
||||||
|
mu.Lock()
|
||||||
delete(remainingRepos, r.String())
|
delete(remainingRepos, r.String())
|
||||||
|
mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,6 +190,8 @@ func TestStopRestore(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
<-time.After(500 * timeUnit)
|
<-time.After(500 * timeUnit)
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
if len(remainingRepos) != 0 {
|
if len(remainingRepos) != 0 {
|
||||||
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
t.Fatalf("Repositories remaining: %#v", remainingRepos)
|
||||||
}
|
}
|
||||||
|
|
33
registry/storage/cache/memory/memory.go
vendored
33
registry/storage/cache/memory/memory.go
vendored
|
@ -77,37 +77,46 @@ type repositoryScopedInMemoryBlobDescriptorCache struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||||
if rsimbdcp.repository == nil {
|
rsimbdcp.parent.mu.Lock()
|
||||||
|
repo := rsimbdcp.repository
|
||||||
|
rsimbdcp.parent.mu.Unlock()
|
||||||
|
|
||||||
|
if repo == nil {
|
||||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
return rsimbdcp.repository.Stat(ctx, dgst)
|
return repo.Stat(ctx, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
|
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||||
if rsimbdcp.repository == nil {
|
rsimbdcp.parent.mu.Lock()
|
||||||
|
repo := rsimbdcp.repository
|
||||||
|
rsimbdcp.parent.mu.Unlock()
|
||||||
|
|
||||||
|
if repo == nil {
|
||||||
return distribution.ErrBlobUnknown
|
return distribution.ErrBlobUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
return rsimbdcp.repository.Clear(ctx, dgst)
|
return repo.Clear(ctx, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||||
if rsimbdcp.repository == nil {
|
|
||||||
// allocate map since we are setting it now.
|
|
||||||
rsimbdcp.parent.mu.Lock()
|
rsimbdcp.parent.mu.Lock()
|
||||||
|
repo := rsimbdcp.repository
|
||||||
|
if repo == nil {
|
||||||
|
// allocate map since we are setting it now.
|
||||||
var ok bool
|
var ok bool
|
||||||
// have to read back value since we may have allocated elsewhere.
|
// have to read back value since we may have allocated elsewhere.
|
||||||
rsimbdcp.repository, ok = rsimbdcp.parent.repositories[rsimbdcp.repo]
|
repo, ok = rsimbdcp.parent.repositories[rsimbdcp.repo]
|
||||||
if !ok {
|
if !ok {
|
||||||
rsimbdcp.repository = newMapBlobDescriptorCache()
|
repo = newMapBlobDescriptorCache()
|
||||||
rsimbdcp.parent.repositories[rsimbdcp.repo] = rsimbdcp.repository
|
rsimbdcp.parent.repositories[rsimbdcp.repo] = repo
|
||||||
|
}
|
||||||
|
rsimbdcp.repository = repo
|
||||||
}
|
}
|
||||||
|
|
||||||
rsimbdcp.parent.mu.Unlock()
|
rsimbdcp.parent.mu.Unlock()
|
||||||
}
|
|
||||||
|
|
||||||
if err := rsimbdcp.repository.SetDescriptor(ctx, dgst, desc); err != nil {
|
if err := repo.SetDescriptor(ctx, dgst, desc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue