package ioutils import ( "errors" "io" "sync" ) // maxCap is the highest capacity to use in byte slices that buffer data. const maxCap = 1e6 // minCap is the lowest capacity to use in byte slices that buffer data const minCap = 64 // blockThreshold is the minimum number of bytes in the buffer which will cause // a write to BytesPipe to block when allocating a new slice. const blockThreshold = 1e6 var ( // ErrClosed is returned when Write is called on a closed BytesPipe. ErrClosed = errors.New("write to closed BytesPipe") bufPools = make(map[int]*sync.Pool) bufPoolsLock sync.Mutex ) // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). // All written data may be read at most once. Also, BytesPipe allocates // and releases new byte slices to adjust to current needs, so the buffer // won't be overgrown after peak loads. type BytesPipe struct { mu sync.Mutex wait *sync.Cond buf []*fixedBuffer bufLen int closeErr error // error to return from next Read. set to nil if not closed. } // NewBytesPipe creates new BytesPipe, initialized by specified slice. // If buf is nil, then it will be initialized with slice which cap is 64. // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). func NewBytesPipe() *BytesPipe { bp := &BytesPipe{} bp.buf = append(bp.buf, getBuffer(minCap)) bp.wait = sync.NewCond(&bp.mu) return bp } // Write writes p to BytesPipe. // It can allocate new []byte slices in a process of writing. func (bp *BytesPipe) Write(p []byte) (int, error) { bp.mu.Lock() written := 0 loop0: for { if bp.closeErr != nil { bp.mu.Unlock() return written, ErrClosed } if len(bp.buf) == 0 { bp.buf = append(bp.buf, getBuffer(64)) } // get the last buffer b := bp.buf[len(bp.buf)-1] n, err := b.Write(p) written += n bp.bufLen += n // errBufferFull is an error we expect to get if the buffer is full if err != nil && err != errBufferFull { bp.wait.Broadcast() bp.mu.Unlock() return written, err } // if there was enough room to write all then break if len(p) == n { break } // more data: write to the next slice p = p[n:] // make sure the buffer doesn't grow too big from this write for bp.bufLen >= blockThreshold { bp.wait.Wait() if bp.closeErr != nil { continue loop0 } } // add new byte slice to the buffers slice and continue writing nextCap := b.Cap() * 2 if nextCap > maxCap { nextCap = maxCap } bp.buf = append(bp.buf, getBuffer(nextCap)) } bp.wait.Broadcast() bp.mu.Unlock() return written, nil } // CloseWithError causes further reads from a BytesPipe to return immediately. func (bp *BytesPipe) CloseWithError(err error) error { bp.mu.Lock() if err != nil { bp.closeErr = err } else { bp.closeErr = io.EOF } bp.wait.Broadcast() bp.mu.Unlock() return nil } // Close causes further reads from a BytesPipe to return immediately. func (bp *BytesPipe) Close() error { return bp.CloseWithError(nil) } // Read reads bytes from BytesPipe. // Data could be read only once. func (bp *BytesPipe) Read(p []byte) (n int, err error) { bp.mu.Lock() if bp.bufLen == 0 { if bp.closeErr != nil { bp.mu.Unlock() return 0, bp.closeErr } bp.wait.Wait() if bp.bufLen == 0 && bp.closeErr != nil { err := bp.closeErr bp.mu.Unlock() return 0, err } } for bp.bufLen > 0 { b := bp.buf[0] read, _ := b.Read(p) // ignore error since fixedBuffer doesn't really return an error n += read bp.bufLen -= read if b.Len() == 0 { // it's empty so return it to the pool and move to the next one returnBuffer(b) bp.buf[0] = nil bp.buf = bp.buf[1:] } if len(p) == read { break } p = p[read:] } bp.wait.Broadcast() bp.mu.Unlock() return } func returnBuffer(b *fixedBuffer) { b.Reset() bufPoolsLock.Lock() pool := bufPools[b.Cap()] bufPoolsLock.Unlock() if pool != nil { pool.Put(b) } } func getBuffer(size int) *fixedBuffer { bufPoolsLock.Lock() pool, ok := bufPools[size] if !ok { pool = &sync.Pool{New: func() interface{} { return &fixedBuffer{buf: make([]byte, 0, size)} }} bufPools[size] = pool } bufPoolsLock.Unlock() return pool.Get().(*fixedBuffer) }