Merge pull request #22 from BrianBland/upload-tokens
Serializes upload state to an HMAC token for subsequent requests
This commit is contained in:
commit
396e81000e
11 changed files with 225 additions and 50 deletions
|
@ -459,6 +459,8 @@ func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest,
|
||||||
}
|
}
|
||||||
|
|
||||||
u.RawQuery = url.Values{
|
u.RawQuery = url.Values{
|
||||||
|
"_state": u.Query()["_state"],
|
||||||
|
|
||||||
"digest": []string{dgst.String()},
|
"digest": []string{dgst.String()},
|
||||||
|
|
||||||
// TODO(stevvooe): Layer upload can be completed with and without size
|
// TODO(stevvooe): Layer upload can be completed with and without size
|
||||||
|
|
3
app.go
3
app.go
|
@ -29,6 +29,8 @@ type App struct {
|
||||||
// services contains the main services instance for the application.
|
// services contains the main services instance for the application.
|
||||||
services *storage.Services
|
services *storage.Services
|
||||||
|
|
||||||
|
tokenProvider tokenProvider
|
||||||
|
|
||||||
accessController auth.AccessController
|
accessController auth.AccessController
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +64,7 @@ func NewApp(configuration configuration.Configuration) *App {
|
||||||
|
|
||||||
app.driver = driver
|
app.driver = driver
|
||||||
app.services = storage.NewServices(app.driver)
|
app.services = storage.NewServices(app.driver)
|
||||||
|
app.tokenProvider = newHMACTokenProvider(configuration.HTTP.Secret)
|
||||||
|
|
||||||
authType := configuration.Auth.Type()
|
authType := configuration.Auth.Type()
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,9 @@ type Configuration struct {
|
||||||
HTTP struct {
|
HTTP struct {
|
||||||
// Addr specifies the bind address for the registry instance.
|
// Addr specifies the bind address for the registry instance.
|
||||||
Addr string `yaml:"addr"`
|
Addr string `yaml:"addr"`
|
||||||
|
|
||||||
|
// Secret specifies the secret key which HMAC tokens are created with.
|
||||||
|
Secret string `yaml:"secret"`
|
||||||
} `yaml:"http"`
|
} `yaml:"http"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
@ -32,9 +33,17 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
|
||||||
if luh.UUID != "" {
|
if luh.UUID != "" {
|
||||||
luh.log = luh.log.WithField("uuid", luh.UUID)
|
luh.log = luh.log.WithField("uuid", luh.UUID)
|
||||||
|
|
||||||
layers := ctx.services.Layers()
|
state, err := ctx.tokenProvider.layerUploadStateFromToken(r.FormValue("_state"))
|
||||||
upload, err := layers.Resume(luh.UUID)
|
if err != nil {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
logrus.Infof("error resolving upload: %v", err)
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
luh.Errors.Push(v2.ErrorCodeUnknown, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
layers := ctx.services.Layers()
|
||||||
|
upload, err := layers.Resume(state)
|
||||||
if err != nil && err != storage.ErrLayerUploadUnknown {
|
if err != nil && err != storage.ErrLayerUploadUnknown {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
logrus.Infof("error resolving upload: %v", err)
|
logrus.Infof("error resolving upload: %v", err)
|
||||||
|
@ -162,7 +171,14 @@ func (luh *layerUploadHandler) CancelLayerUpload(w http.ResponseWriter, r *http.
|
||||||
// chunk responses. This sets the correct headers but the response status is
|
// chunk responses. This sets the correct headers but the response status is
|
||||||
// left to the caller.
|
// left to the caller.
|
||||||
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error {
|
func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *http.Request) error {
|
||||||
uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID())
|
values := make(url.Values)
|
||||||
|
stateToken, err := luh.Context.tokenProvider.layerUploadStateToToken(storage.LayerUploadState{Name: luh.Upload.Name(), UUID: luh.Upload.UUID(), Offset: luh.Upload.Offset()})
|
||||||
|
if err != nil {
|
||||||
|
logrus.Infof("error building upload state token: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
values.Set("_state", stateToken)
|
||||||
|
uploadURL, err := luh.urlBuilder.BuildBlobUploadChunkURL(luh.Upload.Name(), luh.Upload.UUID(), values)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Infof("error building upload url: %s", err)
|
logrus.Infof("error building upload url: %s", err)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestSimpleLayerUpload(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do a resume, get unknown upload
|
// Do a resume, get unknown upload
|
||||||
layerUpload, err = ls.Resume(layerUpload.UUID())
|
layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
|
||||||
if err != ErrLayerUploadUnknown {
|
if err != ErrLayerUploadUnknown {
|
||||||
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
|
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ func TestSimpleLayerUpload(t *testing.T) {
|
||||||
layerUpload.Close()
|
layerUpload.Close()
|
||||||
|
|
||||||
// Do a resume, for good fun
|
// Do a resume, for good fun
|
||||||
layerUpload, err = ls.Resume(layerUpload.UUID())
|
layerUpload, err = ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error resuming upload: %v", err)
|
t.Fatalf("unexpected error resuming upload: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ func TestSimpleLayerUpload(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// After finishing an upload, it should no longer exist.
|
// After finishing an upload, it should no longer exist.
|
||||||
if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown {
|
if _, err := ls.Resume(LayerUploadState{Name: layerUpload.Name(), UUID: layerUpload.UUID(), Offset: layerUpload.Offset()}); err != ErrLayerUploadUnknown {
|
||||||
t.Fatalf("expected layer upload to be unknown, got %v", err)
|
t.Fatalf("expected layer upload to be unknown, got %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,8 +76,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
|
||||||
|
|
||||||
// Resume continues an in progress layer upload, returning the current
|
// Resume continues an in progress layer upload, returning the current
|
||||||
// state of the upload.
|
// state of the upload.
|
||||||
func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
|
func (ls *layerStore) Resume(lus LayerUploadState) (LayerUpload, error) {
|
||||||
lus, err := ls.uploadStore.GetState(uuid)
|
_, err := ls.uploadStore.GetState(lus.UUID)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -13,8 +13,6 @@ import (
|
||||||
"github.com/docker/distribution/manifest"
|
"github.com/docker/distribution/manifest"
|
||||||
"github.com/docker/distribution/storagedriver"
|
"github.com/docker/distribution/storagedriver"
|
||||||
"github.com/docker/docker/pkg/tarsum"
|
"github.com/docker/docker/pkg/tarsum"
|
||||||
|
|
||||||
"io"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// LayerUploadState captures the state serializable state of the layer upload.
|
// LayerUploadState captures the state serializable state of the layer upload.
|
||||||
|
@ -61,7 +59,8 @@ type layerUploadStore interface {
|
||||||
New(name string) (LayerUploadState, error)
|
New(name string) (LayerUploadState, error)
|
||||||
Open(uuid string) (layerFile, error)
|
Open(uuid string) (layerFile, error)
|
||||||
GetState(uuid string) (LayerUploadState, error)
|
GetState(uuid string) (LayerUploadState, error)
|
||||||
SaveState(lus LayerUploadState) error
|
// TODO: factor this method back in
|
||||||
|
// SaveState(lus LayerUploadState) error
|
||||||
DeleteState(uuid string) error
|
DeleteState(uuid string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,11 +170,6 @@ func (luc *layerUploadController) Write(p []byte) (int, error) {
|
||||||
|
|
||||||
luc.LayerUploadState.Offset += int64(n)
|
luc.LayerUploadState.Offset += int64(n)
|
||||||
|
|
||||||
if err := luc.uploadStore.SaveState(luc.LayerUploadState); err != nil {
|
|
||||||
// TODO(stevvooe): This failure case may require more thought.
|
|
||||||
return n, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,10 +378,6 @@ func (llufs *localFSLayerUploadStore) New(name string) (LayerUploadState, error)
|
||||||
return lus, err
|
return lus, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := llufs.SaveState(lus); err != nil {
|
|
||||||
return lus, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return lus, nil
|
return lus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,43 +392,18 @@ func (llufs *localFSLayerUploadStore) Open(uuid string) (layerFile, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
|
func (llufs *localFSLayerUploadStore) GetState(uuid string) (LayerUploadState, error) {
|
||||||
// TODO(stevvoe): Storing this state on the local file system is an
|
|
||||||
// intermediate stop gap. This technique is unlikely to handle any kind of
|
|
||||||
// concurrency very well.
|
|
||||||
|
|
||||||
var lus LayerUploadState
|
var lus LayerUploadState
|
||||||
fp, err := os.Open(llufs.path(uuid, "state.json"))
|
|
||||||
if err != nil {
|
if _, err := os.Stat(llufs.path(uuid, "")); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return lus, ErrLayerUploadUnknown
|
return lus, ErrLayerUploadUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
return lus, err
|
return lus, err
|
||||||
}
|
}
|
||||||
defer fp.Close()
|
|
||||||
|
|
||||||
dec := json.NewDecoder(fp)
|
|
||||||
if err := dec.Decode(&lus); err != nil {
|
|
||||||
return lus, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return lus, nil
|
return lus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (llufs *localFSLayerUploadStore) SaveState(lus LayerUploadState) error {
|
|
||||||
p, err := json.Marshal(lus)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ioutil.WriteFile(llufs.path(lus.UUID, "state.json"), p, 0644)
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return ErrLayerUploadUnknown
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
|
func (llufs *localFSLayerUploadStore) DeleteState(uuid string) error {
|
||||||
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
|
if err := os.RemoveAll(llufs.path(uuid, "")); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
|
|
|
@ -153,6 +153,6 @@ func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mockedExistenceLayerService) Resume(uuid string) (LayerUpload, error) {
|
func (mockedExistenceLayerService) Resume(lus LayerUploadState) (LayerUpload, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,5 +83,5 @@ type LayerService interface {
|
||||||
|
|
||||||
// Resume continues an in progress layer upload, returning the current
|
// Resume continues an in progress layer upload, returning the current
|
||||||
// state of the upload.
|
// state of the upload.
|
||||||
Resume(uuid string) (LayerUpload, error)
|
Resume(layerUploadState LayerUploadState) (LayerUpload, error)
|
||||||
}
|
}
|
||||||
|
|
65
tokens.go
Normal file
65
tokens.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/hmac"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tokenProvider contains methods for serializing and deserializing state from token strings.
|
||||||
|
type tokenProvider interface {
|
||||||
|
// layerUploadStateFromToken retrieves the LayerUploadState for a given state token.
|
||||||
|
layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error)
|
||||||
|
|
||||||
|
// layerUploadStateToToken returns a token string representing the given LayerUploadState.
|
||||||
|
layerUploadStateToToken(layerUploadState storage.LayerUploadState) (string, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type hmacTokenProvider struct {
|
||||||
|
secret string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHMACTokenProvider(secret string) tokenProvider {
|
||||||
|
return &hmacTokenProvider{secret: secret}
|
||||||
|
}
|
||||||
|
|
||||||
|
// layerUploadStateFromToken deserializes the given HMAC stateToken and validates the prefix HMAC
|
||||||
|
func (ts *hmacTokenProvider) layerUploadStateFromToken(stateToken string) (storage.LayerUploadState, error) {
|
||||||
|
var lus storage.LayerUploadState
|
||||||
|
|
||||||
|
tokenBytes, err := base64.URLEncoding.DecodeString(stateToken)
|
||||||
|
if err != nil {
|
||||||
|
return lus, err
|
||||||
|
}
|
||||||
|
mac := hmac.New(sha256.New, []byte(ts.secret))
|
||||||
|
|
||||||
|
if len(tokenBytes) < mac.Size() {
|
||||||
|
return lus, fmt.Errorf("Invalid token")
|
||||||
|
}
|
||||||
|
|
||||||
|
macBytes := tokenBytes[:mac.Size()]
|
||||||
|
messageBytes := tokenBytes[mac.Size():]
|
||||||
|
|
||||||
|
mac.Write(messageBytes)
|
||||||
|
if !hmac.Equal(mac.Sum(nil), macBytes) {
|
||||||
|
return lus, fmt.Errorf("Invalid token")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(messageBytes, &lus); err != nil {
|
||||||
|
return lus, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return lus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// layerUploadStateToToken serializes the given LayerUploadState to JSON with an HMAC prepended
|
||||||
|
func (ts *hmacTokenProvider) layerUploadStateToToken(lus storage.LayerUploadState) (string, error) {
|
||||||
|
mac := hmac.New(sha256.New, []byte(ts.secret))
|
||||||
|
stateJSON := fmt.Sprintf("{\"Name\": \"%s\", \"UUID\": \"%s\", \"Offset\": %d}", lus.Name, lus.UUID, lus.Offset)
|
||||||
|
mac.Write([]byte(stateJSON))
|
||||||
|
return base64.URLEncoding.EncodeToString(append(mac.Sum(nil), stateJSON...)), nil
|
||||||
|
}
|
121
tokens_test.go
Normal file
121
tokens_test.go
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/docker/distribution/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
var layerUploadStates = []storage.LayerUploadState{
|
||||||
|
{
|
||||||
|
Name: "hello",
|
||||||
|
UUID: "abcd-1234-qwer-0987",
|
||||||
|
Offset: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "hello-world",
|
||||||
|
UUID: "abcd-1234-qwer-0987",
|
||||||
|
Offset: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "h3ll0_w0rld",
|
||||||
|
UUID: "abcd-1234-qwer-0987",
|
||||||
|
Offset: 1337,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "ABCDEFG",
|
||||||
|
UUID: "ABCD-1234-QWER-0987",
|
||||||
|
Offset: 1234567890,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "this-is-A-sort-of-Long-name-for-Testing",
|
||||||
|
UUID: "dead-1234-beef-0987",
|
||||||
|
Offset: 8675309,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var secrets = []string{
|
||||||
|
"supersecret",
|
||||||
|
"12345",
|
||||||
|
"a",
|
||||||
|
"SuperSecret",
|
||||||
|
"Sup3r... S3cr3t!",
|
||||||
|
"This is a reasonably long secret key that is used for the purpose of testing.",
|
||||||
|
"\u2603+\u2744", // snowman+snowflake
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLayerUploadTokens constructs stateTokens from LayerUploadStates and
|
||||||
|
// validates that the tokens can be used to reconstruct the proper upload state.
|
||||||
|
func TestLayerUploadTokens(t *testing.T) {
|
||||||
|
tokenProvider := newHMACTokenProvider("supersecret")
|
||||||
|
|
||||||
|
for _, testcase := range layerUploadStates {
|
||||||
|
token, err := tokenProvider.layerUploadStateToToken(testcase)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lus, err := tokenProvider.layerUploadStateFromToken(token)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertLayerUploadStateEquals(t, testcase, lus)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHMACValidate ensures that any HMAC token providers are compatible if and
|
||||||
|
// only if they share the same secret.
|
||||||
|
func TestHMACValidation(t *testing.T) {
|
||||||
|
for _, secret := range secrets {
|
||||||
|
tokenProvider1 := newHMACTokenProvider(secret)
|
||||||
|
tokenProvider2 := newHMACTokenProvider(secret)
|
||||||
|
badTokenProvider := newHMACTokenProvider("DifferentSecret")
|
||||||
|
|
||||||
|
for _, testcase := range layerUploadStates {
|
||||||
|
token, err := tokenProvider1.layerUploadStateToToken(testcase)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lus, err := tokenProvider2.layerUploadStateFromToken(token)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertLayerUploadStateEquals(t, testcase, lus)
|
||||||
|
|
||||||
|
_, err = badTokenProvider.layerUploadStateFromToken(token)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", token)
|
||||||
|
}
|
||||||
|
|
||||||
|
badToken, err := badTokenProvider.layerUploadStateToToken(testcase)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tokenProvider1.layerUploadStateFromToken(badToken)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tokenProvider2.layerUploadStateFromToken(badToken)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected token provider to fail at retrieving state from token: %s", badToken)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertLayerUploadStateEquals(t *testing.T, expected storage.LayerUploadState, received storage.LayerUploadState) {
|
||||||
|
if expected.Name != received.Name {
|
||||||
|
t.Fatalf("Expected Name=%q, Received Name=%q", expected.Name, received.Name)
|
||||||
|
}
|
||||||
|
if expected.UUID != received.UUID {
|
||||||
|
t.Fatalf("Expected UUID=%q, Received UUID=%q", expected.UUID, received.UUID)
|
||||||
|
}
|
||||||
|
if expected.Offset != received.Offset {
|
||||||
|
t.Fatalf("Expected Offset=%d, Received Offset=%d", expected.Offset, received.Offset)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue