Implement regulator in filesystem driver

This commit refactors base.regulator into the 2.4 interfaces and adds a
filesystem configuration option `maxthreads` to configure the regulator.

By default `maxthreads` is set to 100. This means the FS driver is
limited to 100 concurrent blocking file operations. Any subsequent
operations will block in Go until previous filesystem operations
complete.

This ensures that the registry can never open thousands of simultaneous
threads from os filesystem operations.

Note that `maxthreads` can never be less than 25.

Add test case covering parsable string maxthreads

Signed-off-by: Tony Holdstock-Brown <tony@docker.com>
This commit is contained in:
Tony Holdstock-Brown 2016-04-26 14:36:38 -07:00
parent a88088a59d
commit cbae4dd7bf
4 changed files with 193 additions and 38 deletions

View file

@ -8,6 +8,8 @@ import (
"io/ioutil"
"os"
"path"
"reflect"
"strconv"
"time"
"github.com/docker/distribution/context"
@ -16,8 +18,23 @@ import (
"github.com/docker/distribution/registry/storage/driver/factory"
)
const driverName = "filesystem"
const defaultRootDirectory = "/var/lib/registry"
const (
driverName = "filesystem"
defaultRootDirectory = "/var/lib/registry"
defaultMaxThreads = uint64(100)
// minThreads is the minimum value for the maxthreads configuration
// parameter. If the driver's parameters are less than this we set
// the parameters to minThreads
minThreads = uint64(25)
)
// DriverParameters represents all configuration options available for the
// filesystem driver
type DriverParameters struct {
RootDirectory string
MaxThreads uint64
}
func init() {
factory.Register(driverName, &filesystemDriverFactory{})
@ -27,7 +44,7 @@ func init() {
type filesystemDriverFactory struct{}
func (factory *filesystemDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
return FromParameters(parameters), nil
return FromParameters(parameters)
}
type driver struct {
@ -47,25 +64,67 @@ type Driver struct {
// FromParameters constructs a new Driver with a given parameters map
// Optional Parameters:
// - rootdirectory
func FromParameters(parameters map[string]interface{}) *Driver {
var rootDirectory = defaultRootDirectory
// - maxthreads
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
params, err := fromParametersImpl(parameters)
if err != nil || params == nil {
return nil, err
}
return New(*params), nil
}
func fromParametersImpl(parameters map[string]interface{}) (*DriverParameters, error) {
var (
err error
maxThreads = defaultMaxThreads
rootDirectory = defaultRootDirectory
)
if parameters != nil {
rootDir, ok := parameters["rootdirectory"]
if ok {
if rootDir, ok := parameters["rootdirectory"]; ok {
rootDirectory = fmt.Sprint(rootDir)
}
// Get maximum number of threads for blocking filesystem operations,
// if specified
threads := parameters["maxthreads"]
switch v := threads.(type) {
case string:
if maxThreads, err = strconv.ParseUint(v, 0, 64); err != nil {
return nil, fmt.Errorf("maxthreads parameter must be an integer, %v invalid", threads)
}
case uint64:
maxThreads = v
case int, int32, int64:
maxThreads = uint64(reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Int())
case uint, uint32:
maxThreads = reflect.ValueOf(v).Convert(reflect.TypeOf(threads)).Uint()
case nil:
// do nothing
default:
return nil, fmt.Errorf("invalid value for maxthreads: %#v", threads)
}
if maxThreads < minThreads {
maxThreads = minThreads
}
}
return New(rootDirectory)
params := &DriverParameters{
RootDirectory: rootDirectory,
MaxThreads: maxThreads,
}
return params, nil
}
// New constructs a new Driver with a given rootDirectory
func New(rootDirectory string) *Driver {
fsDriver := &driver{rootDirectory: rootDirectory}
func New(params DriverParameters) *Driver {
fsDriver := &driver{rootDirectory: params.RootDirectory}
return &Driver{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: base.NewRegulator(fsDriver, 100),
StorageDriver: base.NewRegulator(fsDriver, params.MaxThreads),
},
},
}