Fix race condition when waiting for a concurrent layer pull
Before, this only waited for the download to complete. There was no guarantee that the layer had been registered in the graph and was ready use. This is especially problematic with v2 pulls, which wait for all downloads before extracting layers. Change Broadcaster to allow an error value to be propagated from Close to the waiters. Make the wait stop when the extraction is finished, rather than just the download. This also fixes v2 layer downloads to prefix the pool key with "layer:" instead of "img:". "img:" is the wrong prefix, because this is what v1 uses for entire images. A v1 pull waiting for one of these operations to finish would only wait for that particular layer, not all its dependencies. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
parent
975bd63ecd
commit
572a785718
1 changed files with 19 additions and 6 deletions
|
@ -27,6 +27,9 @@ type Broadcaster struct {
|
|||
// isClosed is set to true when Close is called to avoid closing c
|
||||
// multiple times.
|
||||
isClosed bool
|
||||
// result is the argument passed to the first call of Close, and
|
||||
// returned to callers of Wait
|
||||
result error
|
||||
}
|
||||
|
||||
// NewBroadcaster returns a Broadcaster structure
|
||||
|
@ -134,23 +137,33 @@ func (broadcaster *Broadcaster) Add(w io.Writer) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close signals to all observers that the operation has finished.
|
||||
func (broadcaster *Broadcaster) Close() {
|
||||
// CloseWithError signals to all observers that the operation has finished. Its
|
||||
// argument is a result that should be returned to waiters blocking on Wait.
|
||||
func (broadcaster *Broadcaster) CloseWithError(result error) {
|
||||
broadcaster.Lock()
|
||||
if broadcaster.isClosed {
|
||||
broadcaster.Unlock()
|
||||
return
|
||||
}
|
||||
broadcaster.isClosed = true
|
||||
broadcaster.result = result
|
||||
close(broadcaster.c)
|
||||
broadcaster.cond.Broadcast()
|
||||
broadcaster.Unlock()
|
||||
|
||||
// Don't return from Close until all writers have caught up.
|
||||
// Don't return until all writers have caught up.
|
||||
broadcaster.wg.Wait()
|
||||
}
|
||||
|
||||
// Wait blocks until the operation is marked as completed by the Done method.
|
||||
func (broadcaster *Broadcaster) Wait() {
|
||||
<-broadcaster.c
|
||||
// Close signals to all observers that the operation has finished. It causes
|
||||
// all calls to Wait to return nil.
|
||||
func (broadcaster *Broadcaster) Close() {
|
||||
broadcaster.CloseWithError(nil)
|
||||
}
|
||||
|
||||
// Wait blocks until the operation is marked as completed by the Done method.
|
||||
// It returns the argument that was passed to Close.
|
||||
func (broadcaster *Broadcaster) Wait() error {
|
||||
<-broadcaster.c
|
||||
return broadcaster.result
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue