Update diff interface to use callback

The change computation will be done on the callers thread
and use callbacks rather than running a goroutine and
returning a channel.

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
This commit is contained in:
Derek McGowan 2017-02-02 16:37:04 -08:00
parent aa3be3b0fe
commit 65e8c07847
3 changed files with 164 additions and 219 deletions

View file

@ -5,7 +5,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"golang.org/x/sync/errgroup"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
) )
@ -43,14 +44,13 @@ func (k ChangeKind) String() string {
// Change represents single change between a diff and its parent. // Change represents single change between a diff and its parent.
type Change struct { type Change struct {
Kind ChangeKind Kind ChangeKind
Path string Path string
FileInfo os.FileInfo
Source string
} }
// Changes returns a stream of changes between the provided upper // Changes computes changes between lower and upper calling the
// directory and lower directory. // given change function for each computed change. Callbacks
// will be done serialially and order by path name.
// //
// Changes are ordered by name and should be appliable in the // Changes are ordered by name and should be appliable in the
// order in which they received. // order in which they received.
@ -71,70 +71,22 @@ type Change struct {
// nanosecond values where one of those values is zero, the files will // nanosecond values where one of those values is zero, the files will
// be considered unchanged if the content is the same. This behavior // be considered unchanged if the content is the same. This behavior
// is to account for timestamp truncation during archiving. // is to account for timestamp truncation during archiving.
func Changes(ctx context.Context, upper, lower string) (context.Context, <-chan Change) { func Changes(ctx context.Context, upper, lower string, ch func(Change, os.FileInfo) error) error {
var ( if lower == "" {
changes = make(chan Change) logrus.Debugf("Using single walk diff for %s", upper)
retCtx, cancel = context.WithCancel(ctx) return addDirChanges(ctx, ch, upper)
) } else if diffOptions := detectDirDiff(upper, lower); diffOptions != nil {
logrus.Debugf("Using single walk diff for %s from %s", diffOptions.diffDir, lower)
cc := &changeContext{ return diffDirChanges(ctx, ch, lower, diffOptions)
Context: retCtx,
} }
go func() { logrus.Debugf("Using double walk diff for %s from %s", upper, lower)
var err error return doubleWalkDiff(ctx, ch, upper, lower)
if lower == "" {
logrus.Debugf("Using single walk diff for %s", upper)
err = addDirChanges(ctx, changes, upper)
} else if diffOptions := detectDirDiff(upper, lower); diffOptions != nil {
logrus.Debugf("Using single walk diff for %s from %s", diffOptions.diffDir, lower)
err = diffDirChanges(ctx, changes, lower, diffOptions)
} else {
logrus.Debugf("Using double walk diff for %s from %s", upper, lower)
err = doubleWalkDiff(ctx, changes, upper, lower)
}
if err != nil {
cc.errL.Lock()
cc.err = err
cc.errL.Unlock()
cancel()
}
close(changes)
}()
return cc, changes
} }
// changeContext wraps a context to allow setting an error type changeFn func(Change, os.FileInfo) error
// directly from a change streamer to allow streams canceled
// due to errors to propagate the error to the caller.
type changeContext struct {
context.Context
err error func addDirChanges(ctx context.Context, changes changeFn, root string) error {
errL sync.Mutex
}
func (cc *changeContext) Err() error {
cc.errL.Lock()
if cc.err != nil {
return cc.err
}
cc.errL.Unlock()
return cc.Context.Err()
}
func sendChange(ctx context.Context, changes chan<- Change, change Change) error {
select {
case <-ctx.Done():
return ctx.Err()
case changes <- change:
return nil
}
}
func addDirChanges(ctx context.Context, changes chan<- Change, root string) error {
return filepath.Walk(root, func(path string, f os.FileInfo, err error) error { return filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
if err != nil { if err != nil {
return err return err
@ -154,13 +106,11 @@ func addDirChanges(ctx context.Context, changes chan<- Change, root string) erro
} }
change := Change{ change := Change{
Path: path, Path: path,
Kind: ChangeKindAdd, Kind: ChangeKindAdd,
FileInfo: f,
Source: filepath.Join(root, path),
} }
return sendChange(ctx, changes, change) return changes(change, f)
}) })
} }
@ -173,7 +123,7 @@ type diffDirOptions struct {
} }
// diffDirChanges walks the diff directory and compares changes against the lower. // diffDirChanges walks the diff directory and compares changes against the lower.
func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o *diffDirOptions) error { func diffDirChanges(ctx context.Context, changes changeFn, lower string, o *diffDirOptions) error {
changedDirs := make(map[string]struct{}) changedDirs := make(map[string]struct{})
return filepath.Walk(o.diffDir, func(path string, f os.FileInfo, err error) error { return filepath.Walk(o.diffDir, func(path string, f os.FileInfo, err error) error {
if err != nil { if err != nil {
@ -215,11 +165,10 @@ func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o
if deletedFile != "" { if deletedFile != "" {
change.Path = deletedFile change.Path = deletedFile
change.Kind = ChangeKindDelete change.Kind = ChangeKindDelete
f = nil
} else { } else {
// Otherwise, the file was added // Otherwise, the file was added
change.Kind = ChangeKindAdd change.Kind = ChangeKindAdd
change.FileInfo = f
change.Source = filepath.Join(o.diffDir, path)
// ...Unless it already existed in a lower, in which case, it's a modification // ...Unless it already existed in a lower, in which case, it's a modification
stat, err := os.Stat(filepath.Join(lower, path)) stat, err := os.Stat(filepath.Join(lower, path))
@ -256,105 +205,108 @@ func diffDirChanges(ctx context.Context, changes chan<- Change, lower string, o
return err return err
} }
dirChange := Change{ dirChange := Change{
Path: parent, Path: parent,
Kind: ChangeKindModify, Kind: ChangeKindModify,
FileInfo: pi,
Source: filepath.Join(o.diffDir, parent),
} }
if err := sendChange(ctx, changes, dirChange); err != nil { if err := changes(dirChange, pi); err != nil {
return err return err
} }
changedDirs[parent] = struct{}{} changedDirs[parent] = struct{}{}
} }
} }
return sendChange(ctx, changes, change) return changes(change, f)
}) })
} }
// doubleWalkDiff walks both directories to create a diff // doubleWalkDiff walks both directories to create a diff
func doubleWalkDiff(ctx context.Context, changes chan<- Change, upper, lower string) (err error) { func doubleWalkDiff(ctx context.Context, changes changeFn, upper, lower string) (err error) {
pathCtx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx)
defer func() {
if err != nil {
cancel()
}
}()
var ( var (
w1 = pathWalker(pathCtx, lower) c1 = make(chan *currentPath)
w2 = pathWalker(pathCtx, upper) c2 = make(chan *currentPath)
f1, f2 *currentPath f1, f2 *currentPath
rmdir string rmdir string
) )
g.Go(func() error {
defer close(c1)
return pathWalk(ctx, lower, c1)
})
g.Go(func() error {
defer close(c2)
return pathWalk(ctx, upper, c2)
})
g.Go(func() error {
for c1 != nil || c2 != nil {
if f1 == nil && c1 != nil {
f1, err = nextPath(ctx, c1)
if err != nil {
return err
}
if f1 == nil {
c1 = nil
}
}
for w1 != nil || w2 != nil { if f2 == nil && c2 != nil {
if f1 == nil && w1 != nil { f2, err = nextPath(ctx, c2)
f1, err = nextPath(w1) if err != nil {
if err != nil { return err
return err }
if f2 == nil {
c2 = nil
}
} }
if f1 == nil { if f1 == nil && f2 == nil {
w1 = nil continue
} }
}
if f2 == nil && w2 != nil { var f os.FileInfo
f2, err = nextPath(w2) c := pathChange(f1, f2)
if err != nil { switch c.Kind {
return err case ChangeKindAdd:
} if rmdir != "" {
if f2 == nil { rmdir = ""
w2 = nil }
} f = f2.f
} f2 = nil
if f1 == nil && f2 == nil { case ChangeKindDelete:
continue // Check if this file is already removed by being
} // under of a removed directory
if rmdir != "" && strings.HasPrefix(f1.path, rmdir) {
c := pathChange(f1, f2) f1 = nil
switch c.Kind { continue
case ChangeKindAdd: } else if rmdir == "" && f1.f.IsDir() {
if rmdir != "" { rmdir = f1.path + string(os.PathSeparator)
rmdir = "" } else if rmdir != "" {
} rmdir = ""
c.FileInfo = f2.f }
c.Source = filepath.Join(upper, c.Path)
f2 = nil
case ChangeKindDelete:
// Check if this file is already removed by being
// under of a removed directory
if rmdir != "" && strings.HasPrefix(f1.path, rmdir) {
f1 = nil f1 = nil
continue case ChangeKindModify:
} else if rmdir == "" && f1.f.IsDir() { same, err := sameFile(f1, f2)
rmdir = f1.path + string(os.PathSeparator) if err != nil {
} else if rmdir != "" { return err
rmdir = "" }
if f1.f.IsDir() && !f2.f.IsDir() {
rmdir = f1.path + string(os.PathSeparator)
} else if rmdir != "" {
rmdir = ""
}
f = f2.f
f1 = nil
f2 = nil
if same {
continue
}
} }
f1 = nil if err := changes(c, f); err != nil {
case ChangeKindModify:
same, err := sameFile(f1, f2)
if err != nil {
return err return err
} }
if f1.f.IsDir() && !f2.f.IsDir() {
rmdir = f1.path + string(os.PathSeparator)
} else if rmdir != "" {
rmdir = ""
}
c.FileInfo = f2.f
c.Source = filepath.Join(upper, c.Path)
f1 = nil
f2 = nil
if same {
continue
}
} }
if err := sendChange(ctx, changes, c); err != nil { return nil
return err })
}
}
return nil return g.Wait()
} }

