Split reader interface from logger interface
Implement new reader interface on jsonfile. Moves jsonlog decoding from daemon to jsonfile logger. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
		
							parent
							
								
									49691393a9
								
							
						
					
					
						commit
						7d99b19364
					
				
					 3 changed files with 377 additions and 1 deletions
				
			
		
							
								
								
									
										226
									
								
								ioutils/multireader.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										226
									
								
								ioutils/multireader.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,226 @@ | |||
| package ioutils | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| ) | ||||
| 
 | ||||
| type pos struct { | ||||
| 	idx    int | ||||
| 	offset int64 | ||||
| } | ||||
| 
 | ||||
| type multiReadSeeker struct { | ||||
| 	readers []io.ReadSeeker | ||||
| 	pos     *pos | ||||
| 	posIdx  map[io.ReadSeeker]int | ||||
| } | ||||
| 
 | ||||
| func (r *multiReadSeeker) Seek(offset int64, whence int) (int64, error) { | ||||
| 	var tmpOffset int64 | ||||
| 	switch whence { | ||||
| 	case os.SEEK_SET: | ||||
| 		for i, rdr := range r.readers { | ||||
| 			// get size of the current reader | ||||
| 			s, err := rdr.Seek(0, os.SEEK_END) | ||||
| 			if err != nil { | ||||
| 				return -1, err | ||||
| 			} | ||||
| 
 | ||||
| 			if offset > tmpOffset+s { | ||||
| 				if i == len(r.readers)-1 { | ||||
| 					rdrOffset := s + (offset - tmpOffset) | ||||
| 					if _, err := rdr.Seek(rdrOffset, os.SEEK_SET); err != nil { | ||||
| 						return -1, err | ||||
| 					} | ||||
| 					r.pos = &pos{i, rdrOffset} | ||||
| 					return offset, nil | ||||
| 				} | ||||
| 
 | ||||
| 				tmpOffset += s | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			rdrOffset := offset - tmpOffset | ||||
| 			idx := i | ||||
| 
 | ||||
| 			rdr.Seek(rdrOffset, os.SEEK_SET) | ||||
| 			// make sure all following readers are at 0 | ||||
| 			for _, rdr := range r.readers[i+1:] { | ||||
| 				rdr.Seek(0, os.SEEK_SET) | ||||
| 			} | ||||
| 
 | ||||
| 			if rdrOffset == s && i != len(r.readers)-1 { | ||||
| 				idx += 1 | ||||
| 				rdrOffset = 0 | ||||
| 			} | ||||
| 			r.pos = &pos{idx, rdrOffset} | ||||
| 			return offset, nil | ||||
| 		} | ||||
| 	case os.SEEK_END: | ||||
| 		for _, rdr := range r.readers { | ||||
| 			s, err := rdr.Seek(0, os.SEEK_END) | ||||
| 			if err != nil { | ||||
| 				return -1, err | ||||
| 			} | ||||
| 			tmpOffset += s | ||||
| 		} | ||||
| 		r.Seek(tmpOffset+offset, os.SEEK_SET) | ||||
| 		return tmpOffset + offset, nil | ||||
| 	case os.SEEK_CUR: | ||||
| 		if r.pos == nil { | ||||
| 			return r.Seek(offset, os.SEEK_SET) | ||||
| 		} | ||||
| 		// Just return the current offset | ||||
| 		if offset == 0 { | ||||
| 			return r.getCurOffset() | ||||
| 		} | ||||
| 
 | ||||
| 		curOffset, err := r.getCurOffset() | ||||
| 		if err != nil { | ||||
| 			return -1, err | ||||
| 		} | ||||
| 		rdr, rdrOffset, err := r.getReaderForOffset(curOffset + offset) | ||||
| 		if err != nil { | ||||
| 			return -1, err | ||||
| 		} | ||||
| 
 | ||||
| 		r.pos = &pos{r.posIdx[rdr], rdrOffset} | ||||
| 		return curOffset + offset, nil | ||||
| 	default: | ||||
| 		return -1, fmt.Errorf("Invalid whence: %d", whence) | ||||
| 	} | ||||
| 
 | ||||
| 	return -1, fmt.Errorf("Error seeking for whence: %d, offset: %d", whence, offset) | ||||
| } | ||||
| 
 | ||||
| func (r *multiReadSeeker) getReaderForOffset(offset int64) (io.ReadSeeker, int64, error) { | ||||
| 	var rdr io.ReadSeeker | ||||
| 	var rdrOffset int64 | ||||
| 
 | ||||
| 	for i, rdr := range r.readers { | ||||
| 		offsetTo, err := r.getOffsetToReader(rdr) | ||||
| 		if err != nil { | ||||
| 			return nil, -1, err | ||||
| 		} | ||||
| 		if offsetTo > offset { | ||||
| 			rdr = r.readers[i-1] | ||||
| 			rdrOffset = offsetTo - offset | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		if rdr == r.readers[len(r.readers)-1] { | ||||
| 			rdrOffset = offsetTo + offset | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return rdr, rdrOffset, nil | ||||
| } | ||||
| 
 | ||||
| func (r *multiReadSeeker) getCurOffset() (int64, error) { | ||||
| 	var totalSize int64 | ||||
| 	for _, rdr := range r.readers[:r.pos.idx+1] { | ||||
| 		if r.posIdx[rdr] == r.pos.idx { | ||||
| 			totalSize += r.pos.offset | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		size, err := getReadSeekerSize(rdr) | ||||
| 		if err != nil { | ||||
| 			return -1, fmt.Errorf("error getting seeker size: %v", err) | ||||
| 		} | ||||
| 		totalSize += size | ||||
| 	} | ||||
| 	return totalSize, nil | ||||
| } | ||||
| 
 | ||||
| func (r *multiReadSeeker) getOffsetToReader(rdr io.ReadSeeker) (int64, error) { | ||||
| 	var offset int64 | ||||
| 	for _, r := range r.readers { | ||||
| 		if r == rdr { | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		size, err := getReadSeekerSize(rdr) | ||||
| 		if err != nil { | ||||
| 			return -1, err | ||||
| 		} | ||||
| 		offset += size | ||||
| 	} | ||||
| 	return offset, nil | ||||
| } | ||||
| 
 | ||||
| func (r *multiReadSeeker) Read(b []byte) (int, error) { | ||||
| 	if r.pos == nil { | ||||
| 		r.pos = &pos{0, 0} | ||||
| 	} | ||||
| 
 | ||||
| 	bCap := int64(cap(b)) | ||||
| 	buf := bytes.NewBuffer(nil) | ||||
| 	var rdr io.ReadSeeker | ||||
| 
 | ||||
| 	for _, rdr = range r.readers[r.pos.idx:] { | ||||
| 		readBytes, err := io.CopyN(buf, rdr, bCap) | ||||
| 		if err != nil && err != io.EOF { | ||||
| 			return -1, err | ||||
| 		} | ||||
| 		bCap -= readBytes | ||||
| 
 | ||||
| 		if bCap == 0 { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	rdrPos, err := rdr.Seek(0, os.SEEK_CUR) | ||||
| 	if err != nil { | ||||
| 		return -1, err | ||||
| 	} | ||||
| 	r.pos = &pos{r.posIdx[rdr], rdrPos} | ||||
| 	return buf.Read(b) | ||||
| } | ||||
| 
 | ||||
| func getReadSeekerSize(rdr io.ReadSeeker) (int64, error) { | ||||
| 	// save the current position | ||||
| 	pos, err := rdr.Seek(0, os.SEEK_CUR) | ||||
| 	if err != nil { | ||||
| 		return -1, err | ||||
| 	} | ||||
| 
 | ||||
| 	// get the size | ||||
| 	size, err := rdr.Seek(0, os.SEEK_END) | ||||
| 	if err != nil { | ||||
| 		return -1, err | ||||
| 	} | ||||
| 
 | ||||
| 	// reset the position | ||||
| 	if _, err := rdr.Seek(pos, os.SEEK_SET); err != nil { | ||||
| 		return -1, err | ||||
| 	} | ||||
| 	return size, nil | ||||
| } | ||||
| 
 | ||||
| // MultiReadSeeker returns a ReadSeeker that's the logical concatenation of the provided | ||||
| // input readseekers. After calling this method the initial position is set to the | ||||
| // beginning of the first ReadSeeker. At the end of a ReadSeeker, Read always advances | ||||
| // to the beginning of the next ReadSeeker and returns EOF at the end of the last ReadSeeker. | ||||
| // Seek can be used over the sum of lengths of all readseekers. | ||||
| // | ||||
| // When a MultiReadSeeker is used, no Read and Seek operations should be made on | ||||
| // its ReadSeeker components. Also, users should make no assumption on the state | ||||
| // of individual readseekers while the MultiReadSeeker is used. | ||||
| func MultiReadSeeker(readers ...io.ReadSeeker) io.ReadSeeker { | ||||
| 	if len(readers) == 1 { | ||||
| 		return readers[0] | ||||
| 	} | ||||
| 	idx := make(map[io.ReadSeeker]int) | ||||
| 	for i, rdr := range readers { | ||||
| 		idx[rdr] = i | ||||
| 	} | ||||
| 	return &multiReadSeeker{ | ||||
| 		readers: readers, | ||||
| 		posIdx:  idx, | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										149
									
								
								ioutils/multireader_test.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										149
									
								
								ioutils/multireader_test.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,149 @@ | |||
| package ioutils | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
| func TestMultiReadSeekerReadAll(t *testing.T) { | ||||
| 	str := "hello world" | ||||
| 	s1 := strings.NewReader(str + " 1") | ||||
| 	s2 := strings.NewReader(str + " 2") | ||||
| 	s3 := strings.NewReader(str + " 3") | ||||
| 	mr := MultiReadSeeker(s1, s2, s3) | ||||
| 
 | ||||
| 	expectedSize := int64(s1.Len() + s2.Len() + s3.Len()) | ||||
| 
 | ||||
| 	b, err := ioutil.ReadAll(mr) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	expected := "hello world 1hello world 2hello world 3" | ||||
| 	if string(b) != expected { | ||||
| 		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) | ||||
| 	} | ||||
| 
 | ||||
| 	size, err := mr.Seek(0, os.SEEK_END) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if size != expectedSize { | ||||
| 		t.Fatalf("reader size does not match, got %d, expected %d", size, expectedSize) | ||||
| 	} | ||||
| 
 | ||||
| 	// Reset the position and read again | ||||
| 	pos, err := mr.Seek(0, os.SEEK_SET) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if pos != 0 { | ||||
| 		t.Fatalf("expected position to be set to 0, got %d", pos) | ||||
| 	} | ||||
| 
 | ||||
| 	b, err = ioutil.ReadAll(mr) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	if string(b) != expected { | ||||
| 		t.Fatalf("ReadAll failed, got: %q, expected %q", string(b), expected) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestMultiReadSeekerReadEach(t *testing.T) { | ||||
| 	str := "hello world" | ||||
| 	s1 := strings.NewReader(str + " 1") | ||||
| 	s2 := strings.NewReader(str + " 2") | ||||
| 	s3 := strings.NewReader(str + " 3") | ||||
| 	mr := MultiReadSeeker(s1, s2, s3) | ||||
| 
 | ||||
| 	var totalBytes int64 | ||||
| 	for i, s := range []*strings.Reader{s1, s2, s3} { | ||||
| 		sLen := int64(s.Len()) | ||||
| 		buf := make([]byte, s.Len()) | ||||
| 		expected := []byte(fmt.Sprintf("%s %d", str, i+1)) | ||||
| 
 | ||||
| 		if _, err := mr.Read(buf); err != nil && err != io.EOF { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 
 | ||||
| 		if !bytes.Equal(buf, expected) { | ||||
| 			t.Fatalf("expected %q to be %q", string(buf), string(expected)) | ||||
| 		} | ||||
| 
 | ||||
| 		pos, err := mr.Seek(0, os.SEEK_CUR) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("iteration: %d, error: %v", i+1, err) | ||||
| 		} | ||||
| 
 | ||||
| 		// check that the total bytes read is the current position of the seeker | ||||
| 		totalBytes += sLen | ||||
| 		if pos != totalBytes { | ||||
| 			t.Fatalf("expected current position to be: %d, got: %d, iteration: %d", totalBytes, pos, i+1) | ||||
| 		} | ||||
| 
 | ||||
| 		// This tests not only that SEEK_SET and SEEK_CUR give the same values, but that the next iteration is in the expected position as well | ||||
| 		newPos, err := mr.Seek(pos, os.SEEK_SET) | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		if newPos != pos { | ||||
| 			t.Fatalf("expected to get same position when calling SEEK_SET with value from SEEK_CUR, cur: %d, set: %d", pos, newPos) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestMultiReadSeekerReadSpanningChunks(t *testing.T) { | ||||
| 	str := "hello world" | ||||
| 	s1 := strings.NewReader(str + " 1") | ||||
| 	s2 := strings.NewReader(str + " 2") | ||||
| 	s3 := strings.NewReader(str + " 3") | ||||
| 	mr := MultiReadSeeker(s1, s2, s3) | ||||
| 
 | ||||
| 	buf := make([]byte, s1.Len()+3) | ||||
| 	_, err := mr.Read(buf) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	// expected is the contents of s1 + 3 bytes from s2, ie, the `hel` at the end of this string | ||||
| 	expected := "hello world 1hel" | ||||
| 	if string(buf) != expected { | ||||
| 		t.Fatalf("expected %s to be %s", string(buf), expected) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestMultiReadSeekerNegativeSeek(t *testing.T) { | ||||
| 	str := "hello world" | ||||
| 	s1 := strings.NewReader(str + " 1") | ||||
| 	s2 := strings.NewReader(str + " 2") | ||||
| 	s3 := strings.NewReader(str + " 3") | ||||
| 	mr := MultiReadSeeker(s1, s2, s3) | ||||
| 
 | ||||
| 	s1Len := s1.Len() | ||||
| 	s2Len := s2.Len() | ||||
| 	s3Len := s3.Len() | ||||
| 
 | ||||
| 	s, err := mr.Seek(int64(-1*s3.Len()), os.SEEK_END) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	if s != int64(s1Len+s2Len) { | ||||
| 		t.Fatalf("expected %d to be %d", s, s1.Len()+s2.Len()) | ||||
| 	} | ||||
| 
 | ||||
| 	buf := make([]byte, s3Len) | ||||
| 	if _, err := mr.Read(buf); err != nil && err != io.EOF { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	expected := fmt.Sprintf("%s %d", str, 3) | ||||
| 	if string(buf) != fmt.Sprintf("%s %d", str, 3) { | ||||
| 		t.Fatalf("expected %q to be %q", string(buf), expected) | ||||
| 	} | ||||
| } | ||||
|  | @ -3,6 +3,7 @@ package tailfile | |||
| import ( | ||||
| 	"bytes" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"os" | ||||
| ) | ||||
| 
 | ||||
|  | @ -12,7 +13,7 @@ var eol = []byte("\n") | |||
| var ErrNonPositiveLinesNumber = errors.New("Lines number must be positive") | ||||
| 
 | ||||
| //TailFile returns last n lines of file f | ||||
| func TailFile(f *os.File, n int) ([][]byte, error) { | ||||
| func TailFile(f io.ReadSeeker, n int) ([][]byte, error) { | ||||
| 	if n <= 0 { | ||||
| 		return nil, ErrNonPositiveLinesNumber | ||||
| 	} | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue