From 572a785718ed2c5f2b4f77197103266828c8f8e7 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Tue, 25 Aug 2015 14:23:52 -0700 Subject: [PATCH] Fix race condition when waiting for a concurrent layer pull Before, this only waited for the download to complete. There was no guarantee that the layer had been registered in the graph and was ready use. This is especially problematic with v2 pulls, which wait for all downloads before extracting layers. Change Broadcaster to allow an error value to be propagated from Close to the waiters. Make the wait stop when the extraction is finished, rather than just the download. This also fixes v2 layer downloads to prefix the pool key with "layer:" instead of "img:". "img:" is the wrong prefix, because this is what v1 uses for entire images. A v1 pull waiting for one of these operations to finish would only wait for that particular layer, not all its dependencies. Signed-off-by: Aaron Lehmann --- progressreader/broadcaster.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/progressreader/broadcaster.go b/progressreader/broadcaster.go index 5118e9e..58604aa 100644 --- a/progressreader/broadcaster.go +++ b/progressreader/broadcaster.go @@ -27,6 +27,9 @@ type Broadcaster struct { // isClosed is set to true when Close is called to avoid closing c // multiple times. isClosed bool + // result is the argument passed to the first call of Close, and + // returned to callers of Wait + result error } // NewBroadcaster returns a Broadcaster structure @@ -134,23 +137,33 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error { return nil } -// Close signals to all observers that the operation has finished. -func (broadcaster *Broadcaster) Close() { +// CloseWithError signals to all observers that the operation has finished. Its +// argument is a result that should be returned to waiters blocking on Wait. +func (broadcaster *Broadcaster) CloseWithError(result error) { broadcaster.Lock() if broadcaster.isClosed { broadcaster.Unlock() return } broadcaster.isClosed = true + broadcaster.result = result close(broadcaster.c) broadcaster.cond.Broadcast() broadcaster.Unlock() - // Don't return from Close until all writers have caught up. + // Don't return until all writers have caught up. broadcaster.wg.Wait() } -// Wait blocks until the operation is marked as completed by the Done method. -func (broadcaster *Broadcaster) Wait() { - <-broadcaster.c +// Close signals to all observers that the operation has finished. It causes +// all calls to Wait to return nil. +func (broadcaster *Broadcaster) Close() { + broadcaster.CloseWithError(nil) +} + +// Wait blocks until the operation is marked as completed by the Done method. +// It returns the argument that was passed to Close. +func (broadcaster *Broadcaster) Wait() error { + <-broadcaster.c + return broadcaster.result }