View file

@ -220,13 +220,13 @@ func testDiffWithoutBase(apply fstest.Applier, expected []Change) error {
return checkChanges(tmp, changes, expected) return checkChanges(tmp, changes, expected)
} }
func checkChanges(root string, changes, expected []Change) error { func checkChanges(root string, changes []testChange, expected []Change) error {
if len(changes) != len(expected) { if len(changes) != len(expected) {
return errors.Errorf("Unexpected number of changes:\n%s", diffString(changes, expected)) return errors.Errorf("Unexpected number of changes:\n%s", diffString(convertTestChanges(changes), expected))
} }
for i := range changes { for i := range changes {
if changes[i].Path != expected[i].Path || changes[i].Kind != expected[i].Kind { if changes[i].Path != expected[i].Path || changes[i].Kind != expected[i].Kind {
return errors.Errorf("Unexpected change at %d:\n%s", i, diffString(changes, expected)) return errors.Errorf("Unexpected change at %d:\n%s", i, diffString(convertTestChanges(changes), expected))
} }
if changes[i].Kind != ChangeKindDelete { if changes[i].Kind != ChangeKindDelete {
filename := filepath.Join(root, changes[i].Path) filename := filepath.Join(root, changes[i].Path)
@ -253,20 +253,35 @@ func checkChanges(root string, changes, expected []Change) error {
return nil return nil
} }
func collectChanges(upper, lower string) ([]Change, error) { type testChange struct {
ctx, changeC := Changes(context.Background(), upper, lower) Change
changes := []Change{} FileInfo os.FileInfo
for { Source string
select { }
case <-ctx.Done():
return nil, ctx.Err() func collectChanges(upper, lower string) ([]testChange, error) {
case c, ok := <-changeC: changes := []testChange{}
if !ok { err := Changes(context.Background(), upper, lower, func(c Change, f os.FileInfo) error {
return changes, nil changes = append(changes, testChange{
} Change: c,
changes = append(changes, c) FileInfo: f,
} Source: filepath.Join(upper, c.Path),
})
return nil
})
if err != nil {
return nil, errors.Wrap(err, "failed to compute changes")
} }
return changes, nil
}
func convertTestChanges(c []testChange) []Change {
nc := make([]Change, len(c))
for i := range c {
nc[i] = c[i].Change
}
return nc
} }
func diffString(c1, c2 []Change) string { func diffString(c1, c2 []Change) string {

View file

@ -133,67 +133,45 @@ func compareFileContent(p1, p2 string) (bool, error) {
} }
} }
type walker struct { func pathWalk(ctx context.Context, root string, pathC chan<- *currentPath) error {
pathC <-chan *currentPath return filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
errC <-chan error
}
func pathWalker(ctx context.Context, root string) *walker {
var (
pathC = make(chan *currentPath)
errC = make(chan error, 1)
)
go func() {
defer close(pathC)
err := filepath.Walk(root, func(path string, f os.FileInfo, err error) error {
if err != nil {
return err
}
// Rebase path
path, err = filepath.Rel(root, path)
if err != nil {
return err
}
path = filepath.Join(string(os.PathSeparator), path)
// Skip root
if path == string(os.PathSeparator) {
return nil
}
return sendPath(ctx, pathC, &currentPath{
path: path,
f: f,
fullPath: filepath.Join(root, path),
})
})
if err != nil { if err != nil {
errC <- err return err
} }
}()
return &walker{ // Rebase path
pathC: pathC, path, err = filepath.Rel(root, path)
errC: errC, if err != nil {
} return err
}
path = filepath.Join(string(os.PathSeparator), path)
// Skip root
if path == string(os.PathSeparator) {
return nil
}
p := &currentPath{
path: path,
f: f,
fullPath: filepath.Join(root, path),
}
select {
case <-ctx.Done():
return ctx.Err()
case pathC <- p:
return nil
}
})
} }
func sendPath(ctx context.Context, pc chan<- *currentPath, p *currentPath) error { func nextPath(ctx context.Context, pathC <-chan *currentPath) (*currentPath, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return nil, ctx.Err()
case pc <- p: case p := <-pathC:
return nil
}
}
func nextPath(w *walker) (*currentPath, error) {
select {
case err := <-w.errC:
return nil, err
case p := <-w.pathC:
return p, nil return p, nil
} }
} }