diff --git a/registry/handlers/app.go b/registry/handlers/app.go index c2685d98..28940c8e 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -3,6 +3,7 @@ package handlers import ( "expvar" "fmt" + "math/rand" "net" "net/http" "os" @@ -79,6 +80,9 @@ func NewApp(ctx context.Context, configuration configuration.Configuration) *App // a health check. panic(err) } + + startUploadPurger(app.driver, ctxu.GetLogger(app)) + app.driver, err = applyStorageMiddleware(app.driver, configuration.Middleware["storage"]) if err != nil { panic(err) @@ -549,3 +553,27 @@ func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []co } return driver, nil } + +// startUploadPurger schedules a goroutine which will periodically +// check upload directories for old files and delete them +func startUploadPurger(storageDriver storagedriver.StorageDriver, log ctxu.Logger) { + rand.Seed(time.Now().Unix()) + jitter := time.Duration(rand.Int()%60) * time.Minute + + // Start with reasonable defaults + // TODO:(richardscothern) make configurable + purgeAge := time.Duration(7 * 24 * time.Hour) + timeBetweenPurges := time.Duration(1 * 24 * time.Hour) + + go func() { + log.Infof("Starting upload purge in %s", jitter) + time.Sleep(jitter) + + for { + storage.PurgeUploads(storageDriver, time.Now().Add(-purgeAge), true) + log.Infof("Starting upload purge in %s", timeBetweenPurges) + time.Sleep(timeBetweenPurges) + } + }() + +} diff --git a/registry/storage/paths.go b/registry/storage/paths.go index f541f079..7aeff6e4 100644 --- a/registry/storage/paths.go +++ b/registry/storage/paths.go @@ -257,6 +257,8 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) { offset = "" // Limit to the prefix for listing offsets. } return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "hashstates", v.alg, offset)...), nil + case repositoriesRootPathSpec: + return path.Join(repoPrefix...), nil default: // TODO(sday): This is an internal error. Ensure it doesn't escape (panic?). return "", fmt.Errorf("unknown path spec: %#v", v) @@ -446,6 +448,12 @@ type uploadHashStatePathSpec struct { func (uploadHashStatePathSpec) pathSpec() {} +// repositoriesRootPathSpec returns the root of repositories +type repositoriesRootPathSpec struct { +} + +func (repositoriesRootPathSpec) pathSpec() {} + // digestPathComponents provides a consistent path breakdown for a given // digest. For a generic digest, it will be as follows: // diff --git a/registry/storage/purgeuploads.go b/registry/storage/purgeuploads.go new file mode 100644 index 00000000..13c468de --- /dev/null +++ b/registry/storage/purgeuploads.go @@ -0,0 +1,136 @@ +package storage + +import ( + "path" + "strings" + "time" + + "code.google.com/p/go-uuid/uuid" + log "github.com/Sirupsen/logrus" + storageDriver "github.com/docker/distribution/registry/storage/driver" +) + +// uploadData stored the location of temporary files created during a layer upload +// along with the date the upload was started +type uploadData struct { + containingDir string + startedAt time.Time +} + +func newUploadData() uploadData { + return uploadData{ + containingDir: "", + // default to far in future to protect against missing startedat + startedAt: time.Now().Add(time.Duration(10000 * time.Hour)), + } +} + +// PurgeUploads deletes files from the upload directory +// created before olderThan. The list of files deleted and errors +// encountered are returned +func PurgeUploads(driver storageDriver.StorageDriver, olderThan time.Time, actuallyDelete bool) ([]string, []error) { + log.Infof("PurgeUploads starting: olderThan=%s, actuallyDelete=%t", olderThan, actuallyDelete) + uploadData, errors := getOutstandingUploads(driver) + var deleted []string + for _, uploadData := range uploadData { + if uploadData.startedAt.Before(olderThan) { + var err error + log.Infof("Upload files in %s have older date (%s) than purge date (%s). Removing upload directory.", + uploadData.containingDir, uploadData.startedAt, olderThan) + if actuallyDelete { + err = driver.Delete(uploadData.containingDir) + } + if err == nil { + deleted = append(deleted, uploadData.containingDir) + } else { + errors = append(errors, err) + } + } + } + + log.Infof("Purge uploads finished. Num deleted=%d, num errors=%d", len(deleted), len(errors)) + return deleted, errors +} + +// getOutstandingUploads walks the upload directory, collecting files +// which could be eligible for deletion. The only reliable way to +// classify the age of a file is with the date stored in the startedAt +// file, so gather files by UUID with a date from startedAt. +func getOutstandingUploads(driver storageDriver.StorageDriver) (map[string]uploadData, []error) { + var errors []error + uploads := make(map[string]uploadData, 0) + + inUploadDir := false + root, err := defaultPathMapper.path(repositoriesRootPathSpec{}) + if err != nil { + return uploads, append(errors, err) + } + err = Walk(driver, root, func(fileInfo storageDriver.FileInfo) error { + filePath := fileInfo.Path() + _, file := path.Split(filePath) + if file[0] == '_' { + // Reserved directory + inUploadDir = (file == "_uploads") + + if fileInfo.IsDir() && !inUploadDir { + return ErrSkipDir + } + + } + + uuid, isContainingDir := uUIDFromPath(filePath) + if uuid == "" { + // Cannot reliably delete + return nil + } + ud, ok := uploads[uuid] + if !ok { + ud = newUploadData() + } + if isContainingDir { + ud.containingDir = filePath + } + if file == "startedat" { + if t, err := readStartedAtFile(driver, filePath); err == nil { + ud.startedAt = t + } else { + errors = pushError(errors, filePath, err) + } + + } + + uploads[uuid] = ud + return nil + }) + + if err != nil { + errors = pushError(errors, root, err) + } + return uploads, errors +} + +// uUIDFromPath extracts the upload UUID from a given path +// If the UUID is the last path component, this is the containing +// directory for all upload files +func uUIDFromPath(path string) (string, bool) { + components := strings.Split(path, "/") + for i := len(components) - 1; i >= 0; i-- { + if uuid := uuid.Parse(components[i]); uuid != nil { + return uuid.String(), i == len(components)-1 + } + } + return "", false +} + +// readStartedAtFile reads the date from an upload's startedAtFile +func readStartedAtFile(driver storageDriver.StorageDriver, path string) (time.Time, error) { + startedAtBytes, err := driver.GetContent(path) + if err != nil { + return time.Now(), err + } + startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes)) + if err != nil { + return time.Now(), err + } + return startedAt, nil +} diff --git a/registry/storage/purgeuploads_test.go b/registry/storage/purgeuploads_test.go new file mode 100644 index 00000000..368e7c86 --- /dev/null +++ b/registry/storage/purgeuploads_test.go @@ -0,0 +1,165 @@ +package storage + +import ( + "path" + "strings" + "testing" + "time" + + "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/inmemory" +) + +var pm = defaultPathMapper + +func testUploadFS(t *testing.T, numUploads int, repoName string, startedAt time.Time) driver.StorageDriver { + d := inmemory.New() + for i := 0; i < numUploads; i++ { + addUploads(t, d, uuid.New(), repoName, startedAt) + } + return d +} + +func addUploads(t *testing.T, d driver.StorageDriver, uploadID, repo string, startedAt time.Time) { + dataPath, err := pm.path(uploadDataPathSpec{name: repo, uuid: uploadID}) + if err != nil { + t.Fatalf("Unable to resolve path") + } + if err := d.PutContent(dataPath, []byte("")); err != nil { + t.Fatalf("Unable to write data file") + } + + startedAtPath, err := pm.path(uploadStartedAtPathSpec{name: repo, uuid: uploadID}) + if err != nil { + t.Fatalf("Unable to resolve path") + } + + if d.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { + t.Fatalf("Unable to write startedAt file") + } + +} + +func TestPurgeGather(t *testing.T) { + uploadCount := 5 + fs := testUploadFS(t, uploadCount, "test-repo", time.Now()) + uploadData, errs := getOutstandingUploads(fs) + if len(errs) != 0 { + t.Errorf("Unexepected errors: %q", errs) + } + if len(uploadData) != uploadCount { + t.Errorf("Unexpected upload file count: %d != %d", uploadCount, len(uploadData)) + } +} + +func TestPurgeNone(t *testing.T) { + fs := testUploadFS(t, 10, "test-repo", time.Now()) + oneHourAgo := time.Now().Add(-1 * time.Hour) + deleted, errs := PurgeUploads(fs, oneHourAgo, true) + if len(errs) != 0 { + t.Error("Unexpected errors", errs) + } + if len(deleted) != 0 { + t.Errorf("Unexpectedly deleted files for time: %s", oneHourAgo) + } +} + +func TestPurgeAll(t *testing.T) { + uploadCount := 10 + oneHourAgo := time.Now().Add(-1 * time.Hour) + fs := testUploadFS(t, uploadCount, "test-repo", oneHourAgo) + + // Ensure > 1 repos are purged + addUploads(t, fs, uuid.New(), "test-repo2", oneHourAgo) + uploadCount++ + + deleted, errs := PurgeUploads(fs, time.Now(), true) + if len(errs) != 0 { + t.Error("Unexpected errors:", errs) + } + fileCount := uploadCount + if len(deleted) != fileCount { + t.Errorf("Unexpectedly deleted file count %d != %d", + len(deleted), fileCount) + } +} + +func TestPurgeSome(t *testing.T) { + oldUploadCount := 5 + oneHourAgo := time.Now().Add(-1 * time.Hour) + fs := testUploadFS(t, oldUploadCount, "library/test-repo", oneHourAgo) + + newUploadCount := 4 + + for i := 0; i < newUploadCount; i++ { + addUploads(t, fs, uuid.New(), "test-repo", time.Now().Add(1*time.Hour)) + } + + deleted, errs := PurgeUploads(fs, time.Now(), true) + if len(errs) != 0 { + t.Error("Unexpected errors:", errs) + } + if len(deleted) != oldUploadCount { + t.Errorf("Unexpectedly deleted file count %d != %d", + len(deleted), oldUploadCount) + } +} + +func TestPurgeOnlyUploads(t *testing.T) { + oldUploadCount := 5 + oneHourAgo := time.Now().Add(-1 * time.Hour) + fs := testUploadFS(t, oldUploadCount, "test-repo", oneHourAgo) + + // Create a directory tree outside _uploads and ensure + // these files aren't deleted. + dataPath, err := pm.path(uploadDataPathSpec{name: "test-repo", uuid: uuid.New()}) + if err != nil { + t.Fatalf(err.Error()) + } + nonUploadPath := strings.Replace(dataPath, "_upload", "_important", -1) + if strings.Index(nonUploadPath, "_upload") != -1 { + t.Fatalf("Non-upload path not created correctly") + } + + nonUploadFile := path.Join(nonUploadPath, "file") + if err = fs.PutContent(nonUploadFile, []byte("")); err != nil { + t.Fatalf("Unable to write data file") + } + + deleted, errs := PurgeUploads(fs, time.Now(), true) + if len(errs) != 0 { + t.Error("Unexpected errors", errs) + } + for _, file := range deleted { + if strings.Index(file, "_upload") == -1 { + t.Errorf("Non-upload file deleted") + } + } +} + +func TestPurgeMissingStartedAt(t *testing.T) { + oneHourAgo := time.Now().Add(-1 * time.Hour) + fs := testUploadFS(t, 1, "test-repo", oneHourAgo) + err := Walk(fs, "/", func(fileInfo driver.FileInfo) error { + filePath := fileInfo.Path() + _, file := path.Split(filePath) + + if file == "startedat" { + if err := fs.Delete(filePath); err != nil { + t.Fatalf("Unable to delete startedat file: %s", filePath) + } + } + return nil + }) + if err != nil { + t.Fatalf("Unexpected error during Walk: %s ", err.Error()) + } + deleted, errs := PurgeUploads(fs, time.Now(), true) + if len(errs) > 0 { + t.Errorf("Unexpected errors") + } + if len(deleted) > 0 { + t.Errorf("Files unexpectedly deleted: %s", deleted) + } +} diff --git a/registry/storage/walk.go b/registry/storage/walk.go new file mode 100644 index 00000000..7b958d87 --- /dev/null +++ b/registry/storage/walk.go @@ -0,0 +1,50 @@ +package storage + +import ( + "errors" + "fmt" + + storageDriver "github.com/docker/distribution/registry/storage/driver" +) + +// SkipDir is used as a return value from onFileFunc to indicate that +// the directory named in the call is to be skipped. It is not returned +// as an error by any function. +var ErrSkipDir = errors.New("skip this directory") + +// WalkFn is called once per file by Walk +// If the returned error is ErrSkipDir and fileInfo refers +// to a directory, the directory will not be entered and Walk +// will continue the traversal. Otherwise Walk will return +type WalkFn func(fileInfo storageDriver.FileInfo) error + +// Walk traverses a filesystem defined within driver, starting +// from the given path, calling f on each file +func Walk(driver storageDriver.StorageDriver, from string, f WalkFn) error { + children, err := driver.List(from) + if err != nil { + return err + } + for _, child := range children { + fileInfo, err := driver.Stat(child) + if err != nil { + return err + } + err = f(fileInfo) + skipDir := (err == ErrSkipDir) + if err != nil && !skipDir { + return err + } + + if fileInfo.IsDir() && !skipDir { + Walk(driver, child, f) + } + } + return nil +} + +// pushError formats an error type given a path and an error +// and pushes it to a slice of errors +func pushError(errors []error, path string, err error) []error { + return append(errors, fmt.Errorf("%s: %s", path, err)) +} diff --git a/registry/storage/walk_test.go b/registry/storage/walk_test.go new file mode 100644 index 00000000..22b91b35 --- /dev/null +++ b/registry/storage/walk_test.go @@ -0,0 +1,119 @@ +package storage + +import ( + "fmt" + "testing" + + "github.com/docker/distribution/registry/storage/driver" + "github.com/docker/distribution/registry/storage/driver/inmemory" +) + +func testFS(t *testing.T) (driver.StorageDriver, map[string]string) { + d := inmemory.New() + c := []byte("") + if err := d.PutContent("/a/b/c/d", c); err != nil { + t.Fatalf("Unable to put to inmemory fs") + } + if err := d.PutContent("/a/b/c/e", c); err != nil { + t.Fatalf("Unable to put to inmemory fs") + } + + expected := map[string]string{ + "/a": "dir", + "/a/b": "dir", + "/a/b/c": "dir", + "/a/b/c/d": "file", + "/a/b/c/e": "file", + } + + return d, expected +} + +func TestWalkErrors(t *testing.T) { + d, expected := testFS(t) + fileCount := len(expected) + err := Walk(d, "", func(fileInfo driver.FileInfo) error { + return nil + }) + if err == nil { + t.Error("Expected invalid root err") + } + + err = Walk(d, "/", func(fileInfo driver.FileInfo) error { + // error on the 2nd file + if fileInfo.Path() == "/a/b" { + return fmt.Errorf("Early termination") + } + delete(expected, fileInfo.Path()) + return nil + }) + if len(expected) != fileCount-1 { + t.Error("Walk failed to terminate with error") + } + if err != nil { + t.Error(err.Error()) + } + + err = Walk(d, "/nonexistant", func(fileInfo driver.FileInfo) error { + return nil + }) + if err == nil { + t.Errorf("Expected missing file err") + } + +} + +func TestWalk(t *testing.T) { + d, expected := testFS(t) + err := Walk(d, "/", func(fileInfo driver.FileInfo) error { + filePath := fileInfo.Path() + filetype, ok := expected[filePath] + if !ok { + t.Fatalf("Unexpected file in walk: %q", filePath) + } + + if fileInfo.IsDir() { + if filetype != "dir" { + t.Errorf("Unexpected file type: %q", filePath) + } + } else { + if filetype != "file" { + t.Errorf("Unexpected file type: %q", filePath) + } + } + delete(expected, filePath) + return nil + }) + if len(expected) > 0 { + t.Errorf("Missed files in walk: %q", expected) + } + if err != nil { + t.Fatalf(err.Error()) + } +} + +func TestWalkSkipDir(t *testing.T) { + d, expected := testFS(t) + err := Walk(d, "/", func(fileInfo driver.FileInfo) error { + filePath := fileInfo.Path() + if filePath == "/a/b" { + // skip processing /a/b/c and /a/b/c/d + return ErrSkipDir + } + delete(expected, filePath) + return nil + }) + if err != nil { + t.Fatalf(err.Error()) + } + if _, ok := expected["/a/b/c"]; !ok { + t.Errorf("/a/b/c not skipped") + } + if _, ok := expected["/a/b/c/d"]; !ok { + t.Errorf("/a/b/c/d not skipped") + } + if _, ok := expected["/a/b/c/e"]; !ok { + t.Errorf("/a/b/c/e not skipped") + } + +}