From 7d99b193640909119b58be16c7e9a37c9f7d6038 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Fri, 3 Jul 2015 09:50:06 -0400 Subject: [PATCH] Split reader interface from logger interface Implement new reader interface on jsonfile. Moves jsonlog decoding from daemon to jsonfile logger. Signed-off-by: Brian Goff --- ioutils/multireader.go | 226 ++++++++++++++++++++++++++++++++++++ ioutils/multireader_test.go | 149 ++++++++++++++++++++++++ tailfile/tailfile.go | 3 +- 3 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 ioutils/multireader.go create mode 100644 ioutils/multireader_test.go diff --git a/ioutils/multireader.go b/ioutils/multireader.go new file mode 100644 index 0000000..f231aa9 --- /dev/null +++ b/ioutils/multireader.go @@ -0,0 +1,226 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "os" +) + +type pos struct { + idx int + offset int64 +} + +type multiReadSeeker struct { + readers []io.ReadSeeker + pos *pos + posIdx map[io.ReadSeeker]int +} + +func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { + var tmpOffset int64 + switch whence { + case os.SEEK_SET: + for i, rdr := range r.readers { + // get size of the current reader + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + if offset > tmpOffset+s { + if i == len(r.readers)-1 { + rdrOffset := s + (offset - tmpOffset) + if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { + return -1, err + } + r.pos = &pos{i, rdrOffset} + return offset, nil + } + + tmpOffset += s + continue + } + + rdrOffset := offset - tmpOffset + idx := i + + rdr.Seek(rdrOffset, os.SEEK_SET) + // make sure all following readers are at 0 + for _, rdr := range r.readers[i+1:] { + rdr.Seek(0, os.SEEK_SET) + } + + if rdrOffset == s && i != len(r.readers)-1 { + idx += 1 + rdrOffset = 0 + } + r.pos = &pos{idx, rdrOffset} + return offset, nil + } + case os.SEEK_END: + for _, rdr := range r.readers { + s, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + tmpOffset += s + } + r.Seek(tmpOffset+offset, os.SEEK_SET) + return tmpOffset + offset, nil + case os.SEEK_CUR: + if r.pos == nil { + return r.Seek(offset, os.SEEK_SET) + } + // Just return the current offset + if offset == 0 { + return r.getCurOffset() + } + + curOffset, err := r.getCurOffset() + if err != nil { + return -1, err + } + rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) + if err != nil { + return -1, err + } + + r.pos = &pos{r.posIdx[rdr], rdrOffset} + return curOffset + offset, nil + default: + return -1, fmt.Errorf("Invalid whence: %d", whence) + } + + return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) +} + +func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { + var rdr io.ReadSeeker + var rdrOffset int64 + + for i, rdr := range r.readers { + offsetTo, err := r.getOffsetToReader(rdr) + if err != nil { + return nil, -1, err + } + if offsetTo > offset { + rdr = r.readers[i-1] + rdrOffset = offsetTo - offset + break + } + + if rdr == r.readers[len(r.readers)-1] { + rdrOffset = offsetTo + offset + break + } + } + + return rdr, rdrOffset, nil +} + +func (r *multiReadSeeker) getCurOffset() (int64, error) { + var totalSize int64 + for _, rdr := range r.readers[:r.pos.idx+1] { + if r.posIdx[rdr] == r.pos.idx { + totalSize += r.pos.offset + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, fmt.Errorf("error getting seeker size: %v", err) + } + totalSize += size + } + return totalSize, nil +} + +func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { + var offset int64 + for _, r := range r.readers { + if r == rdr { + break + } + + size, err := getReadSeekerSize(rdr) + if err != nil { + return -1, err + } + offset += size + } + return offset, nil +} + +func (r *multiReadSeeker) Read(b []byte) (int, error) { + if r.pos == nil { + r.pos = &pos{0, 0} + } + + bCap := int64(cap(b)) + buf := bytes.NewBuffer(nil) + var rdr io.ReadSeeker + + for _, rdr = range r.readers[r.pos.idx:] { + readBytes, err := io.CopyN(buf, rdr, bCap) + if err != nil && err != io.EOF { + return -1, err + } + bCap -= readBytes + + if bCap == 0 { + break + } + } + + rdrPos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + r.pos = &pos{r.posIdx[rdr], rdrPos} + return buf.Read(b) +} + +func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { + // save the current position + pos, err := rdr.Seek(0, os.SEEK_CUR) + if err != nil { + return -1, err + } + + // get the size + size, err := rdr.Seek(0, os.SEEK_END) + if err != nil { + return -1, err + } + + // reset the position + if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { + return -1, err + } + return size, nil +} + +// MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided +// input readseekers. After calling this method the initial position is set to the +// beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances +// to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. +// Seek can be used over the sum of lengths of all readseekers. +// +// When a MultiReadSeeker is used, no Read and Seek operations should be made on +// its ReadSeeker components. Also, users should make no assumption on the state +// of individual readseekers while the MultiReadSeeker is used. +func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { + if len(readers) == 1 { + return readers[0] + } + idx := make(map[io.ReadSeeker]int) + for i, rdr := range readers { + idx[rdr] = i + } + return &multiReadSeeker{ + readers: readers, + posIdx: idx, + } +} diff --git a/ioutils/multireader_test.go b/ioutils/multireader_test.go new file mode 100644 index 0000000..de495b5 --- /dev/null +++ b/ioutils/multireader_test.go @@ -0,0 +1,149 @@ +package ioutils + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "strings" + "testing" +) + +func TestMultiReadSeekerReadAll(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) + + b, err := ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + expected := "hello world 1hello world 2hello world 3" + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } + + size, err := mr.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if size != expectedSize { + t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) + } + + // Reset the position and read again + pos, err := mr.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if pos != 0 { + t.Fatalf("expected position to be set to 0, got %d", pos) + } + + b, err = ioutil.ReadAll(mr) + if err != nil { + t.Fatal(err) + } + + if string(b) != expected { + t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) + } +} + +func TestMultiReadSeekerReadEach(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + var totalBytes int64 + for i, s := range []*strings.Reader{s1, s2, s3} { + sLen := int64(s.Len()) + buf := make([]byte, s.Len()) + expected := []byte(fmt.Sprintf("%s %d", str, i+1)) + + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + + if !bytes.Equal(buf, expected) { + t.Fatalf("expected %q to be %q", string(buf), string(expected)) + } + + pos, err := mr.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("iteration: %d, error: %v", i+1, err) + } + + // check that the total bytes read is the current position of the seeker + totalBytes += sLen + if pos != totalBytes { + t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) + } + + // This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well + newPos, err := mr.Seek(pos, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if newPos != pos { + t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) + } + } +} + +func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + buf := make([]byte, s1.Len()+3) + _, err := mr.Read(buf) + if err != nil { + t.Fatal(err) + } + + // expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string + expected := "hello world 1hel" + if string(buf) != expected { + t.Fatalf("expected %s to be %s", string(buf), expected) + } +} + +func TestMultiReadSeekerNegativeSeek(t *testing.T) { + str := "hello world" + s1 := strings.NewReader(str + " 1") + s2 := strings.NewReader(str + " 2") + s3 := strings.NewReader(str + " 3") + mr := MultiReadSeeker(s1, s2, s3) + + s1Len := s1.Len() + s2Len := s2.Len() + s3Len := s3.Len() + + s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if s != int64(s1Len+s2Len) { + t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) + } + + buf := make([]byte, s3Len) + if _, err := mr.Read(buf); err != nil && err != io.EOF { + t.Fatal(err) + } + expected := fmt.Sprintf("%s %d", str, 3) + if string(buf) != fmt.Sprintf("%s %d", str, 3) { + t.Fatalf("expected %q to be %q", string(buf), expected) + } +} diff --git a/tailfile/tailfile.go b/tailfile/tailfile.go index 2ffd36d..92aea46 100644 --- a/tailfile/tailfile.go +++ b/tailfile/tailfile.go @@ -3,6 +3,7 @@ package tailfile import ( "bytes" "errors" + "io" "os" ) @@ -12,7 +13,7 @@ var eol = []byte("\n") var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive") //TailFile returns last n lines of file f -func TailFile(f *os.File, n int) ([][]byte, error) { +func TailFile(f io.ReadSeeker, n int) ([][]byte, error) { if n <= 0 { return nil, ErrNonPositiveLinesNumber }