From 65e8c07847c58abc8757b6172d65faf46162c8cb Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 2 Feb 2017 16:37:04 -0800 Subject: [PATCH] Update diff interface to use callback The change computation will be done on the caller's thread and use callbacks rather than running a goroutine and returning a channel. Signed-off-by: Derek McGowan (github: dmcgowan) --- fs/diff.go | 250 +++++++++++++++++++----------------------------- fs/diff_test.go | 47 +++++---- fs/path.go | 86 +++++++---------- 3 files changed, 164 insertions(+), 219 deletions(-) diff --git a/fs/diff.go b/fs/diff.go index 1a2c55a..0ec4c89 100644 --- a/fs/diff.go +++ b/fs/diff.go @@ -5,7 +5,8 @@ import ( "os" "path/filepath" "strings" - "sync" + + "golang.org/x/sync/errgroup" "github.com/Sirupsen/logrus" ) @@ -43,14 +44,13 @@ func (k ChangeKind) String() string { // Change represents single change between a diff and its parent. type Change struct { - Kind ChangeKind - Path string - FileInfo os.FileInfo - Source string + Kind ChangeKind + Path string } -// Changes returns a stream of changes between the provided upper -// directory and lower directory. +// Changes computes changes between lower and upper, calling the +// given change function for each computed change. Callbacks +// will be done serially and ordered by path name. // // Changes are ordered by name and should be appliable in the // order in which they received. @@ -71,70 +71,22 @@ type Change struct { // nanosecond values where one of those values is zero, the files will // be considered unchanged if the content is the same. This behavior // is to account for timestamp truncation during archiving. 
-func Changes(ctx context.Context, upper, lower string) (context.Context, <-chan Change) { - var ( - changes = make(chan Change) - retCtx, cancel = context.WithCancel(ctx) - ) - - cc := &changeContext{ - Context: retCtx, +func Changes(ctx context.Context, upper, lower string, ch func(Change, os.FileInfo) error) error { + if lower == "" { + logrus.Debugf("Using single walk diff for %s", upper) + return addDirChanges(ctx, ch, upper) + } else if diffOptions := detectDirDiff(upper, lower); diffOptions != nil { + logrus.Debugf("Using single walk diff for %s from %s", diffOptions.diffDir, lower) + return diffDirChanges(ctx, ch, lower, diffOptions) } - go func() { - var err error - if lower == "" { - logrus.Debugf("Using single walk diff for %s", upper) - err = addDirChanges(ctx, changes, upper) - } else if diffOptions := detectDirDiff(upper, lower); diffOptions != nil { - logrus.Debugf("Using single walk diff for %s from %s", diffOptions.diffDir, lower) - err = diffDirChanges(ctx, changes, lower, diffOptions) - } else { - logrus.Debugf("Using double walk diff for %s from %s", upper, lower) - err = doubleWalkDiff(ctx, changes, upper, lower) - } - - if err != nil { - cc.errL.Lock() - cc.err = err - cc.errL.Unlock() - cancel() - } - close(changes) - }() - - return cc, changes + logrus.Debugf("Using double walk diff for %s from %s", upper, lower) + return doubleWalkDiff(ctx, ch, upper, lower) } -// changeContext wraps a context to allow setting an error -// directly from a change streamer to allow streams canceled -// due to errors to propagate the error to the caller. 
-type changeContext struct { - context.Context +type changeFn func(Change, os.FileInfo) error - err error - errL sync.Mutex -} - -func (cc *changeContext) Err() error { - cc.errL.Lock() - if cc.err != nil { - return cc.err - } - cc.errL.Unlock() - return cc.Context.Err() -} - -func sendChange(ctx context.Context, changes chan<- Change, change Change) error { - select { - case <-ctx.Done(): - return ctx.Err() - case changes <- change: - return nil - } -} - -func addDirChanges(ctx context.Context, changes chan<- Change, root string) error { +func addDirChanges(ctx context.Context, changes changeFn, root string) error { return filepath.Walk(root, func(path string, f os.FileInfo, err error) error { if err != nil { return err @@ -154,13 +106,11 @@ func addDirChanges(ctx context.Context, changes chan<- Change, root string) erro } change := Change{ - Path: path, - Kind: ChangeKindAdd, - FileInfo: f, - Source: filepath.Join(root, path), + Path: path, + Kind: ChangeKindAdd, } - return sendChange(ctx, changes, change) + return changes(change, f) }) } @@ -173,7 +123,7 @@ type diffDirOptions struct { } // diffDirChanges walks the diff directory and compares changes against the lower. 
-func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o *diffDirOptions) error { +func diffDirChanges(ctx context.Context, changes changeFn, lower string, o *diffDirOptions) error { changedDirs := make(map[string]struct{}) return filepath.Walk(o.diffDir, func(path string, f os.FileInfo, err error) error { if err != nil { @@ -215,11 +165,10 @@ func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o if deletedFile != "" { change.Path = deletedFile change.Kind = ChangeKindDelete + f = nil } else { // Otherwise, the file was added change.Kind = ChangeKindAdd - change.FileInfo = f - change.Source = filepath.Join(o.diffDir, path) // ...Unless it already existed in a lower, in which case, it's a modification stat, err := os.Stat(filepath.Join(lower, path)) @@ -256,105 +205,108 @@ func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o return err } dirChange := Change{ - Path: parent, - Kind: ChangeKindModify, - FileInfo: pi, - Source: filepath.Join(o.diffDir, parent), + Path: parent, + Kind: ChangeKindModify, } - if err := sendChange(ctx, changes, dirChange); err != nil { + if err := changes(dirChange, pi); err != nil { return err } changedDirs[parent] = struct{}{} } } - return sendChange(ctx, changes, change) + return changes(change, f) }) } // doubleWalkDiff walks both directories to create a diff -func doubleWalkDiff(ctx context.Context, changes chan<- Change, upper, lower string) (err error) { - pathCtx, cancel := context.WithCancel(ctx) - defer func() { - if err != nil { - cancel() - } - }() +func doubleWalkDiff(ctx context.Context, changes changeFn, upper, lower string) (err error) { + g, ctx := errgroup.WithContext(ctx) var ( - w1 = pathWalker(pathCtx, lower) - w2 = pathWalker(pathCtx, upper) + c1 = make(chan *currentPath) + c2 = make(chan *currentPath) + f1, f2 *currentPath rmdir string ) + g.Go(func() error { + defer close(c1) + return pathWalk(ctx, lower, c1) + }) + g.Go(func() error { + 
defer close(c2) + return pathWalk(ctx, upper, c2) + }) + g.Go(func() error { + for c1 != nil || c2 != nil { + if f1 == nil && c1 != nil { + f1, err = nextPath(ctx, c1) + if err != nil { + return err + } + if f1 == nil { + c1 = nil + } + } - for w1 != nil || w2 != nil { - if f1 == nil && w1 != nil { - f1, err = nextPath(w1) - if err != nil { - return err + if f2 == nil && c2 != nil { + f2, err = nextPath(ctx, c2) + if err != nil { + return err + } + if f2 == nil { + c2 = nil + } } - if f1 == nil { - w1 = nil + if f1 == nil && f2 == nil { + continue } - } - if f2 == nil && w2 != nil { - f2, err = nextPath(w2) - if err != nil { - return err - } - if f2 == nil { - w2 = nil - } - } - if f1 == nil && f2 == nil { - continue - } - - c := pathChange(f1, f2) - switch c.Kind { - case ChangeKindAdd: - if rmdir != "" { - rmdir = "" - } - c.FileInfo = f2.f - c.Source = filepath.Join(upper, c.Path) - f2 = nil - case ChangeKindDelete: - // Check if this file is already removed by being - // under of a removed directory - if rmdir != "" && strings.HasPrefix(f1.path, rmdir) { + var f os.FileInfo + c := pathChange(f1, f2) + switch c.Kind { + case ChangeKindAdd: + if rmdir != "" { + rmdir = "" + } + f = f2.f + f2 = nil + case ChangeKindDelete: + // Check if this file is already removed by being + // under of a removed directory + if rmdir != "" && strings.HasPrefix(f1.path, rmdir) { + f1 = nil + continue + } else if rmdir == "" && f1.f.IsDir() { + rmdir = f1.path + string(os.PathSeparator) + } else if rmdir != "" { + rmdir = "" + } f1 = nil - continue - } else if rmdir == "" && f1.f.IsDir() { - rmdir = f1.path + string(os.PathSeparator) - } else if rmdir != "" { - rmdir = "" + case ChangeKindModify: + same, err := sameFile(f1, f2) + if err != nil { + return err + } + if f1.f.IsDir() && !f2.f.IsDir() { + rmdir = f1.path + string(os.PathSeparator) + } else if rmdir != "" { + rmdir = "" + } + f = f2.f + f1 = nil + f2 = nil + if same { + continue + } } - f1 = nil - case ChangeKindModify: 
- same, err := sameFile(f1, f2) - if err != nil { + if err := changes(c, f); err != nil { return err } - if f1.f.IsDir() && !f2.f.IsDir() { - rmdir = f1.path + string(os.PathSeparator) - } else if rmdir != "" { - rmdir = "" - } - c.FileInfo = f2.f - c.Source = filepath.Join(upper, c.Path) - f1 = nil - f2 = nil - if same { - continue - } } - if err := sendChange(ctx, changes, c); err != nil { - return err - } - } + return nil + }) - return nil + return g.Wait() } diff --git a/fs/diff_test.go b/fs/diff_test.go index 255d347..334bceb 100644 --- a/fs/diff_test.go +++ b/fs/diff_test.go @@ -220,13 +220,13 @@ func testDiffWithoutBase(apply fstest.Applier, expected []Change) error { return checkChanges(tmp, changes, expected) } -func checkChanges(root string, changes, expected []Change) error { +func checkChanges(root string, changes []testChange, expected []Change) error { if len(changes) != len(expected) { - return errors.Errorf("Unexpected number of changes:\n%s", diffString(changes, expected)) + return errors.Errorf("Unexpected number of changes:\n%s", diffString(convertTestChanges(changes), expected)) } for i := range changes { if changes[i].Path != expected[i].Path || changes[i].Kind != expected[i].Kind { - return errors.Errorf("Unexpected change at %d:\n%s", i, diffString(changes, expected)) + return errors.Errorf("Unexpected change at %d:\n%s", i, diffString(convertTestChanges(changes), expected)) } if changes[i].Kind != ChangeKindDelete { filename := filepath.Join(root, changes[i].Path) @@ -253,20 +253,35 @@ func checkChanges(root string, changes, expected []Change) error { return nil } -func collectChanges(upper, lower string) ([]Change, error) { - ctx, changeC := Changes(context.Background(), upper, lower) - changes := []Change{} - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case c, ok := <-changeC: - if !ok { - return changes, nil - } - changes = append(changes, c) - } +type testChange struct { + Change + FileInfo os.FileInfo + Source string 
+} + +func collectChanges(upper, lower string) ([]testChange, error) { + changes := []testChange{} + err := Changes(context.Background(), upper, lower, func(c Change, f os.FileInfo) error { + changes = append(changes, testChange{ + Change: c, + FileInfo: f, + Source: filepath.Join(upper, c.Path), + }) + return nil + }) + if err != nil { + return nil, errors.Wrap(err, "failed to compute changes") } + + return changes, nil +} + +func convertTestChanges(c []testChange) []Change { + nc := make([]Change, len(c)) + for i := range c { + nc[i] = c[i].Change + } + return nc } func diffString(c1, c2 []Change) string { diff --git a/fs/path.go b/fs/path.go index 0060a66..342f392 100644 --- a/fs/path.go +++ b/fs/path.go @@ -133,67 +133,45 @@ func compareFileContent(p1, p2 string) (bool, error) { } } -type walker struct { - pathC <-chan *currentPath - errC <-chan error -} - -func pathWalker(ctx context.Context, root string) *walker { - var ( - pathC = make(chan *currentPath) - errC = make(chan error, 1) - ) - go func() { - defer close(pathC) - err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error { - if err != nil { - return err - } - - // Rebase path - path, err = filepath.Rel(root, path) - if err != nil { - return err - } - - path = filepath.Join(string(os.PathSeparator), path) - - // Skip root - if path == string(os.PathSeparator) { - return nil - } - - return sendPath(ctx, pathC, &currentPath{ - path: path, - f: f, - fullPath: filepath.Join(root, path), - }) - }) +func pathWalk(ctx context.Context, root string, pathC chan<- *currentPath) error { + return filepath.Walk(root, func(path string, f os.FileInfo, err error) error { if err != nil { - errC <- err + return err } - }() - return &walker{ - pathC: pathC, - errC: errC, - } + // Rebase path + path, err = filepath.Rel(root, path) + if err != nil { + return err + } + + path = filepath.Join(string(os.PathSeparator), path) + + // Skip root + if path == string(os.PathSeparator) { + return nil + } + + p := 
&currentPath{ + path: path, + f: f, + fullPath: filepath.Join(root, path), + } + + select { + case <-ctx.Done(): + return ctx.Err() + case pathC <- p: + return nil + } + }) } -func sendPath(ctx context.Context, pc chan<- *currentPath, p *currentPath) error { +func nextPath(ctx context.Context, pathC <-chan *currentPath) (*currentPath, error) { select { case <-ctx.Done(): - return ctx.Err() - case pc <- p: - return nil - } -} - -func nextPath(w *walker) (*currentPath, error) { - select { - case err := <-w.errC: - return nil, err - case p := <-w.pathC: + return nil, ctx.Err() + case p := <-pathC: return p, nil } }