package copy

import (
	"bytes"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"reflect"
	"runtime"
	"strings"
	"time"

	"github.com/containers/image/image"
	"github.com/containers/image/pkg/compression"
	"github.com/containers/image/signature"
	"github.com/containers/image/transports"
	"github.com/containers/image/types"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	pb "gopkg.in/cheggaaa/pb.v1"
)

type digestingReader struct {
	source           io.Reader
	digester         digest.Digester
	expectedDigest   digest.Digest
	validationFailed bool
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// and set validationFailed to true if the source stream does not match expectedDigest.
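//
// A minimal usage sketch (blobStream and dst are hypothetical placeholders):
//
//	dr, err := newDigestingReader(blobStream, expectedDigest)
//	if err != nil { /* invalid or unsupported digest */ }
//	if _, err := io.Copy(dst, dr); err != nil {
//		// A non-nil err here can mean a digest mismatch (dr.validationFailed is then set)
//		// or an ordinary I/O failure.
//	}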
func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digestingReader, error) {
	if err := expectedDigest.Validate(); err != nil {
		return nil, errors.Errorf("Invalid digest specification %s", expectedDigest)
	}
	digestAlgorithm := expectedDigest.Algorithm()
	if !digestAlgorithm.Available() {
		return nil, errors.Errorf("Invalid digest specification %s: unsupported digest algorithm %s", expectedDigest, digestAlgorithm)
	}
	return &digestingReader{
		source:           source,
		digester:         digestAlgorithm.Digester(),
		expectedDigest:   expectedDigest,
		validationFailed: false,
	}, nil
}

func (d *digestingReader) Read(p []byte) (int, error) {
	n, err := d.source.Read(p)
	if n > 0 {
		if n2, err := d.digester.Hash().Write(p[:n]); n2 != n || err != nil {
			// Coverage: This should not happen, the hash.Hash interface requires
			// d.digest.Write to never return an error, and the io.Writer interface
			// requires n2 == len(input) if no error is returned.
			return 0, errors.Wrapf(err, "Error updating digest during verification: %d vs. %d", n2, n)
		}
	}
	if err == io.EOF {
		actualDigest := d.digester.Digest()
		if actualDigest != d.expectedDigest {
			d.validationFailed = true
			return 0, errors.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest)
		}
	}
	return n, err
}

// copier allows us to keep track of diffID values for blobs, and other
// data shared across one or more images in a possible manifest list.
type copier struct {
	copiedBlobs      map[digest.Digest]digest.Digest
	cachedDiffIDs    map[digest.Digest]digest.Digest
	dest             types.ImageDestination
	rawSource        types.ImageSource
	reportWriter     io.Writer
	progressInterval time.Duration
	progress         chan types.ProgressProperties
}

// imageCopier tracks state specific to a single image (possibly an item of a manifest list).
type imageCopier struct {
	c                 *copier
	manifestUpdates   *types.ManifestUpdateOptions
	src               types.Image
	diffIDsAreNeeded  bool
	canModifyManifest bool
}

// Options allows supplying non-default configuration modifying the behavior of CopyImage.
type Options struct {
	RemoveSignatures bool   // Remove any pre-existing signatures. SignBy will still add a new signature.
	SignBy           string // If non-empty, asks for a signature to be added during the copy, and specifies a key ID, as accepted by signature.NewGPGSigningMechanism().SignDockerManifest().
	ReportWriter     io.Writer
	SourceCtx        *types.SystemContext
	DestinationCtx   *types.SystemContext
	ProgressInterval time.Duration                 // time to wait between reports to signal the progress channel
	Progress         chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset.

	// manifest MIME type of the image set by the user. "" is the default and means using autodetection to choose the manifest MIME type.
	ForceManifestMIMEType string
}

// Image copies an image from srcRef to destRef, using policyContext to validate
// source image admissibility.
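//
// A minimal usage sketch (error handling elided; destRef and srcRef would be
// obtained from a transport, e.g. via alltransports.ParseImageName):
//
//	policy, _ := signature.DefaultPolicy(nil)
//	policyContext, _ := signature.NewPolicyContext(policy)
//	defer policyContext.Destroy()
//	err := copy.Image(policyContext, destRef, srcRef, &copy.Options{
//		ReportWriter: os.Stdout,
//	})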
func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageReference, options *Options) (retErr error) {
	// NOTE this function uses an output parameter for the error return value.
	// Setting this and returning is the ideal way to return an error.
	//
	// the defers in this routine will wrap the error return with its own errors
	// which can be valuable context in the middle of a multi-streamed copy.
	if options == nil {
		options = &Options{}
	}

	reportWriter := ioutil.Discard
	if options.ReportWriter != nil {
		reportWriter = options.ReportWriter
	}

	dest, err := destRef.NewImageDestination(options.DestinationCtx)
	if err != nil {
		return errors.Wrapf(err, "Error initializing destination %s", transports.ImageName(destRef))
	}
	defer func() {
		if err := dest.Close(); err != nil {
			retErr = errors.Wrapf(retErr, " (dest: %v)", err)
		}
	}()

	rawSource, err := srcRef.NewImageSource(options.SourceCtx)
	if err != nil {
		return errors.Wrapf(err, "Error initializing source %s", transports.ImageName(srcRef))
	}
	defer func() {
		if err := rawSource.Close(); err != nil {
			retErr = errors.Wrapf(retErr, " (src: %v)", err)
		}
	}()

	c := &copier{
		copiedBlobs:      make(map[digest.Digest]digest.Digest),
		cachedDiffIDs:    make(map[digest.Digest]digest.Digest),
		dest:             dest,
		rawSource:        rawSource,
		reportWriter:     reportWriter,
		progressInterval: options.ProgressInterval,
		progress:         options.Progress,
	}

	unparsedToplevel := image.UnparsedInstance(rawSource, nil)
	multiImage, err := isMultiImage(unparsedToplevel)
	if err != nil {
		return errors.Wrapf(err, "Error determining manifest MIME type for %s", transports.ImageName(srcRef))
	}

	if !multiImage {
		// The simple case: Just copy a single image.
		if err := c.copyOneImage(policyContext, options, unparsedToplevel); err != nil {
			return err
		}
	} else {
		// This is a manifest list. Choose a single image and copy it.
		// FIXME: Copy to destinations which support manifest lists, one image at a time.
		instanceDigest, err := image.ChooseManifestInstanceFromManifestList(options.SourceCtx, unparsedToplevel)
		if err != nil {
			return errors.Wrapf(err, "Error choosing an image from manifest list %s", transports.ImageName(srcRef))
		}
		logrus.Debugf("Source is a manifest list; copying (only) instance %s", instanceDigest)
		unparsedInstance := image.UnparsedInstance(rawSource, &instanceDigest)

		if err := c.copyOneImage(policyContext, options, unparsedInstance); err != nil {
			return err
		}
	}

	if err := c.dest.Commit(); err != nil {
		return errors.Wrap(err, "Error committing the finished image")
	}

	return nil
}

// copyOneImage copies a single (non-manifest-list) image unparsedImage, using policyContext to validate
// source image admissibility.
func (c *copier) copyOneImage(policyContext *signature.PolicyContext, options *Options, unparsedImage *image.UnparsedImage) (retErr error) {
	// The caller is handling manifest lists; this could happen only if a manifest list contains a manifest list.
	// Make sure we fail cleanly in such cases.
	multiImage, err := isMultiImage(unparsedImage)
	if err != nil {
		// FIXME FIXME: How to name a reference for the sub-image?
		return errors.Wrapf(err, "Error determining manifest MIME type for %s", transports.ImageName(unparsedImage.Reference()))
	}
	if multiImage {
		return fmt.Errorf("Unexpectedly received a manifest list instead of a manifest for a single image")
	}

	// Please keep this policy check BEFORE reading any other information about the image.
	// (the multiImage check above only matches the MIME type, which we have received anyway.
	// Actual parsing of anything should be deferred.)
	if allowed, err := policyContext.IsRunningImageAllowed(unparsedImage); !allowed || err != nil { // Be paranoid and fail if either return value indicates so.
		return errors.Wrap(err, "Source image rejected")
	}

	src, err := image.FromUnparsedImage(options.SourceCtx, unparsedImage)
	if err != nil {
		return errors.Wrapf(err, "Error initializing image from source %s", transports.ImageName(c.rawSource.Reference()))
	}

	if err := checkImageDestinationForCurrentRuntimeOS(options.DestinationCtx, src, c.dest); err != nil {
		return err
	}

	var sigs [][]byte
	if options.RemoveSignatures {
		sigs = [][]byte{}
	} else {
		c.Printf("Getting image source signatures\n")
		s, err := src.Signatures(context.TODO())
		if err != nil {
			return errors.Wrap(err, "Error reading signatures")
		}
		sigs = s
	}
	if len(sigs) != 0 {
		c.Printf("Checking if image destination supports signatures\n")
		if err := c.dest.SupportsSignatures(); err != nil {
			return errors.Wrap(err, "Can not copy signatures")
		}
	}

	ic := imageCopier{
		c:               c,
		manifestUpdates: &types.ManifestUpdateOptions{InformationOnly: types.ManifestUpdateInformation{Destination: c.dest}},
		src:             src,
		// diffIDsAreNeeded is computed later
		canModifyManifest: len(sigs) == 0,
	}

	if err := ic.updateEmbeddedDockerReference(); err != nil {
		return err
	}

	// We compute preferredManifestMIMEType only to show it in error messages.
	// Without having to add this context in an error message, we would be happy enough to know only that no conversion is needed.
	preferredManifestMIMEType, otherManifestMIMETypeCandidates, err := ic.determineManifestConversion(c.dest.SupportedManifestMIMETypes(), options.ForceManifestMIMEType)
	if err != nil {
		return err
	}

	// If src.UpdatedImageNeedsLayerDiffIDs(ic.manifestUpdates) will be true, it needs to be true by the time we get here.
	ic.diffIDsAreNeeded = src.UpdatedImageNeedsLayerDiffIDs(*ic.manifestUpdates)

	if err := ic.copyLayers(); err != nil {
		return err
	}

	// With docker/distribution registries we do not know whether the registry accepts schema2 or schema1 only;
	// and at least with the OpenShift registry "acceptschema2" option, there is no way to detect the support
	// without actually trying to upload something and getting a types.ManifestTypeRejectedError.
	// So, try the preferred manifest MIME type. If the process succeeds, fine…
	manifest, err := ic.copyUpdatedConfigAndManifest()
	if err != nil {
		logrus.Debugf("Writing manifest using preferred type %s failed: %v", preferredManifestMIMEType, err)
		// … if it fails, _and_ the failure is because the manifest is rejected, we may have other options.
		if _, isManifestRejected := errors.Cause(err).(types.ManifestTypeRejectedError); !isManifestRejected || len(otherManifestMIMETypeCandidates) == 0 {
			// We don't have other options.
			// In principle the code below would handle this as well, but the resulting error message is fairly ugly.
			// Don't bother the user with MIME types if we have no choice.
			return err
		}
		// If the original MIME type is acceptable, determineManifestConversion always uses it as preferredManifestMIMEType.
		// So if we are here, we will definitely be trying to convert the manifest.
		// With !ic.canModifyManifest, that would just be a string of repeated failures for the same reason,
		// so let's bail out early and with a better error message.
		if !ic.canModifyManifest {
			return errors.Wrap(err, "Writing manifest failed (and converting it is not possible)")
		}

		// errs is a list of errors when trying various manifest types. Also serves as an "upload succeeded" flag when set to nil.
		errs := []string{fmt.Sprintf("%s(%v)", preferredManifestMIMEType, err)}
		for _, manifestMIMEType := range otherManifestMIMETypeCandidates {
			logrus.Debugf("Trying to use manifest type %s…", manifestMIMEType)
			ic.manifestUpdates.ManifestMIMEType = manifestMIMEType
			attemptedManifest, err := ic.copyUpdatedConfigAndManifest()
			if err != nil {
				logrus.Debugf("Upload of manifest type %s failed: %v", manifestMIMEType, err)
				errs = append(errs, fmt.Sprintf("%s(%v)", manifestMIMEType, err))
				continue
			}

			// We have successfully uploaded a manifest.
			manifest = attemptedManifest
			errs = nil // Mark this as a success so that we don't abort below.
			break
		}
		if errs != nil {
			return fmt.Errorf("Uploading manifest failed, attempted the following formats: %s", strings.Join(errs, ", "))
		}
	}

	if options.SignBy != "" {
		newSig, err := c.createSignature(manifest, options.SignBy)
		if err != nil {
			return err
		}
		sigs = append(sigs, newSig)
	}

	c.Printf("Storing signatures\n")
	if err := c.dest.PutSignatures(sigs); err != nil {
		return errors.Wrap(err, "Error writing signatures")
	}

	return nil
}

// Printf writes a formatted string to c.reportWriter.
// Note that the method name Printf is not entirely arbitrary: (go tool vet)
// has a built-in list of functions/methods (whatever object they are for)
// which have their format strings checked; for other names we would have
// to pass a parameter to every (go tool vet) invocation.
func (c *copier) Printf(format string, a ...interface{}) {
	fmt.Fprintf(c.reportWriter, format, a...)
}

func checkImageDestinationForCurrentRuntimeOS(ctx *types.SystemContext, src types.Image, dest types.ImageDestination) error {
	if dest.MustMatchRuntimeOS() {
		wantedOS := runtime.GOOS
		if ctx != nil && ctx.OSChoice != "" {
			wantedOS = ctx.OSChoice
		}
		c, err := src.OCIConfig()
		if err != nil {
			return errors.Wrapf(err, "Error parsing image configuration")
		}
		osErr := fmt.Errorf("image operating system %q cannot be used on %q", c.OS, wantedOS)
		if wantedOS == "windows" && c.OS == "linux" {
			return osErr
		} else if wantedOS != "windows" && c.OS == "windows" {
			return osErr
		}
	}
	return nil
}

// updateEmbeddedDockerReference handles the Docker reference embedded in Docker schema1 manifests.
func (ic *imageCopier) updateEmbeddedDockerReference() error {
	destRef := ic.c.dest.Reference().DockerReference()
	if destRef == nil {
		return nil // Destination does not care about Docker references
	}
	if !ic.src.EmbeddedDockerReferenceConflicts(destRef) {
		return nil // No reference embedded in the manifest, or it matches destRef already.
	}

	if !ic.canModifyManifest {
		return errors.Errorf("Copying a schema1 image with an embedded Docker reference to %s (Docker reference %s) would invalidate existing signatures. Explicitly enable signature removal to proceed anyway",
			transports.ImageName(ic.c.dest.Reference()), destRef.String())
	}
	ic.manifestUpdates.EmbeddedDockerReference = destRef
	return nil
}

// copyLayers copies layers from ic.src/ic.c.rawSource to dest, using and updating ic.manifestUpdates if necessary and ic.canModifyManifest.
func (ic *imageCopier) copyLayers() error {
	srcInfos := ic.src.LayerInfos()
	destInfos := []types.BlobInfo{}
	diffIDs := []digest.Digest{}
	updatedSrcInfos := ic.src.LayerInfosForCopy()
	srcInfosUpdated := false
	if updatedSrcInfos != nil && !reflect.DeepEqual(srcInfos, updatedSrcInfos) {
		if !ic.canModifyManifest {
			return errors.Errorf("Internal error: copyLayers() needs to use an updated manifest but that was known to be forbidden")
		}
		srcInfos = updatedSrcInfos
		srcInfosUpdated = true
	}
	for _, srcLayer := range srcInfos {
		var (
			destInfo types.BlobInfo
			diffID   digest.Digest
			err      error
		)
		if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
			// DiffIDs are, currently, needed only when converting from schema1.
			// In which case src.LayerInfos will not have URLs because schema1
			// does not support them.
			if ic.diffIDsAreNeeded {
				return errors.New("getting DiffID for foreign layers is unimplemented")
			}
			destInfo = srcLayer
			ic.c.Printf("Skipping foreign layer %q copy to %s\n", destInfo.Digest, ic.c.dest.Reference().Transport().Name())
		} else {
			destInfo, diffID, err = ic.copyLayer(srcLayer)
			if err != nil {
				return err
			}
		}
		destInfos = append(destInfos, destInfo)
		diffIDs = append(diffIDs, diffID)
	}
	ic.manifestUpdates.InformationOnly.LayerInfos = destInfos
	if ic.diffIDsAreNeeded {
		ic.manifestUpdates.InformationOnly.LayerDiffIDs = diffIDs
	}
	if srcInfosUpdated || layerDigestsDiffer(srcInfos, destInfos) {
		ic.manifestUpdates.LayerInfos = destInfos
	}
	return nil
}

// layerDigestsDiffer returns true iff the digests in a and b differ (ignoring sizes and possible other fields)
func layerDigestsDiffer(a, b []types.BlobInfo) bool {
	if len(a) != len(b) {
		return true
	}
	for i := range a {
		if a[i].Digest != b[i].Digest {
			return true
		}
	}
	return false
}

// copyUpdatedConfigAndManifest updates the image per ic.manifestUpdates, if necessary,
// stores the resulting config and manifest to the destination, and returns the stored manifest.
func (ic *imageCopier) copyUpdatedConfigAndManifest() ([]byte, error) {
	pendingImage := ic.src
	if !reflect.DeepEqual(*ic.manifestUpdates, types.ManifestUpdateOptions{InformationOnly: ic.manifestUpdates.InformationOnly}) {
		if !ic.canModifyManifest {
			return nil, errors.Errorf("Internal error: copy needs an updated manifest but that was known to be forbidden")
		}
		if !ic.diffIDsAreNeeded && ic.src.UpdatedImageNeedsLayerDiffIDs(*ic.manifestUpdates) {
			// We have set ic.diffIDsAreNeeded based on the preferred MIME type returned by determineManifestConversion.
			// So, this can only happen if we are trying to upload using one of the other MIME type candidates.
			// Because UpdatedImageNeedsLayerDiffIDs is true only when converting from s1 to s2, this case should only arise
			// when ic.c.dest.SupportedManifestMIMETypes() includes both s1 and s2, the upload using s1 failed, and we are now trying s2.
			// Supposedly s2-only registries do not exist or are extremely rare, so failing with this error message is good enough for now.
			// If handling such registries turns out to be necessary, we could compute ic.diffIDsAreNeeded based on the full list of manifest MIME type candidates.
			return nil, errors.Errorf("Can not convert image to %s, preparing DiffIDs for this case is not supported", ic.manifestUpdates.ManifestMIMEType)
		}
		pi, err := ic.src.UpdatedImage(*ic.manifestUpdates)
		if err != nil {
			return nil, errors.Wrap(err, "Error creating an updated image manifest")
		}
		pendingImage = pi
	}
	manifest, _, err := pendingImage.Manifest()
	if err != nil {
		return nil, errors.Wrap(err, "Error reading manifest")
	}

	if err := ic.c.copyConfig(pendingImage); err != nil {
		return nil, err
	}

	ic.c.Printf("Writing manifest to image destination\n")
	if err := ic.c.dest.PutManifest(manifest); err != nil {
		return nil, errors.Wrap(err, "Error writing manifest")
	}
	return manifest, nil
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(src types.Image) error {
	srcInfo := src.ConfigInfo()
	if srcInfo.Digest != "" {
		c.Printf("Copying config %s\n", srcInfo.Digest)
		configBlob, err := src.ConfigBlob()
		if err != nil {
			return errors.Wrapf(err, "Error reading config blob %s", srcInfo.Digest)
		}
		destInfo, err := c.copyBlobFromStream(bytes.NewReader(configBlob), srcInfo, nil, false)
		if err != nil {
			return err
		}
		if destInfo.Digest != srcInfo.Digest {
			return errors.Errorf("Internal error: copying uncompressed config blob %s changed digest to %s", srcInfo.Digest, destInfo.Digest)
		}
	}
	return nil
}

// diffIDResult contains both a digest value and an error from diffIDComputationGoroutine.
// We could also send the error through the pipeReader, but this more cleanly separates the copying of the layer and the DiffID computation.
type diffIDResult struct {
	digest digest.Digest
	err    error
}

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(srcInfo types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
	// Check if we already have a blob with this digest
	haveBlob, extantBlobSize, err := ic.c.dest.HasBlob(srcInfo)
	if err != nil {
		return types.BlobInfo{}, "", errors.Wrapf(err, "Error checking for blob %s at destination", srcInfo.Digest)
	}
	// If we already have a cached diffID for this blob, we don't need to compute it
	diffIDIsNeeded := ic.diffIDsAreNeeded && (ic.c.cachedDiffIDs[srcInfo.Digest] == "")
	// If we already have the blob, and we don't need to recompute the diffID, then we might be able to avoid reading it again
	if haveBlob && !diffIDIsNeeded {
		// Check the blob sizes match, if we were given a size this time
		if srcInfo.Size != -1 && srcInfo.Size != extantBlobSize {
			return types.BlobInfo{}, "", errors.Errorf("Error: blob %s is already present, but with size %d instead of %d", srcInfo.Digest, extantBlobSize, srcInfo.Size)
		}
		srcInfo.Size = extantBlobSize
		// Tell the image destination that this blob's delta is being applied again. For some image destinations, this can be faster than using GetBlob/PutBlob
		blobinfo, err := ic.c.dest.ReapplyBlob(srcInfo)
		if err != nil {
			return types.BlobInfo{}, "", errors.Wrapf(err, "Error reapplying blob %s at destination", srcInfo.Digest)
		}
		ic.c.Printf("Skipping fetch of repeat blob %s\n", srcInfo.Digest)
		return blobinfo, ic.c.cachedDiffIDs[srcInfo.Digest], err
	}

	// Fallback: copy the layer, computing the diffID if we need to do so
	ic.c.Printf("Copying blob %s\n", srcInfo.Digest)
	srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(srcInfo)
	if err != nil {
		return types.BlobInfo{}, "", errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
	}
	defer srcStream.Close()

	blobInfo, diffIDChan, err := ic.copyLayerFromStream(srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize},
		diffIDIsNeeded)
	if err != nil {
		return types.BlobInfo{}, "", err
	}
	var diffIDResult diffIDResult // = {digest:""}
	if diffIDIsNeeded {
		diffIDResult = <-diffIDChan
		if diffIDResult.err != nil {
			return types.BlobInfo{}, "", errors.Wrap(diffIDResult.err, "Error computing layer DiffID")
		}
		logrus.Debugf("Computed DiffID %s for layer %s", diffIDResult.digest, srcInfo.Digest)
		ic.c.cachedDiffIDs[srcInfo.Digest] = diffIDResult.digest
	}
	return blobInfo, diffIDResult.digest, nil
}

// copyLayerFromStream is an implementation detail of copyLayer; mostly providing a separate “defer” scope.
// it copies a blob with srcInfo (with known Digest and possibly known Size) from srcStream to dest,
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
	diffIDIsNeeded bool) (types.BlobInfo, <-chan diffIDResult, error) {
	var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
	var diffIDChan chan diffIDResult

	err := errors.New("Internal error: unexpected panic in copyLayer") // For pipeWriter.CloseWithError below
	if diffIDIsNeeded {
		diffIDChan = make(chan diffIDResult, 1) // Buffered, so that sending a value after this or our caller has failed and exited does not block.
		pipeReader, pipeWriter := io.Pipe()
		defer func() { // Note that this is not the same as {defer pipeWriter.CloseWithError(err)}; we need err to be evaluated lazily.
			pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
		}()

		getDiffIDRecorder = func(decompressor compression.DecompressorFunc) io.Writer {
			// If this fails, e.g. because we have exited and due to pipeWriter.CloseWithError() above further
			// reading from the pipe has failed, we don't really care.
			// We only read from diffIDChan if the rest of the flow has succeeded, and when we do read from it,
			// the return value includes an error indication, which we do check.
			//
			// If this is never called, pipeReader will not be used anywhere, but pipeWriter will only be
			// closed above, so we are happy enough with both pipeReader and pipeWriter to just get collected by GC.
			go diffIDComputationGoroutine(diffIDChan, pipeReader, decompressor) // Closes pipeReader
			return pipeWriter
		}
	}
	blobInfo, err := ic.c.copyBlobFromStream(srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest) // Sets err to nil on success
	return blobInfo, diffIDChan, err
	// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}

// diffIDComputationGoroutine reads all input from layerStream, uncompresses using decompressor if necessary, and sends its digest, and status, if any, to dest.
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compression.DecompressorFunc) {
	result := diffIDResult{
		digest: "",
		err:    errors.New("Internal error: unexpected panic in diffIDComputationGoroutine"),
	}
	defer func() { dest <- result }()
	defer layerStream.Close() // We do not care to bother the other end of the pipe with other failures; we send them to dest instead.

	result.digest, result.err = computeDiffID(layerStream, decompressor)
}

// computeDiffID reads all input from stream, uncompresses it using decompressor if necessary, and returns its digest.
func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) (digest.Digest, error) {
	if decompressor != nil {
		s, err := decompressor(stream)
		if err != nil {
			return "", err
		}
		stream = s
	}
	return digest.Canonical.FromReader(stream)
}

// copyBlobFromStream copies a blob with srcInfo (with known Digest and possibly known Size) from srcStream to dest,
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
	getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
	canCompress bool) (types.BlobInfo, error) {
	// The copying happens through a pipeline of connected io.Readers.
	// === Input: srcStream

	// === Process input through digestingReader to validate against the expected digest.
	// Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader,
	// use a separate validation failure indicator.
	// Note that we don't use a stronger "validationSucceeded" indicator, because
	// dest.PutBlob may detect that the layer already exists, in which case we don't
	// read stream to the end, and validation does not happen.
	digestingReader, err := newDigestingReader(srcStream, srcInfo.Digest)
	if err != nil {
		return types.BlobInfo{}, errors.Wrapf(err, "Error preparing to verify blob %s", srcInfo.Digest)
	}
	var destStream io.Reader = digestingReader

	// === Detect compression of the input stream.
	// This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression.
	decompressor, destStream, err := compression.DetectCompression(destStream) // We could skip this in some cases, but let's keep the code path uniform
	if err != nil {
		return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
	}
	isCompressed := decompressor != nil

	// === Report progress using a pb.Reader.
	bar := pb.New(int(srcInfo.Size)).SetUnits(pb.U_BYTES)
	bar.Output = c.reportWriter
	bar.SetMaxWidth(80)
	bar.ShowTimeLeft = false
	bar.ShowPercent = false
	bar.Start()
	destStream = bar.NewProxyReader(destStream)
	defer bar.Finish()

	// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
	var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
	if getOriginalLayerCopyWriter != nil {
		destStream = io.TeeReader(destStream, getOriginalLayerCopyWriter(decompressor))
		originalLayerReader = destStream
	}

	// === Compress the layer if it is uncompressed and compression is desired
	var inputInfo types.BlobInfo
	if !canCompress || isCompressed || !c.dest.ShouldCompressLayers() {
		logrus.Debugf("Using original blob without modification")
		inputInfo = srcInfo
	} else {
		logrus.Debugf("Compressing blob on the fly")
		pipeReader, pipeWriter := io.Pipe()
		defer pipeReader.Close()

		// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
		// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
		// we don't care.
		go compressGoroutine(pipeWriter, destStream) // Closes pipeWriter
		destStream = pipeReader
		inputInfo.Digest = ""
		inputInfo.Size = -1
	}

	// === Report progress using the c.progress channel, if required.
	if c.progress != nil && c.progressInterval > 0 {
		destStream = &progressReader{
			source:   destStream,
			channel:  c.progress,
			interval: c.progressInterval,
			artifact: srcInfo,
			lastTime: time.Now(),
		}
	}

	// === Finally, send the layer stream to dest.
	uploadedInfo, err := c.dest.PutBlob(destStream, inputInfo)
	if err != nil {
		return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
	}

	// This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume
	// all of the input (to compute DiffIDs), even if dest.PutBlob does not need it.
	// So, read everything from originalLayerReader, which will cause the rest to be
	// sent there if we are not already at EOF.
	if getOriginalLayerCopyWriter != nil {
		logrus.Debugf("Consuming rest of the original blob to satisfy getOriginalLayerCopyWriter")
		_, err := io.Copy(ioutil.Discard, originalLayerReader)
		if err != nil {
			return types.BlobInfo{}, errors.Wrapf(err, "Error reading input blob %s", srcInfo.Digest)
		}
	}
	if digestingReader.validationFailed { // Coverage: This should never happen.
		return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest)
	}
	if inputInfo.Digest != "" && uploadedInfo.Digest != inputInfo.Digest {
		return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, inputInfo.Digest, uploadedInfo.Digest)
	}
	return uploadedInfo, nil
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func compressGoroutine(dest *io.PipeWriter, src io.Reader) {
	err := errors.New("Internal error: unexpected panic in compressGoroutine")
	defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
		dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close()
	}()

	zipper := gzip.NewWriter(dest)
	defer zipper.Close()

	_, err = io.Copy(zipper, src) // Sets err to nil, i.e. causes dest.Close()
}