diff --git a/ioutils/readers.go b/ioutils/readers.go new file mode 100644 index 0000000..8488034 --- /dev/null +++ b/ioutils/readers.go @@ -0,0 +1,82 @@ +package ioutils + +import ( + "bytes" + "io" + "sync" +) + +type readCloserWrapper struct { + io.Reader + closer func() error +} + +func (r *readCloserWrapper) Close() error { + return r.closer() +} + +func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser { + return &readCloserWrapper{ + Reader: r, + closer: closer, + } +} + +type bufReader struct { + sync.Mutex + buf *bytes.Buffer + reader io.Reader + err error + wait sync.Cond +} + +func NewBufReader(r io.Reader) *bufReader { + reader := &bufReader{ + buf: &bytes.Buffer{}, + reader: r, + } + reader.wait.L = &reader.Mutex + go reader.drain() + return reader +} + +func (r *bufReader) drain() { + buf := make([]byte, 1024) + for { + n, err := r.reader.Read(buf) + r.Lock() + if err != nil { + r.err = err + } else { + r.buf.Write(buf[0:n]) + } + r.wait.Signal() + r.Unlock() + if err != nil { + break + } + } +} + +func (r *bufReader) Read(p []byte) (n int, err error) { + r.Lock() + defer r.Unlock() + for { + n, err = r.buf.Read(p) + if n > 0 { + return n, err + } + if r.err != nil { + return 0, r.err + } + r.wait.Wait() + } +} + +func (r *bufReader) Close() error { + closer, ok := r.reader.(io.ReadCloser) + if !ok { + return nil + } + return closer.Close() +} diff --git a/ioutils/readers_test.go b/ioutils/readers_test.go new file mode 100644 index 0000000..a7a2dad --- /dev/null +++ b/ioutils/readers_test.go @@ -0,0 +1,34 @@ +package ioutils + +import ( + "bytes" + "io" + "io/ioutil" + "testing" +) + +func TestBufReader(t *testing.T) { + reader, writer := io.Pipe() + bufreader := NewBufReader(reader) + + // Write everything down to a Pipe + // Usually, a pipe should block but because of the buffered reader, + // the writes will go through + done := make(chan bool) + go func() { + writer.Write([]byte("hello world")) + writer.Close() + done <- true + }() + + // Drain the reader *after* everything has been written, just to verify + // it is indeed buffering + <-done + output, err := ioutil.ReadAll(bufreader) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(output, []byte("hello world")) { + t.Error(string(output)) + } +} diff --git a/ioutils/writers.go b/ioutils/writers.go new file mode 100644 index 0000000..de7bd02 --- /dev/null +++ b/ioutils/writers.go @@ -0,0 +1,23 @@ +package ioutils + +import "io" + +type NopWriter struct{} + +func (*NopWriter) Write(buf []byte) (int, error) { + return len(buf), nil +} + +type nopWriteCloser struct { + io.Writer +} + +func (w *nopWriteCloser) Close() error { return nil } + +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +type NopFlusher struct{} + +func (f *NopFlusher) Flush() {}