registry/client/push.go
2014-12-23 17:13:02 -08:00

137 lines
3.4 KiB
Go

package client
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/storage"
)
// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceeding it by the length of the
// push window has been successfully pushed.
const simultaneousLayerPushWindow = 4
type pushFunction func(fsLayer storage.FSLayer) error
// Push implements a client push workflow for the image defined by the given
// name and tag pair, using the given ObjectStore for local manifest and layer
// storage
func Push(c Client, objectStore ObjectStore, name, tag string) error {
manifest, err := objectStore.Manifest(name, tag)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"name": name,
"tag": tag,
}).Info("No image found")
return err
}
errChans := make([]chan error, len(manifest.FSLayers))
for i := range manifest.FSLayers {
errChans[i] = make(chan error)
}
cancelCh := make(chan struct{})
// Iterate over each layer in the manifest, simultaneously pushing no more
// than simultaneousLayerPushWindow layers at a time. If an error is
// received from a layer push, we abort the push.
for i := 0; i < len(manifest.FSLayers)+simultaneousLayerPushWindow; i++ {
dependentLayer := i - simultaneousLayerPushWindow
if dependentLayer >= 0 {
err := <-errChans[dependentLayer]
if err != nil {
log.WithField("error", err).Warn("Push aborted")
close(cancelCh)
return err
}
}
if i < len(manifest.FSLayers) {
go func(i int) {
select {
case errChans[i] <- pushLayer(c, objectStore, name, manifest.FSLayers[i]):
case <-cancelCh: // recv broadcast notification about cancelation
}
}(i)
}
}
err = c.PutImageManifest(name, tag, manifest)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"manifest": manifest,
}).Warn("Unable to upload manifest")
return err
}
return nil
}
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer storage.FSLayer) error {
log.WithField("layer", fsLayer).Info("Pushing layer")
layer, err := objectStore.Layer(fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
layerReader, err := layer.Reader()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to read local layer")
return err
}
defer layerReader.Close()
if layerReader.CurrentSize() != layerReader.Size() {
log.WithFields(log.Fields{
"layer": fsLayer,
"currentSize": layerReader.CurrentSize(),
"size": layerReader.Size(),
}).Warn("Local layer incomplete")
return fmt.Errorf("Local layer incomplete")
}
length, err := c.BlobLength(name, fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to check existence of remote layer")
return err
}
if length >= 0 {
log.WithField("layer", fsLayer).Info("Layer already exists")
return nil
}
location, err := c.InitiateBlobUpload(name)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
err = c.UploadBlob(location, layerReader, int(layerReader.CurrentSize()), fsLayer.BlobSum)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"layer": fsLayer,
}).Warn("Unable to upload layer")
return err
}
return nil
}