// Copyright 2016 Apcera Inc. All rights reserved.

package server

import (
	"errors"
	"fmt"
	"math"
	"net"
	"net/url"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/nats-io/gnatsd/auth"
	"github.com/nats-io/gnatsd/server"
	natsd "github.com/nats-io/gnatsd/test"
	"github.com/nats-io/go-nats"
	"github.com/nats-io/go-nats-streaming/pb"
	"github.com/nats-io/nats-streaming-server/spb"
	stores "github.com/nats-io/nats-streaming-server/stores"
	"github.com/nats-io/nuid"
)

// A single STAN server

// Server defaults.
const (
	// VERSION is the current version for the NATS Streaming server.
	VERSION = "0.3.4"

	DefaultClusterID      = "test-cluster"
	DefaultDiscoverPrefix = "_STAN.discover"
	DefaultPubPrefix      = "_STAN.pub"
	DefaultSubPrefix      = "_STAN.sub"
	DefaultSubClosePrefix = "_STAN.subclose"
	DefaultUnSubPrefix    = "_STAN.unsub"
	DefaultClosePrefix    = "_STAN.close"
	DefaultStoreType      = stores.TypeMemory

	// Heartbeat intervals.
	DefaultHeartBeatInterval   = 30 * time.Second
	DefaultClientHBTimeout     = 10 * time.Second
	DefaultMaxFailedHeartBeats = int((5 * time.Minute) / DefaultHeartBeatInterval)

	// Max number of outstanding go-routines handling connect requests for
	// duplicate client IDs.
	defaultMaxDupCIDRoutines = 100
	// Timeout used to ping the known client when processing a connection
	// request for a duplicate client ID.
	defaultCheckDupCIDTimeout = 500 * time.Millisecond

	// DefaultIOBatchSize is the maximum number of messages to accumulate before flushing a store.
	DefaultIOBatchSize = 1024

	// DefaultIOSleepTime is the duration (in micro-seconds) the server waits for more messages
	// before starting processing. Set to 0 (or negative) to disable the wait.
	DefaultIOSleepTime = int64(0)
)

// Constant to indicate that sendMsgToSub() should check number of acks pending
// against MaxInFlight to know if message should be sent out.
const (
	forceDelivery    = true
	honorMaxInFlight = false
)

// Used for display of limits
const (
	limitCount = iota
	limitBytes
	limitDuration
)

// Errors.
var (
	ErrInvalidSubject  = errors.New("stan: invalid subject")
	ErrInvalidSequence = errors.New("stan: invalid start sequence")
	ErrInvalidTime     = errors.New("stan: invalid start time")
	ErrInvalidSub      = errors.New("stan: invalid subscription")
	ErrInvalidClient   = errors.New("stan: clientID already registered")
	ErrInvalidAckWait  = errors.New("stan: invalid ack wait time, should be >= 1s")
	ErrInvalidConnReq  = errors.New("stan: invalid connection request")
	ErrInvalidPubReq   = errors.New("stan: invalid publish request")
	ErrInvalidSubReq   = errors.New("stan: invalid subscription request")
	ErrInvalidUnsubReq = errors.New("stan: invalid unsubscribe request")
	ErrInvalidCloseReq = errors.New("stan: invalid close request")
	ErrDupDurable      = errors.New("stan: duplicate durable registration")
	ErrInvalidDurName  = errors.New("stan: durable name of a durable queue subscriber can't contain the character ':'")
	ErrUnknownClient   = errors.New("stan: unknown clientID")
)

// Shared regular expression to check clientID validity.
// No lock required since from doc: https://golang.org/pkg/regexp/
// A Regexp is safe for concurrent use by multiple goroutines.
var clientIDRegEx *regexp.Regexp

func init() {
	if re, err := regexp.Compile("^[a-zA-Z0-9_-]+$"); err != nil {
		panic("Unable to compile regular expression")
	} else {
		clientIDRegEx = re
	}
}

// ioPendingMsg is a record that embeds the pointer to the incoming
// NATS Message, the PubMsg and PubAck structures so we reduce the
// number of memory allocations to 1 when processing a message from
// producer.
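//
// A condensed sketch of how this record is used (it mirrors processClientPublish
// further below and is shown here for illustration only):
//
//	iopm := &ioPendingMsg{m: m}
//	if iopm.pm.Unmarshal(m.Data) != nil {
//		// handle invalid publish / control message
//	}
//	s.ioChannel <- iopm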
type ioPendingMsg struct {
	m  *nats.Msg
	pm pb.PubMsg
	pa pb.PubAck
}

// Constant that defines the size of the channel that feeds the IO thread.
const ioChannelSize = 64 * 1024

const (
	useLocking     = true
	dontUseLocking = false
)

const (
	scheduleRequest = true
	processRequest  = false
)

// StanServer structure represents the STAN server
type StanServer struct {
	// Keep all members for which we use atomic at the beginning of the
	// struct and make sure they are all 64bits (or use padding if necessary).
	// atomic.* functions crash on 32bit machines if operand is not aligned
	// at 64bit. See https://github.com/golang/go/issues/599
	ioChannelStatsMaxBatchSize int64 // stats of the max number of messages that went into a single batch

	sync.RWMutex
	shutdown   bool
	serverID   string
	info       spb.ServerInfo // Contains cluster ID and subjects
	natsServer *server.Server
	opts       *Options

	// For scalability, a dedicated connection is used to publish
	// messages to subscribers.
	nc  *nats.Conn // used for most protocol messages
	ncs *nats.Conn // used for sending to subscribers and acking publishers

	wg sync.WaitGroup // Wait on go routines during shutdown

	// For now, these will be set to the constants DefaultHeartBeatInterval, etc...
	// but can be overridden in tests.
	hbInterval  time.Duration
	hbTimeout   time.Duration
	maxFailedHB int

	// Used when processing connect requests for client ID already registered
	dupCIDGuard       sync.RWMutex
	dupCIDMap         map[string]struct{}
	dupCIDwg          sync.WaitGroup // To wait for one routine to end when we have reached the max.
	dupCIDswg         bool           // To instruct one go routine to decrement the wait group.
	dupCIDTimeout     time.Duration
	dupMaxCIDRoutines int

	// Clients
	clients *clientStore

	// Store
	store stores.Store

	// IO Channel
	ioChannel     chan *ioPendingMsg
	ioChannelQuit chan struct{}
	ioChannelWG   sync.WaitGroup

	// Used to fix out-of-order processing of subUnsub/subClose/connClose
	// requests due to use of different NATS subscribers for various
	// protocols.
	srvCtrlMsgID  string         // NUID used to filter control messages not intended for this server.
	closeProtosMu sync.Mutex     // Mutex used for unsub/close requests.
	connCloseReqs map[string]int // Key: clientID Value: ref count

	// Use these flags for Debug/Trace in places where speed matters.
	// Normally, Debugf and Tracef will check an atomic variable to
	// figure out if the statement should be logged, however, the
	// cost of calling Debugf/Tracef is still significant since there
	// may be memory allocations to format the string passed to these
	// calls. So in those situations, use these flags to surround the
	// calls to Debugf/Tracef.
	trace bool
	debug bool
}

// subStore holds all known state for all subscriptions
type subStore struct {
	sync.RWMutex
	psubs    []*subState            // plain subscribers
	qsubs    map[string]*queueState // queue subscribers
	durables map[string]*subState   // durables lookup
	acks     map[string]*subState   // ack inbox lookup
	stan     *StanServer            // back link to Stan server
}

// Holds all queue subscribers for a subject/group and
// tracks lastSent for the group.
type queueState struct {
	sync.RWMutex
	lastSent uint64
	subs     []*subState
	stalled  bool
	shadow   *subState // For durable case, when last member leaves and group is not closed.
}

// Holds Subscription state
type subState struct {
	sync.RWMutex
	spb.SubState // Embedded protobuf. Used for storage.
subject string qstate *queueState ackWait time.Duration // SubState.AckWaitInSecs expressed as a time.Duration ackTimer *time.Timer ackTimeFloor int64 ackSub *nats.Subscription acksPending map[uint64]struct{} stalled bool newOnHold bool // Prevents delivery of new msgs until old are redelivered (on restart) store stores.SubStore // for easy access to the store interface } // Looks up, or create a new channel if it does not exist func (s *StanServer) lookupOrCreateChannel(channel string) (*stores.ChannelStore, error) { if cs := s.store.LookupChannel(channel); cs != nil { return cs, nil } // It's possible that more than one go routine comes here at the same // time. `ss` will then be simply gc'ed. ss := s.createSubStore() cs, _, err := s.store.CreateChannel(channel, ss) if err != nil { return nil, err } return cs, nil } // createSubStore creates a new instance of `subStore`. func (s *StanServer) createSubStore() *subStore { subs := &subStore{ psubs: make([]*subState, 0, 4), qsubs: make(map[string]*queueState), durables: make(map[string]*subState), acks: make(map[string]*subState), stan: s, } return subs } // Store adds this subscription to the server's `subStore` and also in storage func (ss *subStore) Store(sub *subState) error { if sub == nil { return nil } // `sub` has just been created and can't be referenced anywhere else in // the code, so we don't need locking. // Adds to storage. err := sub.store.CreateSub(&sub.SubState) if err != nil { Errorf("Unable to store subscription [%v:%v] on [%s]: %v", sub.ClientID, sub.Inbox, sub.subject, err) return err } ss.Lock() ss.updateState(sub) ss.Unlock() return nil } // Updates the subStore state with this sub. // The subStore is locked on entry (or does not need, as during server restart). // However, `sub` does not need locking since it has just been created. func (ss *subStore) updateState(sub *subState) { // First store by ackInbox for ack direct lookup ss.acks[sub.AckInbox] = sub // Store by type if sub.isQueueSubscriber() { // Queue subscriber. qs := ss.qsubs[sub.QGroup] if qs == nil { qs = &queueState{ subs: make([]*subState, 0, 4), } ss.qsubs[sub.QGroup] = qs } qs.Lock() // The recovered shadow queue sub will have ClientID=="", // keep a reference to it until a member re-joins the group. if sub.ClientID == "" { // Should not happen, if it does, panic if qs.shadow != nil { panic(fmt.Errorf("there should be only one shadow subscriber for [%q] queue group", sub.QGroup)) } qs.shadow = sub } else { qs.subs = append(qs.subs, sub) } // Needed in the case of server restart, where // the queue group's last sent needs to be updated // based on the recovered subscriptions. if sub.LastSent > qs.lastSent { qs.lastSent = sub.LastSent } qs.Unlock() sub.qstate = qs } else { // Plain subscriber. ss.psubs = append(ss.psubs, sub) } // Hold onto durables in special lookup. if sub.isDurableSubscriber() { ss.durables[sub.durableKey()] = sub } } // Remove a subscriber from the subscription store, leaving durable // subscriptions unless `unsubscribe` is true. func (ss *subStore) Remove(cs *stores.ChannelStore, sub *subState, unsubscribe bool) { if sub == nil { return } sub.Lock() sub.clearAckTimer() durableKey := "" // Do this before clearing the sub.ClientID since this is part of the key!!! 
	if sub.isDurableSubscriber() {
		durableKey = sub.durableKey()
	}
	// Clear the subscription's clientID
	sub.ClientID = ""
	if sub.ackSub != nil {
		sub.ackSub.Unsubscribe()
		sub.ackSub = nil
	}
	ackInbox := sub.AckInbox
	qs := sub.qstate
	isDurable := sub.IsDurable
	subid := sub.ID
	store := sub.store
	qgroup := sub.QGroup
	sub.Unlock()

	// Delete from storage non durable subscribers on either connection
	// close or call to Unsubscribe(), and durable subscribers only on
	// Unsubscribe(). Leave durable queue subs for now, they need to
	// be treated differently.
	if !isDurable || (unsubscribe && durableKey != "") {
		store.DeleteSub(subid)
	}

	ss.Lock()
	// Delete from ackInbox lookup.
	delete(ss.acks, ackInbox)

	// Delete from durable if needed
	if unsubscribe && durableKey != "" {
		delete(ss.durables, durableKey)
	}

	// Delete ourselves from the list
	if qs != nil {
		storageUpdate := false
		// For queue state, we need to lock specifically,
		// because qs.subs can be modified by findBestQueueSub,
		// for which we don't have substore lock held.
		qs.Lock()
		qs.subs, _ = sub.deleteFromList(qs.subs)
		if len(qs.subs) == 0 {
			// If it was the last being removed, also remove the
			// queue group from the subStore map, but only if
			// non durable or explicit unsubscribe.
			if !isDurable || unsubscribe {
				delete(ss.qsubs, qgroup)
				// Delete from storage too.
				store.DeleteSub(subid)
			} else {
				// Group is durable and last member just left the group,
				// but didn't call Unsubscribe(). Need to keep a reference
				// to this sub to maintain the state.
				qs.shadow = sub
				// Clear the stalled flag
				qs.stalled = false
				// Will need to update the LastSent and clear the ClientID
				// with a storage update.
				storageUpdate = true
			}
		} else {
			// If there are pending messages in this sub, they need to be
			// transferred to remaining queue subscribers.
			numQSubs := len(qs.subs)
			idx := 0
			sub.RLock()
			// Need to update if this member was the one with the last
			// message of the group.
			storageUpdate = sub.LastSent == qs.lastSent
			sortedSequences := makeSortedSequences(sub.acksPending)
			for _, seq := range sortedSequences {
				m := cs.Msgs.Lookup(seq)
				if m == nil {
					// Don't need to ack it since we are destroying this subscription
					continue
				}
				// Get one of the remaining queue subscribers.
				qsub := qs.subs[idx]
				qsub.Lock()
				// Store in storage
				if err := qsub.store.AddSeqPending(qsub.ID, m.Sequence); err != nil {
					Errorf("STAN: [Client:%s] Unable to update subscription for %s:%v (%v)",
						qsub.ClientID, m.Subject, m.Sequence, err)
					qsub.Unlock()
					continue
				}
				// We don't need to update if the sub's lastSent is transferred
				// to another queue subscriber.
				if storageUpdate && m.Sequence == qs.lastSent {
					storageUpdate = false
				}
				// Update LastSent if applicable
				if m.Sequence > qsub.LastSent {
					qsub.LastSent = m.Sequence
				}
				// Store in ackPending.
				qsub.acksPending[m.Sequence] = struct{}{}
				// Make sure we set its ack timer if none already set, otherwise
				// adjust the ackTimer floor as needed.
				if qsub.ackTimer == nil {
					ss.stan.setupAckTimer(qsub, qsub.ackWait)
				} else if qsub.ackTimeFloor > 0 && m.Timestamp < qsub.ackTimeFloor {
					qsub.ackTimeFloor = m.Timestamp
				}
				qsub.Unlock()
				// Move to the next queue subscriber, going back to first if needed.
				idx++
				if idx == numQSubs {
					idx = 0
				}
			}
			sub.RUnlock()
		}
		if storageUpdate {
			// If we have a shadow sub, use that one, otherwise any queue subscriber
			// will do, so use the first.
qsub := qs.shadow if qsub == nil { qsub = qs.subs[0] } qsub.Lock() qsub.LastSent = qs.lastSent qsub.store.UpdateSub(&qsub.SubState) qsub.Unlock() } qs.Unlock() } else { ss.psubs, _ = sub.deleteFromList(ss.psubs) } ss.Unlock() } // Lookup by durable name. func (ss *subStore) LookupByDurable(durableName string) *subState { ss.RLock() sub := ss.durables[durableName] ss.RUnlock() return sub } // Lookup by ackInbox name. func (ss *subStore) LookupByAckInbox(ackInbox string) *subState { ss.RLock() sub := ss.acks[ackInbox] ss.RUnlock() return sub } // Options for STAN Server type Options struct { ID string DiscoverPrefix string StoreType string FilestoreDir string FileStoreOpts stores.FileStoreOptions stores.StoreLimits // Store limits (MaxChannels, etc..) Trace bool // Verbose trace Debug bool // Debug trace Secure bool // Create a TLS enabled connection w/o server verification ClientCert string // Client Certificate for TLS ClientKey string // Client Key for TLS ClientCA string // Client CAs for TLS IOBatchSize int // Number of messages we collect from clients before processing them. IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. } // DefaultOptions are default options for the STAN server var defaultOptions = Options{ ID: DefaultClusterID, DiscoverPrefix: DefaultDiscoverPrefix, StoreType: DefaultStoreType, FileStoreOpts: stores.DefaultFileStoreOptions, IOBatchSize: DefaultIOBatchSize, IOSleepTime: DefaultIOSleepTime, NATSServerURL: "", } // GetDefaultOptions returns default options for the STAN server func GetDefaultOptions() (o *Options) { opts := defaultOptions opts.StoreLimits = stores.DefaultStoreLimits return &opts } // DefaultNatsServerOptions are default options for the NATS server var DefaultNatsServerOptions = server.Options{ Host: "localhost", Port: 4222, NoLog: true, NoSigs: true, } // Used only by tests func setDebugAndTraceToDefaultOptions(val bool) { defaultOptions.Trace = val defaultOptions.Debug = val } func stanDisconnectedHandler(nc *nats.Conn) { if nc.LastError() != nil { Errorf("STAN: connection %q has been disconnected: %v", nc.Opts.Name, nc.LastError()) } } func stanReconnectedHandler(nc *nats.Conn) { Noticef("STAN: connection %q reconnected to NATS Server at %q", nc.Opts.Name, nc.ConnectedUrl()) } func stanClosedHandler(nc *nats.Conn) { Debugf("STAN: connection %q has been closed", nc.Opts.Name) } func stanErrorHandler(nc *nats.Conn, sub *nats.Subscription, err error) { Errorf("STAN: Asynchronous error on connection %s, subject %s: %s", nc.Opts.Name, sub.Subject, err) } func (s *StanServer) buildServerURLs(sOpts *Options, opts *server.Options) ([]string, error) { var hostport string natsURL := sOpts.NATSServerURL // If the URL to an external NATS is provided... if natsURL != "" { // If it has user/pwd info or is a list of urls... if strings.Contains(natsURL, "@") || strings.Contains(natsURL, ",") { // Return the array urls := strings.Split(natsURL, ",") for i, s := range urls { urls[i] = strings.Trim(s, " ") } return urls, nil } // Otherwise, prepare the host and port and continue to see // if user/pass needs to be added. // First trim the protocol. parts := strings.Split(natsURL, "://") if len(parts) != 2 { return nil, fmt.Errorf("malformed url: %v", natsURL) } natsURL = parts[1] host, port, err := net.SplitHostPort(natsURL) if err != nil { return nil, err } // Use net.Join to support IPV6 addresses. 
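		// For example, net.JoinHostPort("::1", "4222") yields "[::1]:4222",
		// whereas plain concatenation would produce the unparseable "::1:4222".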
hostport = net.JoinHostPort(host, port) } else { // We embed the server, so it is local. If host is "any", // use 127.0.0.1 or ::1 for host address (important for // Windows since connect with 0.0.0.0 or :: fails). sport := strconv.Itoa(opts.Port) if opts.Host == "0.0.0.0" { hostport = net.JoinHostPort("127.0.0.1", sport) } else if opts.Host == "::" || opts.Host == "[::]" { hostport = net.JoinHostPort("::1", sport) } else { hostport = net.JoinHostPort(opts.Host, sport) } } var userpart string if opts.Authorization != "" { userpart = opts.Authorization } else if opts.Username != "" { userpart = fmt.Sprintf("%s:%s", opts.Username, opts.Password) } if userpart != "" { return []string{fmt.Sprintf("nats://%s@%s", userpart, hostport)}, nil } return []string{fmt.Sprintf("nats://%s", hostport)}, nil } // createNatsClientConn creates a connection to the NATS server, using // TLS if configured. Pass in the NATS server options to derive a // connection url, and for other future items (e.g. auth) func (s *StanServer) createNatsClientConn(name string, sOpts *Options, nOpts *server.Options) (*nats.Conn, error) { var err error ncOpts := nats.DefaultOptions ncOpts.Servers, err = s.buildServerURLs(sOpts, nOpts) if err != nil { return nil, err } ncOpts.Name = fmt.Sprintf("_NSS-%s-%s", sOpts.ID, name) if err = nats.ErrorHandler(stanErrorHandler)(&ncOpts); err != nil { return nil, err } if err = nats.ReconnectHandler(stanReconnectedHandler)(&ncOpts); err != nil { return nil, err } if err = nats.ClosedHandler(stanClosedHandler)(&ncOpts); err != nil { return nil, err } if err = nats.DisconnectHandler(stanDisconnectedHandler)(&ncOpts); err != nil { return nil, err } if sOpts.Secure { if err = nats.Secure()(&ncOpts); err != nil { return nil, err } } if sOpts.ClientCA != "" { if err = nats.RootCAs(sOpts.ClientCA)(&ncOpts); err != nil { return nil, err } } if sOpts.ClientCert != "" { if err = nats.ClientCert(sOpts.ClientCert, sOpts.ClientKey)(&ncOpts); err != nil { return nil, err } } Tracef("STAN: NATS conn opts: %v", ncOpts) var nc *nats.Conn if nc, err = ncOpts.Connect(); err != nil { return nil, err } return nc, err } func (s *StanServer) createNatsConnections(sOpts *Options, nOpts *server.Options) { var err error if s.ncs, err = s.createNatsClientConn("send", sOpts, nOpts); err != nil { panic(fmt.Sprintf("Can't connect to NATS server (send): %v\n", err)) } if s.nc, err = s.createNatsClientConn("general", sOpts, nOpts); err != nil { panic(fmt.Sprintf("Can't connect to NATS server (general): %v\n", err)) } } // RunServer will startup an embedded STAN server and a nats-server to support it. func RunServer(ID string) *StanServer { sOpts := GetDefaultOptions() sOpts.ID = ID nOpts := DefaultNatsServerOptions return RunServerWithOpts(sOpts, &nOpts) } // RunServerWithOpts will startup an embedded STAN server and a nats-server to support it. 
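//
// A minimal usage sketch (illustrative only; the cluster ID below is an
// arbitrary placeholder):
//
//	sOpts := GetDefaultOptions()
//	sOpts.ID = "my-cluster"
//	nOpts := DefaultNatsServerOptions
//	s := RunServerWithOpts(sOpts, &nOpts)
//	defer s.Shutdown()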
func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) *StanServer { // Run a nats server by default sOpts := stanOpts nOpts := natsOpts if stanOpts == nil { sOpts = GetDefaultOptions() } if natsOpts == nil { no := DefaultNatsServerOptions nOpts = &no } Noticef("Starting nats-streaming-server[%s] version %s", sOpts.ID, VERSION) s := StanServer{ serverID: nuid.Next(), opts: sOpts, hbInterval: DefaultHeartBeatInterval, hbTimeout: DefaultClientHBTimeout, maxFailedHB: DefaultMaxFailedHeartBeats, dupCIDMap: make(map[string]struct{}), dupMaxCIDRoutines: defaultMaxDupCIDRoutines, dupCIDTimeout: defaultCheckDupCIDTimeout, ioChannelQuit: make(chan struct{}, 1), srvCtrlMsgID: nuid.Next(), connCloseReqs: make(map[string]int), trace: sOpts.Trace, debug: sOpts.Debug, } // Ensure that we shutdown the server if there is a panic during startup. // This will ensure that stores are closed (which otherwise would cause // issues during testing) and that the NATS Server (if started) is also // properly shutdown. To do so, we recover from the panic in order to // call Shutdown, then issue the original panic. defer func() { if r := recover(); r != nil { s.Shutdown() // Log the reason for the panic. We use noticef here since // Fatalf() would cause an exit. Noticef("Failed to start: %v", r) // Issue the original panic now that the store is closed. panic(r) } }() // Get the store limits limits := &sOpts.StoreLimits var err error var recoveredState *stores.RecoveredState var recoveredSubs []*subState var store stores.Store // Ensure store type option is in upper-case sOpts.StoreType = strings.ToUpper(sOpts.StoreType) // Create the store. So far either memory or file-based. switch sOpts.StoreType { case stores.TypeFile: // The dir must be specified if sOpts.FilestoreDir == "" { err = fmt.Errorf("for %v stores, root directory must be specified", stores.TypeFile) break } store, recoveredState, err = stores.NewFileStore(sOpts.FilestoreDir, limits, stores.AllOptions(&sOpts.FileStoreOpts)) case stores.TypeMemory: store, err = stores.NewMemoryStore(limits) default: err = fmt.Errorf("unsupported store type: %v", sOpts.StoreType) } if err != nil { panic(err) } // StanServer.store (s.store here) is of type stores.Store, which is an // interace. If we assign s.store in the call of the constructor and there // is an error, although the call returns "nil" for the store, we can no // longer have a test such as "if s.store != nil" (as we do in shutdown). // This is because the constructors return a store implementention. // We would need to use reflection such as reflect.ValueOf(s.store).IsNil(). // So to not do that, we simply delay the setting of s.store when we know // that it was successful. s.store = store // Create clientStore s.clients = &clientStore{store: s.store} callStoreInit := false if recoveredState != nil { // Copy content s.info = *recoveredState.Info // Check cluster IDs match if s.opts.ID != s.info.ClusterID { panic(fmt.Errorf("Cluster ID %q does not match recovered value of %q", s.opts.ID, s.info.ClusterID)) } // Check to see if SubClose subject is present or not. // If not, it means we recovered from an older server, so // need to update. if s.info.SubClose == "" { s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, nuid.Next()) // Update the store with the server info callStoreInit = true } // Restore clients state s.processRecoveredClients(recoveredState.Clients) // Process recovered channels (if any). 
recoveredSubs = s.processRecoveredChannels(recoveredState.Subs) } else { s.info.ClusterID = s.opts.ID // Generate Subjects // FIXME(dlc) guid needs to be shared in cluster mode s.info.Discovery = fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, s.info.ClusterID) s.info.Publish = fmt.Sprintf("%s.%s", DefaultPubPrefix, nuid.Next()) s.info.Subscribe = fmt.Sprintf("%s.%s", DefaultSubPrefix, nuid.Next()) s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, nuid.Next()) s.info.Unsubscribe = fmt.Sprintf("%s.%s", DefaultUnSubPrefix, nuid.Next()) s.info.Close = fmt.Sprintf("%s.%s", DefaultClosePrefix, nuid.Next()) callStoreInit = true } if callStoreInit { // Initialize the store with the server info if err := s.store.Init(&s.info); err != nil { panic(fmt.Errorf("Unable to initialize the store: %v", err)) } } // If no NATS server url is provided, it means that we embed the NATS Server if sOpts.NATSServerURL == "" { s.startNATSServer(nOpts) } s.createNatsConnections(sOpts, nOpts) s.ensureRunningStandAlone() s.initSubscriptions() if recoveredState != nil { // Do some post recovery processing (create subs on AckInbox, setup // some timers, etc...) if err := s.postRecoveryProcessing(recoveredState.Clients, recoveredSubs); err != nil { panic(fmt.Errorf("error during post recovery processing: %v\n", err)) } } // Flush to make sure all subscriptions are processed before // we return control to the user. if err := s.nc.Flush(); err != nil { panic(fmt.Sprintf("Could not flush the subscriptions, %v\n", err)) } Noticef("STAN: Message store is %s", s.store.Name()) Noticef("STAN: --------- Store Limits ---------") Noticef("STAN: Channels: %s", getLimitStr(true, int64(limits.MaxChannels), int64(stores.DefaultStoreLimits.MaxChannels), limitCount)) Noticef("STAN: -------- channels limits -------") printLimits(true, &limits.ChannelLimits, &stores.DefaultStoreLimits.ChannelLimits) for cn, cl := range limits.PerChannel { Noticef("STAN: Channel: %q", cn) printLimits(false, cl, &limits.ChannelLimits) } Noticef("STAN: --------------------------------") // Execute (in a go routine) redelivery of unacknowledged messages, // and release newOnHold s.wg.Add(1) go s.performRedeliveryOnStartup(recoveredSubs) return &s } func printLimits(isGlobal bool, limits, parentLimits *stores.ChannelLimits) { plMaxSubs := int64(parentLimits.MaxSubscriptions) plMaxMsgs := int64(parentLimits.MaxMsgs) plMaxBytes := parentLimits.MaxBytes plMaxAge := parentLimits.MaxAge Noticef("STAN: Subscriptions: %s", getLimitStr(isGlobal, int64(limits.MaxSubscriptions), plMaxSubs, limitCount)) Noticef("STAN: Messages : %s", getLimitStr(isGlobal, int64(limits.MaxMsgs), plMaxMsgs, limitCount)) Noticef("STAN: Bytes : %s", getLimitStr(isGlobal, limits.MaxBytes, plMaxBytes, limitBytes)) Noticef("STAN: Age : %s", getLimitStr(isGlobal, int64(limits.MaxAge), int64(plMaxAge), limitDuration)) } func getLimitStr(isGlobal bool, val, parentVal int64, limitType int) string { valStr := "" inherited := "" if !isGlobal && val == 0 { val = parentVal } if val == parentVal { inherited = " *" } if val == 0 { valStr = "unlimited" } else { switch limitType { case limitBytes: valStr = friendlyBytes(val) case limitDuration: valStr = fmt.Sprintf("%v", time.Duration(val)) default: valStr = fmt.Sprintf("%v", val) } } return fmt.Sprintf("%13s%s", valStr, inherited) } func friendlyBytes(msgbytes int64) string { bytes := float64(msgbytes) base := 1024 pre := []string{"K", "M", "G", "T", "P", "E"} var post = "B" if bytes < float64(base) { return fmt.Sprintf("%v B", bytes) } exp := 
int(math.Log(bytes) / math.Log(float64(base))) index := exp - 1 units := pre[index] + post return fmt.Sprintf("%.2f %s", bytes/math.Pow(float64(base), float64(exp)), units) } // TODO: Explore parameter passing in gnatsd. Keep seperate for now. func (s *StanServer) configureClusterOpts(opts *server.Options) error { if opts.Cluster.ListenStr == "" { if opts.RoutesStr != "" { Fatalf("Solicited routes require cluster capabilities, e.g. --cluster.") } return nil } clusterURL, err := url.Parse(opts.Cluster.ListenStr) h, p, err := net.SplitHostPort(clusterURL.Host) if err != nil { return err } opts.Cluster.Host = h _, err = fmt.Sscan(p, &opts.Cluster.Port) if err != nil { return err } if clusterURL.User != nil { pass, hasPassword := clusterURL.User.Password() if !hasPassword { return fmt.Errorf("Expected cluster password to be set.") } opts.Cluster.Password = pass user := clusterURL.User.Username() opts.Cluster.Username = user } // If we have routes but no config file, fill in here. if opts.RoutesStr != "" && opts.Routes == nil { opts.Routes = server.RoutesFromStr(opts.RoutesStr) } return nil } // configureNATSServerTLS sets up TLS for the NATS Server. // Additional TLS parameters (e.g. cipher suites) will need to be placed // in a configuration file specified through the -config parameter. func (s *StanServer) configureNATSServerTLS(opts *server.Options) { tlsSet := false tc := server.TLSConfigOpts{} if opts.TLSCert != "" { tc.CertFile = opts.TLSCert tlsSet = true } if opts.TLSKey != "" { tc.KeyFile = opts.TLSKey tlsSet = true } if opts.TLSCaCert != "" { tc.CaFile = opts.TLSCaCert tlsSet = true } if opts.TLSVerify { tc.Verify = true tlsSet = true } var err error if tlsSet { if opts.TLSConfig, err = server.GenTLSConfig(&tc); err != nil { // The connection will fail later if the problem is severe enough. Errorf("STAN: Unable to setup NATS Server TLS: %v", err) } } } // configureNATSServerAuth sets up user authentication for the NATS Server. func (s *StanServer) configureNATSServerAuth(opts *server.Options) server.Auth { // setup authorization var a server.Auth if opts.Authorization != "" { a = &auth.Token{Token: opts.Authorization} } if opts.Username != "" { a = &auth.Plain{Username: opts.Username, Password: opts.Password} } if opts.Users != nil { a = auth.NewMultiUser(opts.Users) } return a } // startNATSServer massages options as necessary, and starts the embedded // NATS server. No errors, only panics upon error conditions. func (s *StanServer) startNATSServer(opts *server.Options) { s.configureClusterOpts(opts) s.configureNATSServerTLS(opts) a := s.configureNATSServerAuth(opts) s.natsServer = natsd.RunServerWithAuth(opts, a) } // ensureRunningStandAlone prevents this streaming server from starting // if another is found using the same cluster ID - a possibility when // routing is enabled. func (s *StanServer) ensureRunningStandAlone() { clusterID := s.ClusterID() hbInbox := nats.NewInbox() timeout := time.Millisecond * 250 // We cannot use the client's API here as it will create a dependency // cycle in the streaming client, so build our request and see if we // get a response. req := &pb.ConnectRequest{ClientID: clusterID, HeartbeatInbox: hbInbox} b, _ := req.Marshal() reply, err := s.nc.Request(s.info.Discovery, b, timeout) if err == nats.ErrTimeout { Debugf("Did not detect another server instance.") return } if err != nil { Errorf("Request error detecting another server instance: %v", err) return } // See if the response is valid and can be unmarshalled. 
cr := &pb.ConnectResponse{} err = cr.Unmarshal(reply.Data) if err != nil { // something other than a compatible streaming server responded // so continue. Errorf("Unmarshall error while detecting another server instance: %v", err) return } // Another streaming server was found, cleanup then panic. clreq := &pb.CloseRequest{ClientID: clusterID} b, _ = clreq.Marshal() s.nc.Request(cr.CloseRequests, b, timeout) panic(fmt.Errorf("discovered another streaming server with cluster ID %q", clusterID)) } // Binds server's view of a client with stored Client objects. func (s *StanServer) processRecoveredClients(clients []*stores.Client) { for _, sc := range clients { // Create a client object and set it as UserData on the stored Client. // No lock needed here because no other routine is going to use this // until the server is finished recovering. sc.UserData = &client{subs: make([]*subState, 0, 4)} } } // Reconstruct the subscription state on restart. // We don't use locking in there because there is no communication // with the NATS server and/or clients, so no chance that the state // changes while we are doing this. func (s *StanServer) processRecoveredChannels(subscriptions stores.RecoveredSubscriptions) []*subState { // We will return the recovered subscriptions allSubs := make([]*subState, 0, 16) for channelName, recoveredSubs := range subscriptions { // Lookup the ChannelStore from the store channel := s.store.LookupChannel(channelName) // Create the subStore for this channel ss := s.createSubStore() // Set it into the channel store channel.UserData = ss // Get the recovered subscriptions for this channel. for _, recSub := range recoveredSubs { // Create a subState sub := &subState{ subject: channelName, ackWait: time.Duration(recSub.Sub.AckWaitInSecs) * time.Second, store: channel.Subs, } sub.acksPending = make(map[uint64]struct{}, len(recSub.Pending)) for seq := range recSub.Pending { sub.acksPending[seq] = struct{}{} } if len(sub.acksPending) > 0 { // Prevent delivery of new messages until resent of old ones sub.newOnHold = true // We may not need to set this because this would be set // during the initial redelivery attempt, but does not hurt. if int32(len(sub.acksPending)) >= sub.MaxInFlight { sub.stalled = true } } // Copy over fields from SubState protobuf sub.SubState = *recSub.Sub // When recovering older stores, IsDurable may not exist for // durable subscribers. Set it now. durableSub := sub.isDurableSubscriber() // not a durable queue sub! if durableSub { sub.IsDurable = true } // Add the subscription to the corresponding client added := s.clients.AddSub(sub.ClientID, sub) if added || sub.IsDurable { // Add this subscription to subStore. ss.updateState(sub) // If this is a durable and the client was not recovered // (was offline), we need to clear the ClientID otherwise // it won't be able to reconnect if durableSub && !added { sub.ClientID = "" } // Add to the array, unless this is the shadow durable queue sub that // was left in the store in order to maintain the group's state. if !sub.isShadowQueueDurable() { allSubs = append(allSubs, sub) } } } } return allSubs } // Do some final setup. Be minded of locking here since the server // has started communication with NATS server/clients. 
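//
// Editorial note (a sketch, not from the original source): the heartbeat timers
// created below capture a per-iteration copy of the client ID so the closure
// does not share the loop variable:
//
//	cID := sc.ID
//	c.hbt = time.AfterFunc(s.hbInterval, func() { s.checkClientHealth(cID) })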
func (s *StanServer) postRecoveryProcessing(recoveredClients []*stores.Client, recoveredSubs []*subState) error { var err error for _, sub := range recoveredSubs { sub.Lock() // To be on the safe side, just check that the ackSub has not // been created (may happen with durables that may reconnect maybe?) if sub.ackSub == nil { // Subscribe to acks sub.ackSub, err = s.nc.Subscribe(sub.AckInbox, s.processAckMsg) if err != nil { sub.Unlock() return err } sub.ackSub.SetPendingLimits(-1, -1) } sub.Unlock() } // Go through the list of clients and ensure their Hb timer is set. for _, sc := range recoveredClients { c := sc.UserData.(*client) c.Lock() // Client could have been unregisted by now since the server has its // internal subscriptions started (and may receive client requests). if !c.unregistered && c.hbt == nil { // Because of the loop, we need to make copy for the closure // to time.AfterFunc cID := sc.ID c.hbt = time.AfterFunc(s.hbInterval, func() { s.checkClientHealth(cID) }) } c.Unlock() } return nil } // Redelivers unacknowledged messages and release the hold for new messages delivery func (s *StanServer) performRedeliveryOnStartup(recoveredSubs []*subState) { defer s.wg.Done() for _, sub := range recoveredSubs { // Ignore subs that did not have any ack pendings on startup. sub.Lock() if !sub.newOnHold { sub.Unlock() continue } // Create the delivery timer since performAckExpirationRedelivery // may need to reset the timer (which would not work if timer is nil). // Set it to a high value, it will be correctly reset or cleared. s.setupAckTimer(sub, time.Hour) // If this is a durable and it is offline, then skip the rest. if sub.isOfflineDurableSubscriber() { sub.newOnHold = false sub.Unlock() continue } // Unlock in order to call function below sub.Unlock() // Send old messages (lock is acquired in that function) s.performAckExpirationRedelivery(sub) // Regrab lock sub.Lock() // Allow new messages to be delivered sub.newOnHold = false subject := sub.subject qs := sub.qstate sub.Unlock() cs := s.store.LookupChannel(subject) if cs == nil { continue } // Kick delivery of (possible) new messages if qs != nil { s.sendAvailableMessagesToQueue(cs, qs) } else { s.sendAvailableMessages(cs, sub) } } } // initSubscriptions will setup initial subscriptions for discovery etc. func (s *StanServer) initSubscriptions() { s.startIOLoop() // Listen for connection requests. _, err := s.nc.Subscribe(s.info.Discovery, s.connectCB) if err != nil { panic(fmt.Sprintf("Could not subscribe to discover subject, %v\n", err)) } // Receive published messages from clients. pubSubject := fmt.Sprintf("%s.>", s.info.Publish) _, err = s.nc.Subscribe(pubSubject, s.processClientPublish) if err != nil { panic(fmt.Sprintf("Could not subscribe to publish subject, %v\n", err)) } // Receive subscription requests from clients. _, err = s.nc.Subscribe(s.info.Subscribe, s.processSubscriptionRequest) if err != nil { panic(fmt.Sprintf("Could not subscribe to subscribe request subject, %v\n", err)) } // Receive unsubscribe requests from clients. _, err = s.nc.Subscribe(s.info.Unsubscribe, s.processUnsubscribeRequest) if err != nil { panic(fmt.Sprintf("Could not subscribe to unsubscribe request subject, %v\n", err)) } // Receive subscription close requests from clients. _, err = s.nc.Subscribe(s.info.SubClose, s.processSubCloseRequest) if err != nil { panic(fmt.Sprintf("Could not subscribe to subscription close request subject, %v\n", err)) } // Receive close requests from clients. 
_, err = s.nc.Subscribe(s.info.Close, s.processCloseRequest) if err != nil { panic(fmt.Sprintf("Could not subscribe to close request subject, %v\n", err)) } Debugf("STAN: Discover subject: %s", s.info.Discovery) Debugf("STAN: Publish subject: %s", pubSubject) Debugf("STAN: Subscribe subject: %s", s.info.Subscribe) Debugf("STAN: Unsubscribe subject: %s", s.info.Unsubscribe) Debugf("STAN: Close subject: %s", s.info.Close) } // Process a client connect request func (s *StanServer) connectCB(m *nats.Msg) { req := &pb.ConnectRequest{} err := req.Unmarshal(m.Data) if err != nil || !clientIDRegEx.MatchString(req.ClientID) || req.HeartbeatInbox == "" { Debugf("STAN: [Client:?] Invalid conn request: ClientID=%s, Inbox=%s, err=%v", req.ClientID, req.HeartbeatInbox, err) s.sendConnectErr(m.Reply, ErrInvalidConnReq.Error()) return } // Try to register client, isNew, err := s.clients.Register(req.ClientID, req.HeartbeatInbox) if err != nil { Debugf("STAN: [Client:%s] Error registering client: %v", req.ClientID, err) s.sendConnectErr(m.Reply, err.Error()) return } // Handle duplicate IDs in a dedicated go-routine if !isNew { // Do we have a routine in progress for this client ID? s.dupCIDGuard.RLock() _, inProgress := s.dupCIDMap[req.ClientID] s.dupCIDGuard.RUnlock() // Yes, fail this request here. if inProgress { Debugf("STAN: [Client:%s] Connect failed; already connected", req.ClientID) s.sendConnectErr(m.Reply, ErrInvalidClient.Error()) return } // If server has started shutdown, we can't call wg.Add() so we need // to check on shutdown status. Note that s.wg is for all server's // go routines, not specific to duplicate CID handling. Use server's // lock here. s.Lock() shutdown := s.shutdown if !shutdown { // Assume we are going to start a go routine. s.wg.Add(1) } s.Unlock() if shutdown { // The client will timeout on connect return } // If we have exhausted the max number of go routines, we will have // to wait that one finishes. needToWait := false s.dupCIDGuard.Lock() s.dupCIDMap[req.ClientID] = struct{}{} if len(s.dupCIDMap) > s.dupMaxCIDRoutines { s.dupCIDswg = true s.dupCIDwg.Add(1) needToWait = true } s.dupCIDGuard.Unlock() // If we need to wait for a go routine to return... if needToWait { s.dupCIDwg.Wait() } // Start a go-routine to handle this connect request go func() { s.processConnectRequestWithDupID(client, req, m.Reply) }() return } // Here, we accept this client's incoming connect request. s.finishConnectRequest(client, req, m.Reply) } func (s *StanServer) finishConnectRequest(sc *stores.Client, req *pb.ConnectRequest, replyInbox string) { cr := &pb.ConnectResponse{ PubPrefix: s.info.Publish, SubRequests: s.info.Subscribe, UnsubRequests: s.info.Unsubscribe, SubCloseRequests: s.info.SubClose, CloseRequests: s.info.Close, } b, _ := cr.Marshal() s.nc.Publish(replyInbox, b) s.RLock() hbInterval := s.hbInterval s.RUnlock() clientID := req.ClientID hbInbox := req.HeartbeatInbox client := sc.UserData.(*client) // Heartbeat timer. client.Lock() client.hbt = time.AfterFunc(hbInterval, func() { s.checkClientHealth(clientID) }) client.Unlock() Debugf("STAN: [Client:%s] Connected (Inbox=%v)", clientID, hbInbox) } func (s *StanServer) processConnectRequestWithDupID(sc *stores.Client, req *pb.ConnectRequest, replyInbox string) { sendErr := true hbInbox := sc.HbInbox clientID := sc.ID defer func() { s.dupCIDGuard.Lock() delete(s.dupCIDMap, clientID) if s.dupCIDswg { s.dupCIDswg = false s.dupCIDwg.Done() } s.dupCIDGuard.Unlock() s.wg.Done() }() // This is the HbInbox from the "old" client. 
See if it is up and // running by sending a ping to that inbox. if _, err := s.nc.Request(hbInbox, nil, s.dupCIDTimeout); err != nil { // The old client didn't reply, assume it is dead, close it and continue. s.closeClient(useLocking, clientID) // Between the close and the new registration below, it is possible // that a connection request came in (in connectCB) and since the // client is now unregistered, the new connection was accepted there. // The registration below will then fail, in which case we will fail // this request. // Need to re-register now based on the new request info. var isNew bool sc, isNew, err = s.clients.Register(req.ClientID, req.HeartbeatInbox) if err == nil && isNew { // We could register the new client. Debugf("STAN: [Client:%s] Replaced old client (Inbox=%v)", req.ClientID, hbInbox) sendErr = false } } // The currently registered client is responding, or we failed to register, // so fail the request of the incoming client connect request. if sendErr { Debugf("STAN: [Client:%s] Connect failed; already connected", clientID) s.sendConnectErr(replyInbox, ErrInvalidClient.Error()) return } // We have replaced the old with the new. s.finishConnectRequest(sc, req, replyInbox) } func (s *StanServer) sendConnectErr(replyInbox, err string) { cr := &pb.ConnectResponse{Error: err} b, _ := cr.Marshal() s.nc.Publish(replyInbox, b) } // Send a heartbeat call to the client. func (s *StanServer) checkClientHealth(clientID string) { sc := s.store.GetClient(clientID) if sc == nil { return } client := sc.UserData.(*client) hbInbox := sc.HbInbox // Capture these under lock (as of now, there are not configurable, // but we tweak them in tests and maybe they will be settable in // the future) s.RLock() hbInterval := s.hbInterval hbTimeout := s.hbTimeout maxFailedHB := s.maxFailedHB s.RUnlock() client.Lock() if client.unregistered { client.Unlock() return } if _, err := s.nc.Request(hbInbox, nil, hbTimeout); err != nil { client.fhb++ if client.fhb > maxFailedHB { Debugf("STAN: [Client:%s] Timed out on heartbeats.", clientID) client.Unlock() s.closeClient(useLocking, clientID) return } } else { client.fhb = 0 } client.hbt.Reset(hbInterval) client.Unlock() } // Close a client func (s *StanServer) closeClient(lock bool, clientID string) bool { if lock { s.closeProtosMu.Lock() defer s.closeProtosMu.Unlock() } // Remove from our clientStore. sc := s.clients.Unregister(clientID) if sc == nil { return false } hbInbox := sc.HbInbox // At this point, client.unregistered has been set to true, // in Unregister() preventing any addition/removal of subs, etc.. client := sc.UserData.(*client) client.Lock() if client.hbt != nil { client.hbt.Stop() } client.Unlock() // Remove all non-durable subscribers. s.removeAllNonDurableSubscribers(client) Debugf("STAN: [Client:%s] Closed (Inbox=%v)", clientID, hbInbox) return true } // processCloseRequest process inbound messages from clients. 
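//
// For reference, a hedged sketch of what a client is expected to send
// (mirroring what ensureRunningStandAlone sends above); "some-client",
// closeSubject and timeout are placeholders:
//
//	req := &pb.CloseRequest{ClientID: "some-client"}
//	b, _ := req.Marshal()
//	reply, err := nc.Request(closeSubject, b, timeout) // closeSubject comes from the ConnectResponse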
func (s *StanServer) processCloseRequest(m *nats.Msg) { req := &pb.CloseRequest{} err := req.Unmarshal(m.Data) if err != nil { Errorf("STAN: Received invalid close request, subject=%s.", m.Subject) s.sendCloseErr(m.Reply, ErrInvalidCloseReq.Error()) return } // Lock for the remainder of the function s.closeProtosMu.Lock() defer s.closeProtosMu.Unlock() ctrlMsg := &spb.CtrlMsg{ MsgType: spb.CtrlMsg_ConnClose, ServerID: s.srvCtrlMsgID, Data: []byte(req.ClientID), } ctrlBytes, _ := ctrlMsg.Marshal() ctrlMsgNatsMsg := &nats.Msg{ Subject: s.info.Publish + ".close", // any pub subject will do Reply: m.Reply, Data: ctrlBytes, } refs := 0 if s.ncs.PublishMsg(ctrlMsgNatsMsg) == nil { refs++ } subs := s.clients.GetSubs(req.ClientID) if len(subs) > 0 { // There are subscribers, we will schedule the connection // close request to subscriber's ackInbox subscribers. for _, sub := range subs { sub.Lock() if sub.ackSub != nil { ctrlMsgNatsMsg.Subject = sub.AckInbox if s.ncs.PublishMsg(ctrlMsgNatsMsg) == nil { refs++ } } sub.Unlock() } } // If were unable to schedule a single proto, then execute // performConnClose from here. if refs == 0 { s.connCloseReqs[req.ClientID] = 1 s.performConnClose(dontUseLocking, m, req.ClientID) } else { // Store our reference count and wait for performConnClose to // be invoked... s.connCloseReqs[req.ClientID] = refs } } // performConnClose performs a connection close operation after all // client's pubMsg or client acks have been processed. func (s *StanServer) performConnClose(locking bool, m *nats.Msg, clientID string) { if locking { s.closeProtosMu.Lock() defer s.closeProtosMu.Unlock() } refs := s.connCloseReqs[clientID] refs-- if refs > 0 { // Not done yet, update reference count s.connCloseReqs[clientID] = refs return } // Perform the connection close here... delete(s.connCloseReqs, clientID) // The function or the caller is already locking, so do not use // locking in that function. if !s.closeClient(dontUseLocking, clientID) { Errorf("STAN: Unknown client %q in close request", clientID) s.sendCloseErr(m.Reply, ErrUnknownClient.Error()) return } resp := &pb.CloseResponse{} b, _ := resp.Marshal() s.nc.Publish(m.Reply, b) } func (s *StanServer) sendCloseErr(subj, err string) { resp := &pb.CloseResponse{Error: err} if b, err := resp.Marshal(); err == nil { s.nc.Publish(subj, b) } } // processClientPublish process inbound messages from clients. func (s *StanServer) processClientPublish(m *nats.Msg) { iopm := &ioPendingMsg{m: m} pm := &iopm.pm if pm.Unmarshal(m.Data) != nil { // Expecting only a connection close request... if s.processInternalCloseRequest(m, true) { return } // else we will report an error below... } // Make sure we have a clientID, guid, etc. if pm.Guid == "" || !s.clients.IsValid(pm.ClientID) || !isValidSubject(pm.Subject) { Errorf("STAN: Received invalid client publish message %v", pm) s.sendPublishErr(m.Reply, pm.Guid, ErrInvalidPubReq) return } s.ioChannel <- iopm } // processInternalCloseRequest processes the incoming message has // a CtrlMsg. If this is not a CtrlMsg, returns false to indicate an error. // If the CtrlMsg's ServerID is not this server, the request is simply // ignored and this function returns true (so the caller does not fail). // Based on the CtrlMsg type, invokes appropriate function to // do final processing of unsub/subclose/conn close request. 
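//
// The control messages handled here are those published by this server itself,
// for example the connection-close envelope built in processCloseRequest:
//
//	ctrlMsg := &spb.CtrlMsg{
//		MsgType:  spb.CtrlMsg_ConnClose,
//		ServerID: s.srvCtrlMsgID,
//		Data:     []byte(req.ClientID),
//	}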
func (s *StanServer) processInternalCloseRequest(m *nats.Msg, onlyConnClose bool) bool { cm := &spb.CtrlMsg{} if cm.Unmarshal(m.Data) != nil { return false } // If this control message is not intended for us, simply // ignore the request and does not return a failure. if cm.ServerID != s.srvCtrlMsgID { return true } // If we expect only a connection close request but get // something else, report as a failure. if onlyConnClose && cm.MsgType != spb.CtrlMsg_ConnClose { return false } switch cm.MsgType { case spb.CtrlMsg_SubUnsubscribe: // SubUnsub and SubClose use same function, using cm.MsgType // to differentiate between unsubscribe and close. fallthrough case spb.CtrlMsg_SubClose: req := &pb.UnsubscribeRequest{} req.Unmarshal(cm.Data) s.performSubUnsubOrClose(cm.MsgType, processRequest, m, req) case spb.CtrlMsg_ConnClose: clientID := string(cm.Data) s.performConnClose(useLocking, m, clientID) default: return false // Valid ctrl message, but unexpected type, return failure. } return true } func (s *StanServer) sendPublishErr(subj, guid string, err error) { badMsgAck := &pb.PubAck{Guid: guid, Error: err.Error()} if b, err := badMsgAck.Marshal(); err == nil { s.ncs.Publish(subj, b) } } // FIXME(dlc) - place holder to pick sub that has least outstanding, should just sort, // or use insertion sort, etc. func findBestQueueSub(sl []*subState) (rsub *subState) { for _, sub := range sl { if rsub == nil { rsub = sub continue } rsub.RLock() rOut := len(rsub.acksPending) rStalled := rsub.stalled rsub.RUnlock() sub.RLock() sOut := len(sub.acksPending) sStalled := sub.stalled sub.RUnlock() // Favor non stalled subscribers if (!sStalled || rStalled) && (sOut < rOut) { rsub = sub } } len := len(sl) if len > 1 && rsub == sl[0] { copy(sl, sl[1:len]) sl[len-1] = rsub } return } // Send a message to the queue group // Assumes qs lock held for write func (s *StanServer) sendMsgToQueueGroup(qs *queueState, m *pb.MsgProto, force bool) (*subState, bool, bool) { if qs == nil { return nil, false, false } sub := findBestQueueSub(qs.subs) if sub == nil { return nil, false, false } sub.Lock() didSend, sendMore := s.sendMsgToSub(sub, m, force) lastSent := sub.LastSent sub.Unlock() if didSend && lastSent > qs.lastSent { qs.lastSent = lastSent } if !sendMore { qs.stalled = true } return sub, didSend, sendMore } // processMsg will proces a message, and possibly send to clients, etc. func (s *StanServer) processMsg(cs *stores.ChannelStore) { ss := cs.UserData.(*subStore) // Since we iterate through them all. ss.RLock() // Walk the plain subscribers and deliver to each one for _, sub := range ss.psubs { s.sendAvailableMessages(cs, sub) } // Check the queue subscribers for _, qs := range ss.qsubs { s.sendAvailableMessagesToQueue(cs, qs) } ss.RUnlock() } // Used for sorting by sequence type bySeq []uint64 func (a bySeq) Len() int { return (len(a)) } func (a bySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a bySeq) Less(i, j int) bool { return a[i] < a[j] } func makeSortedSequences(sequences map[uint64]struct{}) []uint64 { results := make([]uint64, 0, len(sequences)) for seq := range sequences { results = append(results, seq) } sort.Sort(bySeq(results)) return results } // Redeliver all outstanding messages to a durable subscriber, used on resubscribe. func (s *StanServer) performDurableRedelivery(cs *stores.ChannelStore, sub *subState) { // Sort our messages outstanding from acksPending, grab some state and unlock. 
	sub.RLock()
	sortedSeqs := makeSortedSequences(sub.acksPending)
	clientID := sub.ClientID
	sub.RUnlock()

	if s.debug {
		sub.RLock()
		durName := sub.DurableName
		if durName == "" {
			durName = sub.QGroup
		}
		sub.RUnlock()
		Debugf("STAN: [Client:%s] Redelivering to durable %s", clientID, durName)
	}

	// If we don't find the client, we are done.
	client := s.clients.Lookup(clientID)
	if client == nil {
		return
	}
	// Go through all messages
	for _, seq := range sortedSeqs {
		m := s.getMsgForRedelivery(cs, sub, seq)
		if m == nil {
			continue
		}
		if s.trace {
			Tracef("STAN: [Client:%s] Redelivery, sending seqno=%d", clientID, m.Sequence)
		}
		// Flag as redelivered.
		m.Redelivered = true

		sub.Lock()
		// Force delivery
		s.sendMsgToSub(sub, m, forceDelivery)
		sub.Unlock()
	}
}

// Redeliver all outstanding messages that have expired.
func (s *StanServer) performAckExpirationRedelivery(sub *subState) {
	// Sort our messages outstanding from acksPending, grab some state and unlock.
	sub.RLock()
	expTime := int64(sub.ackWait)
	cs := s.store.LookupChannel(sub.subject)
	sortedSequences := makeSortedSequences(sub.acksPending)
	subject := sub.subject
	qs := sub.qstate
	clientID := sub.ClientID
	floorTimestamp := sub.ackTimeFloor
	inbox := sub.Inbox
	sub.RUnlock()

	// If we don't find the client, we are done.
	client := s.clients.Lookup(clientID)
	if client == nil {
		return
	}
	// If the client has some failed heartbeats, ignore this request.
	client.RLock()
	fhbs := client.fhb
	client.RUnlock()
	if fhbs != 0 {
		// Reset the timer.
		sub.Lock()
		if sub.ackTimer != nil {
			sub.ackTimer.Reset(sub.ackWait)
		}
		sub.Unlock()
		if s.debug {
			Debugf("STAN: [Client:%s] Skipping redelivery on ack expiration due to client missed heartbeat, subject=%s, inbox=%s",
				clientID, subject, inbox)
		}
		return
	}

	if s.debug {
		Debugf("STAN: [Client:%s] Redelivering on ack expiration, subject=%s, inbox=%s",
			clientID, subject, inbox)
	}

	now := time.Now().UnixNano()

	var pick *subState
	sent := false

	// The messages from sortedSequences are possibly going to be acknowledged
	// by the end of this function, but we are going to set the timer based on
	// the oldest on that list, which is the sooner the timer should fire anyway.
	// The timer will correctly be adjusted.
	firstUnacked := int64(0)

	// We will move through acksPending(sorted) and see what needs redelivery.
	for _, seq := range sortedSequences {
		m := s.getMsgForRedelivery(cs, sub, seq)
		if m == nil {
			continue
		}
		if firstUnacked == 0 {
			firstUnacked = m.Timestamp
		}
		// Ignore messages with a timestamp below our floor
		if floorTimestamp > 0 && floorTimestamp > m.Timestamp {
			continue
		}
		if m.Timestamp+expTime > now {
			// The messages are ordered by seq, so the expiration
			// times are ascending. Once we get here, we have hit an
			// unexpired message, and we're done. Reset the sub's ack
			// timer to fire on the next message expiration.
			if s.trace {
				Tracef("STAN: [Client:%s] redelivery, skipping seqno=%d.", clientID, m.Sequence)
			}
			sub.adjustAckTimer(m.Timestamp)
			return
		}
		// Flag as redelivered.
		m.Redelivered = true
		if s.trace {
			Tracef("STAN: [Client:%s] Redelivery, sending seqno=%d", clientID, m.Sequence)
		}
		// Handle QueueSubscribers differently, since we will choose best subscriber
		// to redeliver to, not necessarily the same one.
		if qs != nil {
			qs.Lock()
			pick, sent, _ = s.sendMsgToQueueGroup(qs, m, forceDelivery)
			qs.Unlock()
			if pick == nil {
				Errorf("STAN: [Client:%s] Unable to find queue subscriber", clientID)
				break
			}
			// If the message is redelivered to a different queue subscriber,
			// we need to process an implicit ack for the original subscriber.
// We do this only after confirmation that it was successfully added // as pending on the other queue subscriber. if pick != sub && sent { s.processAck(cs, sub, m.Sequence) } } else { sub.Lock() s.sendMsgToSub(sub, m, forceDelivery) sub.Unlock() } } // Adjust the timer sub.adjustAckTimer(firstUnacked) } // getMsgForRedelivery looks up the message from storage. If not found - // because it has been removed due to limit - processes an ACK for this // sub/sequence number and returns nil, otherwise return a copy of the // message (since it is going to be modified: m.Redelivered = true) func (s *StanServer) getMsgForRedelivery(cs *stores.ChannelStore, sub *subState, seq uint64) *pb.MsgProto { m := cs.Msgs.Lookup(seq) if m == nil { // Ack it so that it does not reincarnate on restart s.processAck(cs, sub, seq) return nil } // The store implementation does not return a copy, we need one mcopy := *m return &mcopy } // Sends the message to the subscriber // Unless `force` is true, in which case message is always sent, if the number // of acksPending is greater or equal to the sub's MaxInFlight limit, messages // are not sent and subscriber is marked as stalled. // Sub lock should be held before calling. func (s *StanServer) sendMsgToSub(sub *subState, m *pb.MsgProto, force bool) (bool, bool) { if sub == nil || m == nil || (sub.newOnHold && !m.Redelivered) { return false, false } if s.trace { Tracef("STAN: [Client:%s] Sending msg subject=%s inbox=%s seqno=%d.", sub.ClientID, m.Subject, sub.Inbox, m.Sequence) } // Don't send if we have too many outstanding already, unless forced to send. ap := int32(len(sub.acksPending)) if !force && (ap >= sub.MaxInFlight) { sub.stalled = true if s.debug { Debugf("STAN: [Client:%s] Stalled msgseq %s:%d to %s.", sub.ClientID, m.Subject, m.Sequence, sub.Inbox) } return false, false } b, _ := m.Marshal() if err := s.ncs.Publish(sub.Inbox, b); err != nil { Errorf("STAN: [Client:%s] Failed Sending msgseq %s:%d to %s (%s).", sub.ClientID, m.Subject, m.Sequence, sub.Inbox, err) return false, false } // Setup the ackTimer as needed now. I don't want to use defer in this // function, and want to make sure that if we exit before the end, the // timer is set. It will be adjusted/stopped as needed. if sub.ackTimer == nil { s.setupAckTimer(sub, sub.ackWait) } // If this message is already pending, nothing else to do. if _, present := sub.acksPending[m.Sequence]; present { return true, true } // Store in storage if err := sub.store.AddSeqPending(sub.ID, m.Sequence); err != nil { Errorf("STAN: [Client:%s] Unable to update subscription for %s:%v (%v)", sub.ClientID, m.Subject, m.Sequence, err) return false, false } // Update LastSent if applicable if m.Sequence > sub.LastSent { sub.LastSent = m.Sequence } // Store in ackPending. sub.acksPending[m.Sequence] = struct{}{} // Now that we have added to acksPending, check again if we // have reached the max and tell the caller that it should not // be sending more at this time. if !force && (ap+1 == sub.MaxInFlight) { sub.stalled = true if s.debug { Debugf("STAN: [Client:%s] Stalling after msgseq %s:%d to %s.", sub.ClientID, m.Subject, m.Sequence, sub.Inbox) } return true, false } return true, true } // Sets up the ackTimer to fire at the given duration. // sub's lock held on entry. 
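//
// Typical call sites in this file pass the subscription's ack wait, e.g.
// s.setupAckTimer(sub, sub.ackWait), or a deliberately large placeholder such
// as time.Hour during startup redelivery, which is then adjusted or cleared.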
// Sets up the ackTimer to fire at the given duration.
// sub's lock held on entry.
func (s *StanServer) setupAckTimer(sub *subState, d time.Duration) {
	sub.ackTimer = time.AfterFunc(d, func() {
		s.performAckExpirationRedelivery(sub)
	})
}

func (s *StanServer) startIOLoop() {
	s.ioChannelWG.Add(1)
	s.ioChannel = make(chan *ioPendingMsg, ioChannelSize)
	// Use wait group to ensure that the loop is as ready as
	// possible before we set up the subscriptions and open the door
	// to incoming NATS messages.
	ready := &sync.WaitGroup{}
	ready.Add(1)
	go s.ioLoop(ready)
	ready.Wait()
}

func (s *StanServer) ioLoop(ready *sync.WaitGroup) {
	defer s.ioChannelWG.Done()

	////////////////////////////////////////////////////////////////////////////
	// This is where we will store the message and wait for others in the
	// potential cluster to do so as well. Once we have a quorum, someone can
	// ack the publisher. We simply do so here for now.
	////////////////////////////////////////////////////////////////////////////
	////////////////////////////////////////////////////////////////////////////
	// Once we have ack'd the publisher, we need to assign this a sequence ID.
	// This will be done by a master election within the cluster, for now we
	// assume we are the master and assign the sequence ID here.
	////////////////////////////////////////////////////////////////////////////
	storesToFlush := make(map[*stores.ChannelStore]struct{}, 64)

	var _pendingMsgs [ioChannelSize]*ioPendingMsg
	var pendingMsgs = _pendingMsgs[:0]

	storeIOPendingMsg := func(iopm *ioPendingMsg) {
		cs, err := s.assignAndStore(&iopm.pm)
		if err != nil {
			Errorf("STAN: [Client:%s] Error processing message for subject %q: %v",
				iopm.pm.ClientID, iopm.m.Subject, err)
			s.sendPublishErr(iopm.m.Reply, iopm.pm.Guid, err)
		} else {
			pendingMsgs = append(pendingMsgs, iopm)
			storesToFlush[cs] = struct{}{}
		}
	}

	batchSize := s.opts.IOBatchSize
	sleepTime := s.opts.IOSleepTime
	sleepDur := time.Duration(sleepTime) * time.Microsecond
	max := 0

	ready.Done()

	for {
		select {
		case iopm := <-s.ioChannel:
			// store the one we just pulled
			storeIOPendingMsg(iopm)

			remaining := batchSize - 1
			// fill the pending messages slice with at most our batch size,
			// unless the channel is empty.
			for remaining > 0 {
				ioChanLen := len(s.ioChannel)

				// if we are empty, wait, check again, and break if nothing.
				// While this adds some latency, it optimizes batching.
				if ioChanLen == 0 {
					if sleepTime > 0 {
						time.Sleep(sleepDur)
						ioChanLen = len(s.ioChannel)
						if ioChanLen == 0 {
							break
						}
					} else {
						break
					}
				}

				// stick to our buffer size
				if ioChanLen > remaining {
					ioChanLen = remaining
				}

				for i := 0; i < ioChanLen; i++ {
					storeIOPendingMsg(<-s.ioChannel)
				}

				// Keep track of max number of messages in a batch
				if ioChanLen > max {
					max = ioChanLen
					atomic.StoreInt64(&(s.ioChannelStatsMaxBatchSize), int64(max))
				}

				remaining -= ioChanLen
			}

			// flush all the stores with messages written to them...
			for cs := range storesToFlush {
				if err := cs.Msgs.Flush(); err != nil {
					// TODO: Attempt recovery, notify publishers of error.
					panic(fmt.Errorf("Unable to flush msg store: %v", err))
				}
				// Call this here, so messages are sent to subscribers,
				// which means that msg seq is added to subscription file
				s.processMsg(cs)
				if err := cs.Subs.Flush(); err != nil {
					panic(fmt.Errorf("Unable to flush sub store: %v", err))
				}
				// Remove entry from map (this is safe in Go)
				delete(storesToFlush, cs)
			}

			// Ack our messages back to the publisher
			for i := range pendingMsgs {
				iopm := pendingMsgs[i]
				s.ackPublisher(iopm)
				pendingMsgs[i] = nil
			}

			// clear out pending messages
			pendingMsgs = pendingMsgs[:0]

		case <-s.ioChannelQuit:
			return
		}
	}
}
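// Illustrative sketch (not part of the server): mimics the micro-batching
// strategy of ioLoop above on a plain channel of ints. After blocking for one
// item, it keeps pulling whatever is already buffered (up to batchSize),
// optionally sleeping once when the channel looks empty to give producers a
// chance to catch up. All names here are hypothetical.
func exampleDrainBatch(ch chan int, batchSize int, sleep time.Duration) []int {
	batch := []int{<-ch} // block for the first item
	remaining := batchSize - 1
	for remaining > 0 {
		n := len(ch)
		if n == 0 {
			if sleep > 0 {
				time.Sleep(sleep)
				n = len(ch)
			}
			if n == 0 {
				break
			}
		}
		if n > remaining {
			n = remaining
		}
		for i := 0; i < n; i++ {
			batch = append(batch, <-ch)
		}
		remaining -= n
	}
	return batch
}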
// assignAndStore will assign a sequence ID and then store the message.
func (s *StanServer) assignAndStore(pm *pb.PubMsg) (*stores.ChannelStore, error) {
	cs, err := s.lookupOrCreateChannel(pm.Subject)
	if err != nil {
		return nil, err
	}
	if _, err := cs.Msgs.Store(pm.Data); err != nil {
		return nil, err
	}
	return cs, nil
}

// ackPublisher sends the ack for a message.
func (s *StanServer) ackPublisher(iopm *ioPendingMsg) {
	msgAck := &iopm.pa
	msgAck.Guid = iopm.pm.Guid
	var buf [32]byte
	b := buf[:]
	n, _ := msgAck.MarshalTo(b)
	if s.trace {
		pm := &iopm.pm
		Tracef("STAN: [Client:%s] Acking Publisher subj=%s guid=%s", pm.ClientID, pm.Subject, pm.Guid)
	}
	s.ncs.Publish(iopm.m.Reply, b[:n])
}

// Delete a sub from a given list.
func (sub *subState) deleteFromList(sl []*subState) ([]*subState, bool) {
	for i := 0; i < len(sl); i++ {
		if sl[i] == sub {
			sl[i] = sl[len(sl)-1]
			sl[len(sl)-1] = nil
			sl = sl[:len(sl)-1]
			return shrinkSubListIfNeeded(sl), true
		}
	}
	return sl, false
}

// Checks if we need to do a resize. This is for very large growth
// followed by a subsequent return to a more normal size.
func shrinkSubListIfNeeded(sl []*subState) []*subState {
	lsl := len(sl)
	csl := cap(sl)
	// Don't bother if list not too big
	if csl <= 8 {
		return sl
	}
	pFree := float32(csl-lsl) / float32(csl)
	if pFree > 0.50 {
		return append([]*subState(nil), sl...)
	}
	return sl
}

// removeAllNonDurableSubscribers will remove all non-durable subscribers for the client.
func (s *StanServer) removeAllNonDurableSubscribers(client *client) {
	// client has been unregistered and no other routine can add/remove
	// subscriptions, so it is safe to use the original.
	client.RLock()
	subs := client.subs
	client.RUnlock()
	for _, sub := range subs {
		sub.RLock()
		subject := sub.subject
		sub.RUnlock()
		// Get the ChannelStore
		cs := s.store.LookupChannel(subject)
		if cs == nil {
			continue
		}
		// Get the subStore from the ChannelStore
		ss := cs.UserData.(*subStore)
		// Don't remove durables
		ss.Remove(cs, sub, false)
	}
}

// processUnsubscribeRequest will process an unsubscribe request.
func (s *StanServer) processUnsubscribeRequest(m *nats.Msg) {
	req := &pb.UnsubscribeRequest{}
	err := req.Unmarshal(m.Data)
	if err != nil {
		Errorf("STAN: Invalid unsub request from %s.", m.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidUnsubReq)
		return
	}
	s.performSubUnsubOrClose(spb.CtrlMsg_SubUnsubscribe, scheduleRequest, m, req)
}

// processSubCloseRequest will process a subscription close request.
func (s *StanServer) processSubCloseRequest(m *nats.Msg) {
	req := &pb.UnsubscribeRequest{}
	err := req.Unmarshal(m.Data)
	if err != nil {
		Errorf("STAN: Invalid sub close request from %s.", m.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidUnsubReq)
		return
	}
	s.performSubUnsubOrClose(spb.CtrlMsg_SubClose, scheduleRequest, m, req)
}
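// Illustrative sketch (not part of the server): demonstrates the shrink
// heuristic used by shrinkSubListIfNeeded above on a generic slice of ints.
// A copy is made only when the capacity exceeds 8 and more than half of it
// is unused, so a list that grew very large and then emptied out releases
// its oversized backing array. Names are hypothetical.
func exampleShrinkIfMostlyEmpty(sl []int) []int {
	if cap(sl) <= 8 {
		return sl
	}
	free := float32(cap(sl)-len(sl)) / float32(cap(sl))
	if free > 0.50 {
		// Reallocate to a right-sized backing array.
		return append([]int(nil), sl...)
	}
	return sl
}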
// performSubUnsubOrClose either schedules the request to the
// subscriber's AckInbox subscriber, or processes the request in place.
func (s *StanServer) performSubUnsubOrClose(reqType spb.CtrlMsg_Type, schedule bool, m *nats.Msg, req *pb.UnsubscribeRequest) {
	action := "unsub"
	isSubClose := false
	if reqType == spb.CtrlMsg_SubClose {
		action = "sub close"
		isSubClose = true
	}
	cs := s.store.LookupChannel(req.Subject)
	if cs == nil {
		Errorf("STAN: [Client:%s] %s request missing subject %s.",
			req.ClientID, action, req.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSub)
		return
	}

	// Get the subStore
	ss := cs.UserData.(*subStore)

	sub := ss.LookupByAckInbox(req.Inbox)
	if sub == nil {
		Errorf("STAN: [Client:%s] %s request for missing inbox %s.",
			req.ClientID, action, req.Inbox)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSub)
		return
	}

	// Lock for the remainder of the function
	s.closeProtosMu.Lock()
	defer s.closeProtosMu.Unlock()

	if schedule {
		processInPlace := true
		sub.Lock()
		if sub.ackSub != nil {
			ctrlMsg := &spb.CtrlMsg{
				MsgType:  reqType,
				ServerID: s.srvCtrlMsgID,
				Data:     m.Data,
			}
			ctrlBytes, _ := ctrlMsg.Marshal()
			ctrlMsgNatsMsg := &nats.Msg{
				Subject: sub.AckInbox,
				Reply:   m.Reply,
				Data:    ctrlBytes,
			}
			if s.ncs.PublishMsg(ctrlMsgNatsMsg) == nil {
				// This function will be called from processAckMsg with
				// internal == true.
				processInPlace = false
			}
		}
		sub.Unlock()
		if !processInPlace {
			return
		}
	}

	// Remove from Client
	if !s.clients.RemoveSub(req.ClientID, sub) {
		Errorf("STAN: [Client:%s] %s request for missing client", req.ClientID, action)
		s.sendSubscriptionResponseErr(m.Reply, ErrUnknownClient)
		return
	}

	// Remove the subscription
	unsubscribe := !isSubClose
	ss.Remove(cs, sub, unsubscribe)

	if s.debug {
		if isSubClose {
			Debugf("STAN: [Client:%s] Closing subscription subject=%s.", req.ClientID, req.Subject)
		} else {
			Debugf("STAN: [Client:%s] Unsubscribing subject=%s.", req.ClientID, req.Subject)
		}
	}

	// Create a non-error response
	resp := &pb.SubscriptionResponse{AckInbox: req.Inbox}
	b, _ := resp.Marshal()
	s.ncs.Publish(m.Reply, b)
}

func (s *StanServer) sendSubscriptionResponseErr(reply string, err error) {
	resp := &pb.SubscriptionResponse{Error: err.Error()}
	b, _ := resp.Marshal()
	s.ncs.Publish(reply, b)
}

// Check for valid subjects
func isValidSubject(subject string) bool {
	if subject == "" {
		return false
	}
	for i := 0; i < len(subject); i++ {
		c := subject[i]
		if c == '*' || c == '>' {
			return false
		}
	}
	return true
}

// Clear the ackTimer.
// sub Lock held on entry.
func (sub *subState) clearAckTimer() {
	if sub.ackTimer != nil {
		sub.ackTimer.Stop()
		sub.ackTimer = nil
	}
}
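// Illustrative sketch (not part of the server): shows how isValidSubject
// above is expected to behave. Streaming subjects must be literal; the
// wildcard tokens '*' and '>' accepted by core NATS are rejected here, as is
// an empty subject. The function name is hypothetical and only exercises the
// validator.
func exampleSubjectValidation() {
	for _, subj := range []string{"foo.bar", "", "foo.*", "foo.>"} {
		fmt.Printf("subject %q valid: %v\n", subj, isValidSubject(subj))
	}
}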
// adjustAckTimer adjusts the timer based on a given timestamp.
// The timer will be stopped if there is no more pending ack.
// If there are pending acks, the timer will be reset to the
// default sub.ackWait value if the given timestamp is
// 0 or in the past. Otherwise, it is set to the remaining time
// between the given timestamp and now.
func (sub *subState) adjustAckTimer(firstUnackedTimestamp int64) {
	sub.Lock()
	defer sub.Unlock()

	// Possible that the subscriber has been destroyed, and timer cleared
	if sub.ackTimer == nil {
		return
	}

	// Reset the floor (it will be set if needed)
	sub.ackTimeFloor = 0

	// Check if there are still pending acks
	if len(sub.acksPending) > 0 {
		// Capture time
		now := time.Now().UnixNano()

		// ackWait in int64
		expTime := int64(sub.ackWait)

		// If the message timestamp + expiration is in the past
		// (which will happen when a message is redelivered more
		// than once), or if timestamp is 0, use the default ackWait
		if firstUnackedTimestamp+expTime <= now {
			sub.ackTimer.Reset(sub.ackWait)
		} else {
			// Compute the time the ackTimer should fire, which is the
			// ack timeout less the duration the message has been in
			// the server.
			fireIn := (firstUnackedTimestamp + expTime - now)

			sub.ackTimer.Reset(time.Duration(fireIn))

			// Skip redelivery of messages before this one.
			sub.ackTimeFloor = firstUnackedTimestamp
		}
	} else {
		// No more pending acks, clear the timer.
		sub.clearAckTimer()
	}
}

// Used to generate durable key. This should not be called on non-durables.
func (sub *subState) durableKey() string {
	if sub.DurableName == "" {
		return ""
	}
	return fmt.Sprintf("%s-%s-%s", sub.ClientID, sub.subject, sub.DurableName)
}

// Returns true if this sub is a queue subscriber (durable or not)
func (sub *subState) isQueueSubscriber() bool {
	return sub.QGroup != ""
}

// Returns true if this is a "shadow" durable queue subscriber
func (sub *subState) isShadowQueueDurable() bool {
	return sub.IsDurable && sub.QGroup != "" && sub.ClientID == ""
}

// Returns true if this sub is a durable subscriber (not a durable queue sub)
func (sub *subState) isDurableSubscriber() bool {
	return sub.DurableName != ""
}

// Returns true if this is an offline durable subscriber.
func (sub *subState) isOfflineDurableSubscriber() bool {
	return sub.DurableName != "" && sub.ClientID == ""
}

// Used to generate durable key. This should not be called on non-durables.
func durableKey(sr *pb.SubscriptionRequest) string {
	if sr.DurableName == "" {
		return ""
	}
	return fmt.Sprintf("%s-%s-%s", sr.ClientID, sr.Subject, sr.DurableName)
}

// addSubscription adds `sub` to the client and store.
func (s *StanServer) addSubscription(ss *subStore, sub *subState) error {
	// Store in client
	if !s.clients.AddSub(sub.ClientID, sub) {
		return fmt.Errorf("can't find clientID: %v", sub.ClientID)
	}
	// Store this subscription in subStore
	if err := ss.Store(sub); err != nil {
		return err
	}
	return nil
}

// updateDurable adds back `sub` to the client and updates the store.
// No lock is needed for `sub` since it has just been created.
func (s *StanServer) updateDurable(ss *subStore, sub *subState) error {
	// Store in the client
	if !s.clients.AddSub(sub.ClientID, sub) {
		return fmt.Errorf("can't find clientID: %v", sub.ClientID)
	}
	// Update this subscription in the store
	if err := sub.store.UpdateSub(&sub.SubState); err != nil {
		return err
	}
	ss.Lock()
	// Do this only for durable subscribers (not durable queue subscribers).
	if sub.isDurableSubscriber() {
		// Add back into plain subscribers
		ss.psubs = append(ss.psubs, sub)
	}
	// And in ackInbox lookup map.
	ss.acks[sub.AckInbox] = sub
	ss.Unlock()
	return nil
}
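// Illustrative sketch (not part of the server): the remaining-time computation
// performed by adjustAckTimer above. If the oldest unacked message would have
// already expired (or no timestamp is known), the full ackWait is used;
// otherwise the timer fires when that message's ack timeout elapses. The
// function name is hypothetical.
func exampleNextAckTimerFire(firstUnackedTimestamp int64, ackWait time.Duration, now int64) time.Duration {
	expTime := int64(ackWait)
	if firstUnackedTimestamp == 0 || firstUnackedTimestamp+expTime <= now {
		return ackWait
	}
	return time.Duration(firstUnackedTimestamp + expTime - now)
}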
// processSubscriptionRequest will process a subscription request.
func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
	sr := &pb.SubscriptionRequest{}
	err := sr.Unmarshal(m.Data)
	if err != nil {
		Errorf("STAN: Invalid Subscription request from %s.", m.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSubReq)
		return
	}

	// FIXME(dlc) check for multiple errors, mis-configurations, etc.

	// AckWait must be >= 1s
	if sr.AckWaitInSecs <= 0 {
		Debugf("STAN: [Client:%s] Invalid AckWait in subscription request from %s.",
			sr.ClientID, m.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidAckWait)
		return
	}

	// Make sure subject is valid
	if !isValidSubject(sr.Subject) {
		Debugf("STAN: [Client:%s] Invalid subject <%s> in subscription request from %s.",
			sr.ClientID, sr.Subject, m.Subject)
		s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSubject)
		return
	}

	// ClientID must not be empty.
	if sr.ClientID == "" {
		Debugf("STAN: missing clientID in subscription request from %s", m.Subject)
		s.sendSubscriptionResponseErr(m.Reply,
			errors.New("stan: malformed subscription request, clientID missing"))
		return
	}

	// Grab channel state, create a new one if needed.
	cs, err := s.lookupOrCreateChannel(sr.Subject)
	if err != nil {
		Errorf("STAN: Unable to create store for subject %s.", sr.Subject)
		s.sendSubscriptionResponseErr(m.Reply, err)
		return
	}
	// Get the subStore
	ss := cs.UserData.(*subStore)

	var sub *subState

	ackInbox := nats.NewInbox()

	// Will be true for durable queue subscribers and durable subscribers alike.
	isDurable := false
	// Will be set to false for an existing durable subscriber or existing
	// queue group (durable or not).
	setStartPos := true
	// Check for durable queue subscribers
	if sr.QGroup != "" {
		if sr.DurableName != "" {
			// For queue subscribers, we prevent DurableName from containing
			// the ':' character, since we use it for the compound name.
			if strings.Contains(sr.DurableName, ":") {
				Debugf("STAN: [Client:%s] %s", sr.ClientID, ErrInvalidDurName)
				s.sendSubscriptionResponseErr(m.Reply, ErrInvalidDurName)
				return
			}
			isDurable = true
			// Make the queue group a compound name between durable name and q group.
			sr.QGroup = fmt.Sprintf("%s:%s", sr.DurableName, sr.QGroup)
			// Clear DurableName from this subscriber.
			sr.DurableName = ""
		}
		// Look up an existing group. Only interested in the situation where
		// the group exists, but is empty and had a shadow subscriber.
		ss.RLock()
		qs := ss.qsubs[sr.QGroup]
		if qs != nil {
			qs.Lock()
			if qs.shadow != nil {
				sub = qs.shadow
				qs.shadow = nil
				qs.subs = append(qs.subs, sub)
			}
			qs.Unlock()
			setStartPos = false
		}
		ss.RUnlock()
	} else if sr.DurableName != "" {
		// Check for DurableSubscriber status
		if sub = ss.LookupByDurable(durableKey(sr)); sub != nil {
			sub.RLock()
			clientID := sub.ClientID
			sub.RUnlock()
			if clientID != "" {
				Debugf("STAN: [Client:%s] Invalid client id in subscription request from %s.",
					sr.ClientID, m.Subject)
				s.sendSubscriptionResponseErr(m.Reply, ErrDupDurable)
				return
			}
			setStartPos = false
		}
		isDurable = true
	}
	if sub != nil {
		// ok we have a remembered subscription
		sub.Lock()
		// Set ClientID and new AckInbox but leave LastSent to the
		// remembered value.
		sub.AckInbox = ackInbox
		sub.ClientID = sr.ClientID
		sub.Inbox = sr.Inbox
		sub.IsDurable = true
		// Use some of the new options, but ignore the ones regarding start position
		sub.MaxInFlight = sr.MaxInFlight
		sub.AckWaitInSecs = sr.AckWaitInSecs
		sub.ackWait = time.Duration(sr.AckWaitInSecs) * time.Second
		sub.stalled = false
		if len(sub.acksPending) > 0 {
			s.setupAckTimer(sub, sub.ackWait)
		}
		sub.Unlock()

		// Case of restarted durable subscriber, or first durable queue
		// subscriber re-joining a group that was left with pending messages.
		err = s.updateDurable(ss, sub)
	} else {
		// Create sub here (can be plain, durable or queue subscriber)
		sub = &subState{
			SubState: spb.SubState{
				ClientID:      sr.ClientID,
				QGroup:        sr.QGroup,
				Inbox:         sr.Inbox,
				AckInbox:      ackInbox,
				MaxInFlight:   sr.MaxInFlight,
				AckWaitInSecs: sr.AckWaitInSecs,
				DurableName:   sr.DurableName,
				IsDurable:     isDurable,
			},
			subject:     sr.Subject,
			ackWait:     time.Duration(sr.AckWaitInSecs) * time.Second,
			acksPending: make(map[uint64]struct{}),
			store:       cs.Subs,
		}

		if setStartPos {
			// set the start sequence of the subscriber.
			s.setSubStartSequence(cs, sub, sr)
		}

		// add the subscription to stan
		err = s.addSubscription(ss, sub)
	}
	if err != nil {
		// Try to undo what has been done.
		s.closeProtosMu.Lock()
		ss.Remove(cs, sub, false)
		s.closeProtosMu.Unlock()
		Errorf("STAN: Unable to add subscription for %s: %v", sr.Subject, err)
		s.sendSubscriptionResponseErr(m.Reply, err)
		return
	}
	Debugf("STAN: [Client:%s] Added subscription on subject=%s, inbox=%s",
		sr.ClientID, sr.Subject, sr.Inbox)

	// In case this is a durable, sub already exists so we need to protect access
	sub.Lock()
	// Subscribe to acks.
	// We MUST use the same connection as all other channel subscribers
	// if we want to receive messages in order from the NATS server.
	sub.ackSub, err = s.nc.Subscribe(ackInbox, s.processAckMsg)
	if err != nil {
		sub.Unlock()
		panic(fmt.Sprintf("Could not subscribe to ack subject, %v\n", err))
	}
	sub.ackSub.SetPendingLimits(-1, -1)
	sub.Unlock()

	// However, we need to flush to ensure that the NATS server processes
	// this subscription request before we return OK and start sending
	// messages to the client.
	s.nc.Flush()

	// Create a non-error response
	resp := &pb.SubscriptionResponse{AckInbox: ackInbox}
	b, _ := resp.Marshal()
	s.ncs.Publish(m.Reply, b)

	// If we are a durable (queue or not) and have state
	if isDurable {
		// Redeliver any outstanding.
		s.performDurableRedelivery(cs, sub)
	}

	// publish messages to this subscriber
	sub.RLock()
	qs := sub.qstate
	sub.RUnlock()

	if qs != nil {
		s.sendAvailableMessagesToQueue(cs, qs)
	} else {
		s.sendAvailableMessages(cs, sub)
	}
}

// processAckMsg processes inbound acks from clients for delivered messages.
func (s *StanServer) processAckMsg(m *nats.Msg) {
	ack := &pb.Ack{}
	if ack.Unmarshal(m.Data) != nil {
		// Expecting the full range of "close" requests: subUnsub, subClose, or connClose
		if s.processInternalCloseRequest(m, false) {
			return
		}
	}
	cs := s.store.LookupChannel(ack.Subject)
	if cs == nil {
		Errorf("STAN: [Client:?] Ack received, invalid channel (%s)", ack.Subject)
		return
	}
	s.processAck(cs, cs.UserData.(*subStore).LookupByAckInbox(m.Subject), ack.Sequence)
}
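// Illustrative sketch (not part of the server): shows the naming scheme used
// by processSubscriptionRequest above. A durable subscriber is tracked under
// "<clientID>-<subject>-<durableName>", while a durable queue subscriber gets
// a compound queue group "<durableName>:<qGroup>" (which is why ':' is not
// allowed in the durable name of a queue subscriber). Names are hypothetical.
func exampleDurableNaming(clientID, subject, durableName, qGroup string) (durKey, compoundQGroup string) {
	if durableName != "" && qGroup == "" {
		durKey = fmt.Sprintf("%s-%s-%s", clientID, subject, durableName)
	}
	if durableName != "" && qGroup != "" && !strings.Contains(durableName, ":") {
		compoundQGroup = fmt.Sprintf("%s:%s", durableName, qGroup)
	}
	return durKey, compoundQGroup
}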
// processAck processes an ack and if needed sends more messages.
func (s *StanServer) processAck(cs *stores.ChannelStore, sub *subState, sequence uint64) {
	if sub == nil {
		return
	}

	sub.Lock()

	if s.trace {
		Tracef("STAN: [Client:%s] removing pending ack, subj=%s, seq=%d",
			sub.ClientID, sub.subject, sequence)
	}

	if err := sub.store.AckSeqPending(sub.ID, sequence); err != nil {
		Errorf("STAN: [Client:%s] Unable to persist ack for %s:%v (%v)",
			sub.ClientID, sub.subject, sequence, err)
		sub.Unlock()
		return
	}

	delete(sub.acksPending, sequence)
	stalled := sub.stalled
	if int32(len(sub.acksPending)) < sub.MaxInFlight {
		sub.stalled = false
	}

	// Leave the reset/cancel of the ackTimer to the redelivery cb.

	qs := sub.qstate
	sub.Unlock()

	if qs != nil {
		qs.Lock()
		stalled = qs.stalled
		qs.stalled = false
		qs.Unlock()
	}

	if !stalled {
		return
	}

	if qs != nil {
		s.sendAvailableMessagesToQueue(cs, qs)
	} else {
		s.sendAvailableMessages(cs, sub)
	}
}

// Send any messages that are ready to be sent that have been queued to the group.
func (s *StanServer) sendAvailableMessagesToQueue(cs *stores.ChannelStore, qs *queueState) {
	if cs == nil || qs == nil {
		return
	}

	qs.Lock()
	for nextSeq := qs.lastSent + 1; ; nextSeq++ {
		nextMsg := getNextMsg(cs, &nextSeq, &qs.lastSent)
		if nextMsg == nil {
			break
		}
		if _, sent, sendMore := s.sendMsgToQueueGroup(qs, nextMsg, honorMaxInFlight); !sent || !sendMore {
			break
		}
	}
	qs.Unlock()
}

// Send any messages that are ready to be sent that have been queued.
func (s *StanServer) sendAvailableMessages(cs *stores.ChannelStore, sub *subState) {
	sub.Lock()
	for nextSeq := sub.LastSent + 1; ; nextSeq++ {
		nextMsg := getNextMsg(cs, &nextSeq, &sub.LastSent)
		if nextMsg == nil {
			break
		}
		if sent, sendMore := s.sendMsgToSub(sub, nextMsg, honorMaxInFlight); !sent || !sendMore {
			break
		}
	}
	sub.Unlock()
}

func getNextMsg(cs *stores.ChannelStore, nextSeq, lastSent *uint64) *pb.MsgProto {
	for {
		nextMsg := cs.Msgs.Lookup(*nextSeq)
		if nextMsg != nil {
			return nextMsg
		}
		// The reason we don't call FirstMsg here is that FirstMsg could be
		// costly (read from disk, etc) only to realize that the message has
		// a lower sequence. So check with the cheaper FirstSequence() first.
		firstAvail := cs.Msgs.FirstSequence()
		if firstAvail <= *nextSeq {
			return nil
		}
		// TODO: We may send dataloss advisories to the client
		// through the use of a subscription created optionally
		// by the sub and given to the server through the SubscriptionRequest.
		// For a queue group, the server would pick one of the members to send
		// the advisory to.

		// For now, just skip the missing ones.
		*nextSeq = firstAvail
		*lastSent = firstAvail - 1

		// Note that the next lookup could still fail because
		// the first avail message may have been dropped in the
		// meantime.
	}
}

func (s *StanServer) getSequenceFromStartTime(cs *stores.ChannelStore, startTime int64) uint64 {
	return cs.Msgs.GetSequenceFromTimestamp(startTime)
}

// Setup the start position for the subscriber.
func (s *StanServer) setSubStartSequence(cs *stores.ChannelStore, sub *subState, sr *pb.SubscriptionRequest) {
	sub.Lock()

	lastSent := uint64(0)

	// In all start position cases, if there is no message, ensure
	// lastSent stays at 0.
	switch sr.StartPosition {
	case pb.StartPosition_NewOnly:
		lastSent = cs.Msgs.LastSequence()
		Debugf("STAN: [Client:%s] Sending new-only subject=%s, seq=%d.",
			sub.ClientID, sub.subject, lastSent)
	case pb.StartPosition_LastReceived:
		lastSeq := cs.Msgs.LastSequence()
		if lastSeq > 0 {
			lastSent = lastSeq - 1
		}
		Debugf("STAN: [Client:%s] Sending last message, subject=%s.",
			sub.ClientID, sub.subject)
	case pb.StartPosition_TimeDeltaStart:
		startTime := time.Now().UnixNano() - sr.StartTimeDelta
		// If there is no message, seq will be 0.
		seq := s.getSequenceFromStartTime(cs, startTime)
		if seq > 0 {
			// If the time delta is in the future relative to the last
			// message in the log, 'seq' will be equal to last sequence + 1,
			// so this would translate to "new only" semantic.
			lastSent = seq - 1
		}
		Debugf("STAN: [Client:%s] Sending from time, subject=%s time=%d seq=%d",
			sub.ClientID, sub.subject, startTime, lastSent)
	case pb.StartPosition_SequenceStart:
		// If there is no message, firstSeq and lastSeq will be equal to 0.
		firstSeq, lastSeq := cs.Msgs.FirstAndLastSequence()
		// StartSequence is a uint64, so it can't be lower than 0.
		if sr.StartSequence < firstSeq {
			// That translates to sending the first message available.
			lastSent = firstSeq - 1
		} else if sr.StartSequence > lastSeq {
			// That translates to "new only"
			lastSent = lastSeq
		} else if sr.StartSequence > 0 {
			// That translates to sending the message with StartSequence
			// sequence number.
			lastSent = sr.StartSequence - 1
		}
		Debugf("STAN: [Client:%s] Sending from sequence, subject=%s seq_asked=%d actual_seq=%d",
			sub.ClientID, sub.subject, sr.StartSequence, lastSent)
	case pb.StartPosition_First:
		firstSeq := cs.Msgs.FirstSequence()
		if firstSeq > 0 {
			lastSent = firstSeq - 1
		}
		Debugf("STAN: [Client:%s] Sending from beginning, subject=%s seq=%d",
			sub.ClientID, sub.subject, lastSent)
	}
	sub.LastSent = lastSent
	sub.Unlock()
}

// ClusterID returns the STAN Server's ID.
func (s *StanServer) ClusterID() string { return s.info.ClusterID }

// Shutdown will close our NATS connection and shut down any embedded NATS server.
func (s *StanServer) Shutdown() {
	Noticef("STAN: Shutting down.")

	s.Lock()
	if s.shutdown {
		s.Unlock()
		return
	}

	// Allows Shutdown() to be idempotent
	s.shutdown = true

	// We need to make sure that the storeIOLoop returns before
	// closing the Store
	waitForIOStoreLoop := true

	// Capture under lock
	store := s.store
	ns := s.natsServer
	// Do not close and nil the connections here, they are used in many places
	// without locking. Once closed, s.nc.xxx() calls will simply fail, but
	// we won't panic.
	ncs := s.ncs
	nc := s.nc

	if s.ioChannel != nil {
		// Notify the IO channel that we are shutting down
		s.ioChannelQuit <- struct{}{}
	} else {
		waitForIOStoreLoop = false
	}
	s.Unlock()

	// Make sure the StoreIOLoop returns before closing the Store
	if waitForIOStoreLoop {
		s.ioChannelWG.Wait()
	}

	// Close/Shutdown resources. Note that unless one instantiates StanServer
	// directly (instead of calling RunServer() and the like), these should
	// not be nil.
	if store != nil {
		store.Close()
	}
	if ncs != nil {
		ncs.Close()
	}
	if nc != nil {
		nc.Close()
	}
	if ns != nil {
		ns.Shutdown()
	}

	// Wait for go-routines to return
	s.wg.Wait()
}
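// Illustrative sketch (not part of the server): the mapping from a requested
// start sequence to the initial LastSent value computed by setSubStartSequence
// above for the StartPosition_SequenceStart case. The subscriber then receives
// messages starting at LastSent+1. Function and parameter names are
// hypothetical.
func exampleLastSentForSequenceStart(startSeq, firstSeq, lastSeq uint64) uint64 {
	switch {
	case startSeq < firstSeq:
		// Start before the first stored message: deliver from the first one.
		return firstSeq - 1
	case startSeq > lastSeq:
		// Start past the newest message: behaves like "new only".
		return lastSeq
	case startSeq > 0:
		// Deliver starting at the requested sequence.
		return startSeq - 1
	default:
		return 0
	}
}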