// Copyright 2016 Apcera Inc. All rights reserved. // Package stan is a Go client for the NATS Streaming messaging system (https://nats.io). package stan import ( "errors" "fmt" "runtime" "sync" "time" "github.com/nats-io/go-nats" "github.com/nats-io/go-nats-streaming/pb" "github.com/nats-io/nuid" ) // Version is the NATS Streaming Go Client version const Version = "0.3.4" const ( // DefaultNatsURL is the default URL the client connects to DefaultNatsURL = "nats://localhost:4222" // DefaultConnectWait is the default timeout used for the connect operation DefaultConnectWait = 2 * time.Second // DefaultDiscoverPrefix is the prefix subject used to connect to the NATS Streaming server DefaultDiscoverPrefix = "_STAN.discover" // DefaultACKPrefix is the prefix subject used to send ACKs to the NATS Streaming server DefaultACKPrefix = "_STAN.acks" // DefaultMaxPubAcksInflight is the default maximum number of published messages // without outstanding ACKs from the server DefaultMaxPubAcksInflight = 16384 ) // Conn represents a connection to the NATS Streaming subsystem. It can Publish and // Subscribe to messages within the NATS Streaming cluster. type Conn interface { // Publish Publish(subject string, data []byte) error PublishAsync(subject string, data []byte, ah AckHandler) (string, error) // Subscribe Subscribe(subject string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error) // QueueSubscribe QueueSubscribe(subject, qgroup string, cb MsgHandler, opts ...SubscriptionOption) (Subscription, error) // Close Close() error // NatsConn returns the underlying NATS conn. Use this with care. For // example, closing the wrapped NATS conn will put the NATS Streaming Conn // in an invalid state. NatsConn() *nats.Conn } // Errors var ( ErrConnectReqTimeout = errors.New("stan: connect request timeout") ErrCloseReqTimeout = errors.New("stan: close request timeout") ErrSubReqTimeout = errors.New("stan: subscribe request timeout") ErrUnsubReqTimeout = errors.New("stan: unsubscribe request timeout") ErrConnectionClosed = errors.New("stan: connection closed") ErrTimeout = errors.New("stan: publish ack timeout") ErrBadAck = errors.New("stan: malformed ack") ErrBadSubscription = errors.New("stan: invalid subscription") ErrBadConnection = errors.New("stan: invalid connection") ErrManualAck = errors.New("stan: cannot manually ack in auto-ack mode") ErrNilMsg = errors.New("stan: nil message") ErrNoServerSupport = errors.New("stan: not supported by server") ) // AckHandler is used for Async Publishing to provide status of the ack. // The func will be passed teh GUID and any error state. No error means the // message was successfully received by NATS Streaming. type AckHandler func(string, error) // Options can be used to a create a customized connection. type Options struct { NatsURL string NatsConn *nats.Conn ConnectTimeout time.Duration AckTimeout time.Duration DiscoverPrefix string MaxPubAcksInflight int } // DefaultOptions are the NATS Streaming client's default options var DefaultOptions = Options{ NatsURL: DefaultNatsURL, ConnectTimeout: DefaultConnectWait, AckTimeout: DefaultAckWait, DiscoverPrefix: DefaultDiscoverPrefix, MaxPubAcksInflight: DefaultMaxPubAcksInflight, } // Option is a function on the options for a connection. type Option func(*Options) error // NatsURL is an Option to set the URL the client should connect to. func NatsURL(u string) Option { return func(o *Options) error { o.NatsURL = u return nil } } // ConnectWait is an Option to set the timeout for establishing a connection. func ConnectWait(t time.Duration) Option { return func(o *Options) error { o.ConnectTimeout = t return nil } } // PubAckWait is an Option to set the timeout for waiting for an ACK for a // published message. func PubAckWait(t time.Duration) Option { return func(o *Options) error { o.AckTimeout = t return nil } } // MaxPubAcksInflight is an Option to set the maximum number of published // messages without outstanding ACKs from the server. func MaxPubAcksInflight(max int) Option { return func(o *Options) error { o.MaxPubAcksInflight = max return nil } } // NatsConn is an Option to set the underlying NATS connection to be used // by a NATS Streaming Conn object. func NatsConn(nc *nats.Conn) Option { return func(o *Options) error { o.NatsConn = nc return nil } } // A conn represents a bare connection to a stan cluster. type conn struct { sync.RWMutex clientID string serverID string pubPrefix string // Publish prefix set by stan, append our subject. subRequests string // Subject to send subscription requests. unsubRequests string // Subject to send unsubscribe requests. subCloseRequests string // Subject to send subscription close requests. closeRequests string // Subject to send close requests. ackSubject string // publish acks ackSubscription *nats.Subscription hbSubscription *nats.Subscription subMap map[string]*subscription pubAckMap map[string]*ack pubAckChan chan (struct{}) opts Options nc *nats.Conn ncOwned bool // NATS Streaming created the connection, so needs to close it. } // Closure for ack contexts. type ack struct { t *time.Timer ah AckHandler ch chan error } // Connect will form a connection to the NATS Streaming subsystem. func Connect(stanClusterID, clientID string, options ...Option) (Conn, error) { // Process Options c := conn{clientID: clientID, opts: DefaultOptions} for _, opt := range options { if err := opt(&c.opts); err != nil { return nil, err } } // Check if the user has provided a connection as an option c.nc = c.opts.NatsConn // Create a NATS connection if it doesn't exist. if c.nc == nil { nc, err := nats.Connect(c.opts.NatsURL) if err != nil { return nil, err } c.nc = nc c.ncOwned = true } else if !c.nc.IsConnected() { // Bail if the custom NATS connection is disconnected return nil, ErrBadConnection } // Create a heartbeat inbox hbInbox := nats.NewInbox() var err error if c.hbSubscription, err = c.nc.Subscribe(hbInbox, c.processHeartBeat); err != nil { c.Close() return nil, err } // Send Request to discover the cluster discoverSubject := c.opts.DiscoverPrefix + "." + stanClusterID req := &pb.ConnectRequest{ClientID: clientID, HeartbeatInbox: hbInbox} b, _ := req.Marshal() reply, err := c.nc.Request(discoverSubject, b, c.opts.ConnectTimeout) if err != nil { c.Close() if err == nats.ErrTimeout { return nil, ErrConnectReqTimeout } return nil, err } // Process the response, grab server pubPrefix cr := &pb.ConnectResponse{} err = cr.Unmarshal(reply.Data) if err != nil { c.Close() return nil, err } if cr.Error != "" { c.Close() return nil, errors.New(cr.Error) } // Capture cluster configuration endpoints to publish and subscribe/unsubscribe. c.pubPrefix = cr.PubPrefix c.subRequests = cr.SubRequests c.unsubRequests = cr.UnsubRequests c.subCloseRequests = cr.SubCloseRequests c.closeRequests = cr.CloseRequests // Setup the ACK subscription c.ackSubject = DefaultACKPrefix + "." + nuid.Next() if c.ackSubscription, err = c.nc.Subscribe(c.ackSubject, c.processAck); err != nil { c.Close() return nil, err } c.ackSubscription.SetPendingLimits(1024*1024, 32*1024*1024) c.pubAckMap = make(map[string]*ack) // Create Subscription map c.subMap = make(map[string]*subscription) c.pubAckChan = make(chan struct{}, c.opts.MaxPubAcksInflight) // Attach a finalizer runtime.SetFinalizer(&c, func(sc *conn) { sc.Close() }) return &c, nil } // Close a connection to the stan system. func (sc *conn) Close() error { if sc == nil { return ErrBadConnection } sc.Lock() defer sc.Unlock() if sc.nc == nil { // We are already closed. return nil } // Capture for NATS calls below. nc := sc.nc if sc.ncOwned { defer nc.Close() } // Signals we are closed. sc.nc = nil // Now close ourselves. if sc.ackSubscription != nil { sc.ackSubscription.Unsubscribe() } req := &pb.CloseRequest{ClientID: sc.clientID} b, _ := req.Marshal() reply, err := nc.Request(sc.closeRequests, b, sc.opts.ConnectTimeout) if err != nil { if err == nats.ErrTimeout { return ErrCloseReqTimeout } return err } cr := &pb.CloseResponse{} err = cr.Unmarshal(reply.Data) if err != nil { return err } if cr.Error != "" { return errors.New(cr.Error) } return nil } // NatsConn returns the underlying NATS conn. Use this with care. For example, // closing the wrapped NATS conn will put the NATS Streaming Conn in an invalid // state. func (sc *conn) NatsConn() *nats.Conn { return sc.nc } // Process a heartbeat from the NATS Streaming cluster func (sc *conn) processHeartBeat(m *nats.Msg) { // No payload assumed, just reply. sc.RLock() nc := sc.nc sc.RUnlock() if nc != nil { nc.Publish(m.Reply, nil) } } // Process an ack from the NATS Streaming cluster func (sc *conn) processAck(m *nats.Msg) { pa := &pb.PubAck{} err := pa.Unmarshal(m.Data) if err != nil { // FIXME, make closure to have context? fmt.Printf("Error processing unmarshal\n") return } // Remove a := sc.removeAck(pa.Guid) if a != nil { // Capture error if it exists. if pa.Error != "" { err = errors.New(pa.Error) } if a.ah != nil { // Perform the ackHandler callback a.ah(pa.Guid, err) } else if a.ch != nil { // Send to channel directly a.ch <- err } } } // Publish will publish to the cluster and wait for an ACK. func (sc *conn) Publish(subject string, data []byte) error { ch := make(chan error) _, err := sc.publishAsync(subject, data, nil, ch) if err == nil { err = <-ch } return err } // PublishAsync will publish to the cluster on pubPrefix+subject and asynchronously // process the ACK or error state. It will return the GUID for the message being sent. func (sc *conn) PublishAsync(subject string, data []byte, ah AckHandler) (string, error) { return sc.publishAsync(subject, data, ah, nil) } func (sc *conn) publishAsync(subject string, data []byte, ah AckHandler, ch chan error) (string, error) { a := &ack{ah: ah, ch: ch} sc.Lock() if sc.nc == nil { sc.Unlock() return "", ErrConnectionClosed } subj := sc.pubPrefix + "." + subject // This is only what we need from PubMsg in the timer below, // so do this so that pe doesn't escape (and we same on new object) peGUID := nuid.Next() pe := &pb.PubMsg{ClientID: sc.clientID, Guid: peGUID, Subject: subject, Data: data} b, _ := pe.Marshal() // Map ack to guid. sc.pubAckMap[peGUID] = a // snapshot ackSubject := sc.ackSubject ackTimeout := sc.opts.AckTimeout pac := sc.pubAckChan sc.Unlock() // Use the buffered channel to control the number of outstanding acks. pac <- struct{}{} err := sc.nc.PublishRequest(subj, ackSubject, b) if err != nil { sc.removeAck(peGUID) return "", err } // Setup the timer for expiration. sc.Lock() a.t = time.AfterFunc(ackTimeout, func() { sc.removeAck(peGUID) if a.ah != nil { ah(peGUID, ErrTimeout) } else if a.ch != nil { a.ch <- ErrTimeout } }) sc.Unlock() return peGUID, nil } // removeAck removes the ack from the pubAckMap and cancels any state, e.g. timers func (sc *conn) removeAck(guid string) *ack { var t *time.Timer sc.Lock() a := sc.pubAckMap[guid] if a != nil { t = a.t delete(sc.pubAckMap, guid) } pac := sc.pubAckChan sc.Unlock() // Cancel timer if needed. if t != nil { t.Stop() } // Remove from channel to unblock PublishAsync if a != nil && len(pac) > 0 { <-pac } return a } // Process an msg from the NATS Streaming cluster func (sc *conn) processMsg(raw *nats.Msg) { msg := &Msg{} err := msg.Unmarshal(raw.Data) if err != nil { panic("Error processing unmarshal for msg") } // Lookup the subscription sc.RLock() nc := sc.nc isClosed := nc == nil sub := sc.subMap[raw.Subject] sc.RUnlock() // Check if sub is no longer valid or connection has been closed. if sub == nil || isClosed { return } // Store in msg for backlink msg.Sub = sub sub.RLock() cb := sub.cb ackSubject := sub.ackInbox isManualAck := sub.opts.ManualAcks subsc := sub.sc // Can be nil if sub has been unsubscribed. sub.RUnlock() // Perform the callback if cb != nil && subsc != nil { cb(msg) } // Proces auto-ack if !isManualAck && nc != nil { ack := &pb.Ack{Subject: msg.Subject, Sequence: msg.Sequence} b, _ := ack.Marshal() if err := nc.Publish(ackSubject, b); err != nil { // FIXME(dlc) - Async error handler? Retry? } } }