Merge pull request #330 from stevvooe/parallelize-signature-fetch
registry/storage: parallelize signature fetch in signature store
This commit is contained in:
		
						commit
						4e2d176d27
					
				
					 1 changed files with 34 additions and 8 deletions
				
			
		|  | @ -2,8 +2,10 @@ package storage | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"path" | 	"path" | ||||||
|  | 	"sync" | ||||||
| 
 | 
 | ||||||
| 	"github.com/docker/distribution" | 	"github.com/docker/distribution" | ||||||
|  | 	"github.com/docker/distribution/context" | ||||||
| 	"github.com/docker/distribution/digest" | 	"github.com/docker/distribution/digest" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -33,18 +35,42 @@ func (s *signatureStore) Get(dgst digest.Digest) ([][]byte, error) { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	var signatures [][]byte | 	var wg sync.WaitGroup | ||||||
| 	for _, sigPath := range signaturePaths { | 	signatures := make([][]byte, len(signaturePaths)) // make space for everything | ||||||
|  | 	errCh := make(chan error, 1)                      // buffered chan so one proceeds | ||||||
|  | 	for i, sigPath := range signaturePaths { | ||||||
| 		// Append the link portion | 		// Append the link portion | ||||||
| 		sigPath = path.Join(sigPath, "link") | 		sigPath = path.Join(sigPath, "link") | ||||||
| 
 | 
 | ||||||
| 		// TODO(stevvooe): These fetches should be parallelized for performance. | 		wg.Add(1) | ||||||
| 		p, err := s.blobStore.linked(sigPath) | 		go func(idx int, sigPath string) { | ||||||
| 		if err != nil { | 			defer wg.Done() | ||||||
| 			return nil, err | 			context.GetLogger(s.ctx). | ||||||
| 		} | 				Debugf("fetching signature from %q", sigPath) | ||||||
|  | 			p, err := s.blobStore.linked(sigPath) | ||||||
|  | 			if err != nil { | ||||||
|  | 				context.GetLogger(s.ctx). | ||||||
|  | 					Errorf("error fetching signature from %q: %v", sigPath, err) | ||||||
| 
 | 
 | ||||||
| 		signatures = append(signatures, p) | 				// try to send an error, if it hasn't already been sent. | ||||||
|  | 				select { | ||||||
|  | 				case errCh <- err: | ||||||
|  | 				default: | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 			signatures[idx] = p | ||||||
|  | 		}(i, sigPath) | ||||||
|  | 	} | ||||||
|  | 	wg.Wait() | ||||||
|  | 
 | ||||||
|  | 	select { | ||||||
|  | 	case err := <-errCh: | ||||||
|  | 		// just return the first error, similar to single threaded code. | ||||||
|  | 		return nil, err | ||||||
|  | 	default: | ||||||
|  | 		// pass | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return signatures, nil | 	return signatures, nil | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue