492 lines
13 KiB
Go
492 lines
13 KiB
Go
|
/*
|
||
|
Copyright 2012 Google Inc.
|
||
|
|
||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
you may not use this file except in compliance with the License.
|
||
|
You may obtain a copy of the License at
|
||
|
|
||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
||
|
Unless required by applicable law or agreed to in writing, software
|
||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
See the License for the specific language governing permissions and
|
||
|
limitations under the License.
|
||
|
*/
|
||
|
|
||
|
// Package groupcache provides a data loading mechanism with caching
|
||
|
// and de-duplication that works across a set of peer processes.
|
||
|
//
|
||
|
// Each data Get first consults its local cache, otherwise delegates
|
||
|
// to the requested key's canonical owner, which then checks its cache
|
||
|
// or finally gets the data. In the common case, many concurrent
|
||
|
// cache misses across a set of peers for the same key result in just
|
||
|
// one cache fill.
|
||
|
package groupcache
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"math/rand"
|
||
|
"strconv"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
|
||
|
pb "github.com/golang/groupcache/groupcachepb"
|
||
|
"github.com/golang/groupcache/lru"
|
||
|
"github.com/golang/groupcache/singleflight"
|
||
|
)
|
||
|
|
||
|
// A Getter loads data for a key.
|
||
|
type Getter interface {
|
||
|
// Get returns the value identified by key, populating dest.
|
||
|
//
|
||
|
// The returned data must be unversioned. That is, key must
|
||
|
// uniquely describe the loaded data, without an implicit
|
||
|
// current time, and without relying on cache expiration
|
||
|
// mechanisms.
|
||
|
Get(ctx Context, key string, dest Sink) error
|
||
|
}
|
||
|
|
||
|
// A GetterFunc implements Getter with a function.
|
||
|
type GetterFunc func(ctx Context, key string, dest Sink) error
|
||
|
|
||
|
func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
|
||
|
return f(ctx, key, dest)
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
mu sync.RWMutex
|
||
|
groups = make(map[string]*Group)
|
||
|
|
||
|
initPeerServerOnce sync.Once
|
||
|
initPeerServer func()
|
||
|
)
|
||
|
|
||
|
// GetGroup returns the named group previously created with NewGroup, or
|
||
|
// nil if there's no such group.
|
||
|
func GetGroup(name string) *Group {
|
||
|
mu.RLock()
|
||
|
g := groups[name]
|
||
|
mu.RUnlock()
|
||
|
return g
|
||
|
}
|
||
|
|
||
|
// NewGroup creates a coordinated group-aware Getter from a Getter.
|
||
|
//
|
||
|
// The returned Getter tries (but does not guarantee) to run only one
|
||
|
// Get call at once for a given key across an entire set of peer
|
||
|
// processes. Concurrent callers both in the local process and in
|
||
|
// other processes receive copies of the answer once the original Get
|
||
|
// completes.
|
||
|
//
|
||
|
// The group name must be unique for each getter.
|
||
|
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
|
||
|
return newGroup(name, cacheBytes, getter, nil)
|
||
|
}
|
||
|
|
||
|
// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
|
||
|
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
|
||
|
if getter == nil {
|
||
|
panic("nil Getter")
|
||
|
}
|
||
|
mu.Lock()
|
||
|
defer mu.Unlock()
|
||
|
initPeerServerOnce.Do(callInitPeerServer)
|
||
|
if _, dup := groups[name]; dup {
|
||
|
panic("duplicate registration of group " + name)
|
||
|
}
|
||
|
g := &Group{
|
||
|
name: name,
|
||
|
getter: getter,
|
||
|
peers: peers,
|
||
|
cacheBytes: cacheBytes,
|
||
|
loadGroup: &singleflight.Group{},
|
||
|
}
|
||
|
if fn := newGroupHook; fn != nil {
|
||
|
fn(g)
|
||
|
}
|
||
|
groups[name] = g
|
||
|
return g
|
||
|
}
|
||
|
|
||
|
// newGroupHook, if non-nil, is called right after a new group is created.
|
||
|
var newGroupHook func(*Group)
|
||
|
|
||
|
// RegisterNewGroupHook registers a hook that is run each time
|
||
|
// a group is created.
|
||
|
func RegisterNewGroupHook(fn func(*Group)) {
|
||
|
if newGroupHook != nil {
|
||
|
panic("RegisterNewGroupHook called more than once")
|
||
|
}
|
||
|
newGroupHook = fn
|
||
|
}
|
||
|
|
||
|
// RegisterServerStart registers a hook that is run when the first
|
||
|
// group is created.
|
||
|
func RegisterServerStart(fn func()) {
|
||
|
if initPeerServer != nil {
|
||
|
panic("RegisterServerStart called more than once")
|
||
|
}
|
||
|
initPeerServer = fn
|
||
|
}
|
||
|
|
||
|
func callInitPeerServer() {
|
||
|
if initPeerServer != nil {
|
||
|
initPeerServer()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// A Group is a cache namespace and associated data loaded spread over
|
||
|
// a group of 1 or more machines.
|
||
|
type Group struct {
|
||
|
name string
|
||
|
getter Getter
|
||
|
peersOnce sync.Once
|
||
|
peers PeerPicker
|
||
|
cacheBytes int64 // limit for sum of mainCache and hotCache size
|
||
|
|
||
|
// mainCache is a cache of the keys for which this process
|
||
|
// (amongst its peers) is authoritative. That is, this cache
|
||
|
// contains keys which consistent hash on to this process's
|
||
|
// peer number.
|
||
|
mainCache cache
|
||
|
|
||
|
// hotCache contains keys/values for which this peer is not
|
||
|
// authoritative (otherwise they would be in mainCache), but
|
||
|
// are popular enough to warrant mirroring in this process to
|
||
|
// avoid going over the network to fetch from a peer. Having
|
||
|
// a hotCache avoids network hotspotting, where a peer's
|
||
|
// network card could become the bottleneck on a popular key.
|
||
|
// This cache is used sparingly to maximize the total number
|
||
|
// of key/value pairs that can be stored globally.
|
||
|
hotCache cache
|
||
|
|
||
|
// loadGroup ensures that each key is only fetched once
|
||
|
// (either locally or remotely), regardless of the number of
|
||
|
// concurrent callers.
|
||
|
loadGroup flightGroup
|
||
|
|
||
|
_ int32 // force Stats to be 8-byte aligned on 32-bit platforms
|
||
|
|
||
|
// Stats are statistics on the group.
|
||
|
Stats Stats
|
||
|
}
|
||
|
|
||
|
// flightGroup is defined as an interface which flightgroup.Group
|
||
|
// satisfies. We define this so that we may test with an alternate
|
||
|
// implementation.
|
||
|
type flightGroup interface {
|
||
|
// Done is called when Do is done.
|
||
|
Do(key string, fn func() (interface{}, error)) (interface{}, error)
|
||
|
}
|
||
|
|
||
|
// Stats are per-group statistics.
|
||
|
type Stats struct {
|
||
|
Gets AtomicInt // any Get request, including from peers
|
||
|
CacheHits AtomicInt // either cache was good
|
||
|
PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
|
||
|
PeerErrors AtomicInt
|
||
|
Loads AtomicInt // (gets - cacheHits)
|
||
|
LoadsDeduped AtomicInt // after singleflight
|
||
|
LocalLoads AtomicInt // total good local loads
|
||
|
LocalLoadErrs AtomicInt // total bad local loads
|
||
|
ServerRequests AtomicInt // gets that came over the network from peers
|
||
|
}
|
||
|
|
||
|
// Name returns the name of the group.
|
||
|
func (g *Group) Name() string {
|
||
|
return g.name
|
||
|
}
|
||
|
|
||
|
func (g *Group) initPeers() {
|
||
|
if g.peers == nil {
|
||
|
g.peers = getPeers(g.name)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (g *Group) Get(ctx Context, key string, dest Sink) error {
|
||
|
g.peersOnce.Do(g.initPeers)
|
||
|
g.Stats.Gets.Add(1)
|
||
|
if dest == nil {
|
||
|
return errors.New("groupcache: nil dest Sink")
|
||
|
}
|
||
|
value, cacheHit := g.lookupCache(key)
|
||
|
|
||
|
if cacheHit {
|
||
|
g.Stats.CacheHits.Add(1)
|
||
|
return setSinkView(dest, value)
|
||
|
}
|
||
|
|
||
|
// Optimization to avoid double unmarshalling or copying: keep
|
||
|
// track of whether the dest was already populated. One caller
|
||
|
// (if local) will set this; the losers will not. The common
|
||
|
// case will likely be one caller.
|
||
|
destPopulated := false
|
||
|
value, destPopulated, err := g.load(ctx, key, dest)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if destPopulated {
|
||
|
return nil
|
||
|
}
|
||
|
return setSinkView(dest, value)
|
||
|
}
|
||
|
|
||
|
// load loads key either by invoking the getter locally or by sending it to another machine.
|
||
|
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
|
||
|
g.Stats.Loads.Add(1)
|
||
|
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
|
||
|
// Check the cache again because singleflight can only dedup calls
|
||
|
// that overlap concurrently. It's possible for 2 concurrent
|
||
|
// requests to miss the cache, resulting in 2 load() calls. An
|
||
|
// unfortunate goroutine scheduling would result in this callback
|
||
|
// being run twice, serially. If we don't check the cache again,
|
||
|
// cache.nbytes would be incremented below even though there will
|
||
|
// be only one entry for this key.
|
||
|
//
|
||
|
// Consider the following serialized event ordering for two
|
||
|
// goroutines in which this callback gets called twice for hte
|
||
|
// same key:
|
||
|
// 1: Get("key")
|
||
|
// 2: Get("key")
|
||
|
// 1: lookupCache("key")
|
||
|
// 2: lookupCache("key")
|
||
|
// 1: load("key")
|
||
|
// 2: load("key")
|
||
|
// 1: loadGroup.Do("key", fn)
|
||
|
// 1: fn()
|
||
|
// 2: loadGroup.Do("key", fn)
|
||
|
// 2: fn()
|
||
|
if value, cacheHit := g.lookupCache(key); cacheHit {
|
||
|
g.Stats.CacheHits.Add(1)
|
||
|
return value, nil
|
||
|
}
|
||
|
g.Stats.LoadsDeduped.Add(1)
|
||
|
var value ByteView
|
||
|
var err error
|
||
|
if peer, ok := g.peers.PickPeer(key); ok {
|
||
|
value, err = g.getFromPeer(ctx, peer, key)
|
||
|
if err == nil {
|
||
|
g.Stats.PeerLoads.Add(1)
|
||
|
return value, nil
|
||
|
}
|
||
|
g.Stats.PeerErrors.Add(1)
|
||
|
// TODO(bradfitz): log the peer's error? keep
|
||
|
// log of the past few for /groupcachez? It's
|
||
|
// probably boring (normal task movement), so not
|
||
|
// worth logging I imagine.
|
||
|
}
|
||
|
value, err = g.getLocally(ctx, key, dest)
|
||
|
if err != nil {
|
||
|
g.Stats.LocalLoadErrs.Add(1)
|
||
|
return nil, err
|
||
|
}
|
||
|
g.Stats.LocalLoads.Add(1)
|
||
|
destPopulated = true // only one caller of load gets this return value
|
||
|
g.populateCache(key, value, &g.mainCache)
|
||
|
return value, nil
|
||
|
})
|
||
|
if err == nil {
|
||
|
value = viewi.(ByteView)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
|
||
|
err := g.getter.Get(ctx, key, dest)
|
||
|
if err != nil {
|
||
|
return ByteView{}, err
|
||
|
}
|
||
|
return dest.view()
|
||
|
}
|
||
|
|
||
|
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
|
||
|
req := &pb.GetRequest{
|
||
|
Group: &g.name,
|
||
|
Key: &key,
|
||
|
}
|
||
|
res := &pb.GetResponse{}
|
||
|
err := peer.Get(ctx, req, res)
|
||
|
if err != nil {
|
||
|
return ByteView{}, err
|
||
|
}
|
||
|
value := ByteView{b: res.Value}
|
||
|
// TODO(bradfitz): use res.MinuteQps or something smart to
|
||
|
// conditionally populate hotCache. For now just do it some
|
||
|
// percentage of the time.
|
||
|
if rand.Intn(10) == 0 {
|
||
|
g.populateCache(key, value, &g.hotCache)
|
||
|
}
|
||
|
return value, nil
|
||
|
}
|
||
|
|
||
|
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
|
||
|
if g.cacheBytes <= 0 {
|
||
|
return
|
||
|
}
|
||
|
value, ok = g.mainCache.get(key)
|
||
|
if ok {
|
||
|
return
|
||
|
}
|
||
|
value, ok = g.hotCache.get(key)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
|
||
|
if g.cacheBytes <= 0 {
|
||
|
return
|
||
|
}
|
||
|
cache.add(key, value)
|
||
|
|
||
|
// Evict items from cache(s) if necessary.
|
||
|
for {
|
||
|
mainBytes := g.mainCache.bytes()
|
||
|
hotBytes := g.hotCache.bytes()
|
||
|
if mainBytes+hotBytes <= g.cacheBytes {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// TODO(bradfitz): this is good-enough-for-now logic.
|
||
|
// It should be something based on measurements and/or
|
||
|
// respecting the costs of different resources.
|
||
|
victim := &g.mainCache
|
||
|
if hotBytes > mainBytes/8 {
|
||
|
victim = &g.hotCache
|
||
|
}
|
||
|
victim.removeOldest()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// CacheType represents a type of cache.
|
||
|
type CacheType int
|
||
|
|
||
|
const (
|
||
|
// The MainCache is the cache for items that this peer is the
|
||
|
// owner for.
|
||
|
MainCache CacheType = iota + 1
|
||
|
|
||
|
// The HotCache is the cache for items that seem popular
|
||
|
// enough to replicate to this node, even though it's not the
|
||
|
// owner.
|
||
|
HotCache
|
||
|
)
|
||
|
|
||
|
// CacheStats returns stats about the provided cache within the group.
|
||
|
func (g *Group) CacheStats(which CacheType) CacheStats {
|
||
|
switch which {
|
||
|
case MainCache:
|
||
|
return g.mainCache.stats()
|
||
|
case HotCache:
|
||
|
return g.hotCache.stats()
|
||
|
default:
|
||
|
return CacheStats{}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// cache is a wrapper around an *lru.Cache that adds synchronization,
|
||
|
// makes values always be ByteView, and counts the size of all keys and
|
||
|
// values.
|
||
|
type cache struct {
|
||
|
mu sync.RWMutex
|
||
|
nbytes int64 // of all keys and values
|
||
|
lru *lru.Cache
|
||
|
nhit, nget int64
|
||
|
nevict int64 // number of evictions
|
||
|
}
|
||
|
|
||
|
func (c *cache) stats() CacheStats {
|
||
|
c.mu.RLock()
|
||
|
defer c.mu.RUnlock()
|
||
|
return CacheStats{
|
||
|
Bytes: c.nbytes,
|
||
|
Items: c.itemsLocked(),
|
||
|
Gets: c.nget,
|
||
|
Hits: c.nhit,
|
||
|
Evictions: c.nevict,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *cache) add(key string, value ByteView) {
|
||
|
c.mu.Lock()
|
||
|
defer c.mu.Unlock()
|
||
|
if c.lru == nil {
|
||
|
c.lru = &lru.Cache{
|
||
|
OnEvicted: func(key lru.Key, value interface{}) {
|
||
|
val := value.(ByteView)
|
||
|
c.nbytes -= int64(len(key.(string))) + int64(val.Len())
|
||
|
c.nevict++
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
c.lru.Add(key, value)
|
||
|
c.nbytes += int64(len(key)) + int64(value.Len())
|
||
|
}
|
||
|
|
||
|
func (c *cache) get(key string) (value ByteView, ok bool) {
|
||
|
c.mu.Lock()
|
||
|
defer c.mu.Unlock()
|
||
|
c.nget++
|
||
|
if c.lru == nil {
|
||
|
return
|
||
|
}
|
||
|
vi, ok := c.lru.Get(key)
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
c.nhit++
|
||
|
return vi.(ByteView), true
|
||
|
}
|
||
|
|
||
|
func (c *cache) removeOldest() {
|
||
|
c.mu.Lock()
|
||
|
defer c.mu.Unlock()
|
||
|
if c.lru != nil {
|
||
|
c.lru.RemoveOldest()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *cache) bytes() int64 {
|
||
|
c.mu.RLock()
|
||
|
defer c.mu.RUnlock()
|
||
|
return c.nbytes
|
||
|
}
|
||
|
|
||
|
func (c *cache) items() int64 {
|
||
|
c.mu.RLock()
|
||
|
defer c.mu.RUnlock()
|
||
|
return c.itemsLocked()
|
||
|
}
|
||
|
|
||
|
func (c *cache) itemsLocked() int64 {
|
||
|
if c.lru == nil {
|
||
|
return 0
|
||
|
}
|
||
|
return int64(c.lru.Len())
|
||
|
}
|
||
|
|
||
|
// An AtomicInt is an int64 to be accessed atomically.
|
||
|
type AtomicInt int64
|
||
|
|
||
|
// Add atomically adds n to i.
|
||
|
func (i *AtomicInt) Add(n int64) {
|
||
|
atomic.AddInt64((*int64)(i), n)
|
||
|
}
|
||
|
|
||
|
// Get atomically gets the value of i.
|
||
|
func (i *AtomicInt) Get() int64 {
|
||
|
return atomic.LoadInt64((*int64)(i))
|
||
|
}
|
||
|
|
||
|
func (i *AtomicInt) String() string {
|
||
|
return strconv.FormatInt(i.Get(), 10)
|
||
|
}
|
||
|
|
||
|
// CacheStats are returned by stats accessors on Group.
|
||
|
type CacheStats struct {
|
||
|
Bytes int64
|
||
|
Items int64
|
||
|
Gets int64
|
||
|
Hits int64
|
||
|
Evictions int64
|
||
|
}
|