containerd/containerd/vendor/github.com/coreos/go-systemd/dbus/subscription.go

251 lines
7.3 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dbus
import (
"errors"
"time"
"github.com/godbus/dbus"
)
const (
cleanIgnoreInterval = int64(10 * time.Second)
ignoreInterval = int64(30 * time.Millisecond)
)
// Subscribe sets up this connection to subscribe to all systemd dbus events.
// This is required before calling SubscribeUnits. When the connection closes
// systemd will automatically stop sending signals so there is no need to
// explicitly call Unsubscribe().
func (c *Conn) Subscribe() error {
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
"type='signal',interface='org.freedesktop.systemd1.Manager',member='UnitNew'")
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,
"type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'")
err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store()
if err != nil {
return err
}
return nil
}
// Unsubscribe this connection from systemd dbus events.
func (c *Conn) Unsubscribe() error {
err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store()
if err != nil {
return err
}
return nil
}
func (c *Conn) dispatch() {
ch := make(chan *dbus.Signal, signalBuffer)
c.sigconn.Signal(ch)
go func() {
for {
signal, ok := <-ch
if !ok {
return
}
if signal.Name == "org.freedesktop.systemd1.Manager.JobRemoved" {
c.jobComplete(signal)
}
if c.subscriber.updateCh == nil {
continue
}
var unitPath dbus.ObjectPath
switch signal.Name {
case "org.freedesktop.systemd1.Manager.JobRemoved":
unitName := signal.Body[2].(string)
c.sysobj.Call("org.freedesktop.systemd1.Manager.GetUnit", 0, unitName).Store(&unitPath)
case "org.freedesktop.systemd1.Manager.UnitNew":
unitPath = signal.Body[1].(dbus.ObjectPath)
case "org.freedesktop.DBus.Properties.PropertiesChanged":
if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
unitPath = signal.Path
}
}
if unitPath == dbus.ObjectPath("") {
continue
}
c.sendSubStateUpdate(unitPath)
}
}()
}
// Returns two unbuffered channels which will receive all changed units every
// interval. Deleted units are sent as nil.
func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
}
// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
// size of the channels, the comparison function for detecting changes and a filter
// function for cutting down on the noise that your channel receives.
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func(string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
old := make(map[string]*UnitStatus)
statusChan := make(chan map[string]*UnitStatus, buffer)
errChan := make(chan error, buffer)
go func() {
for {
timerChan := time.After(interval)
units, err := c.ListUnits()
if err == nil {
cur := make(map[string]*UnitStatus)
for i := range units {
if filterUnit != nil && filterUnit(units[i].Name) {
continue
}
cur[units[i].Name] = &units[i]
}
// add all new or changed units
changed := make(map[string]*UnitStatus)
for n, u := range cur {
if oldU, ok := old[n]; !ok || isChanged(oldU, u) {
changed[n] = u
}
delete(old, n)
}
// add all deleted units
for oldN := range old {
changed[oldN] = nil
}
old = cur
if len(changed) != 0 {
statusChan <- changed
}
} else {
errChan <- err
}
<-timerChan
}
}()
return statusChan, errChan
}
type SubStateUpdate struct {
UnitName string
SubState string
}
// SetSubStateSubscriber writes to updateCh when any unit's substate changes.
// Although this writes to updateCh on every state change, the reported state
// may be more recent than the change that generated it (due to an unavoidable
// race in the systemd dbus interface). That is, this method provides a good
// way to keep a current view of all units' states, but is not guaranteed to
// show every state transition they go through. Furthermore, state changes
// will only be written to the channel with non-blocking writes. If updateCh
// is full, it attempts to write an error to errCh; if errCh is full, the error
// passes silently.
func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
c.subscriber.Lock()
defer c.subscriber.Unlock()
c.subscriber.updateCh = updateCh
c.subscriber.errCh = errCh
}
func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) {
c.subscriber.Lock()
defer c.subscriber.Unlock()
if c.shouldIgnore(path) {
return
}
info, err := c.GetUnitProperties(string(path))
if err != nil {
select {
case c.subscriber.errCh <- err:
default:
}
}
name := info["Id"].(string)
substate := info["SubState"].(string)
update := &SubStateUpdate{name, substate}
select {
case c.subscriber.updateCh <- update:
default:
select {
case c.subscriber.errCh <- errors.New("update channel full!"):
default:
}
}
c.updateIgnore(path, info)
}
// The ignore functions work around a wart in the systemd dbus interface.
// Requesting the properties of an unloaded unit will cause systemd to send a
// pair of UnitNew/UnitRemoved signals. Because we need to get a unit's
// properties on UnitNew (as that's the only indication of a new unit coming up
// for the first time), we would enter an infinite loop if we did not attempt
// to detect and ignore these spurious signals. The signal themselves are
// indistinguishable from relevant ones, so we (somewhat hackishly) ignore an
// unloaded unit's signals for a short time after requesting its properties.
// This means that we will miss e.g. a transient unit being restarted
// *immediately* upon failure and also a transient unit being started
// immediately after requesting its status (with systemctl status, for example,
// because this causes a UnitNew signal to be sent which then causes us to fetch
// the properties).
func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
t, ok := c.subscriber.ignore[path]
return ok && t >= time.Now().UnixNano()
}
func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
c.cleanIgnore()
// unit is unloaded - it will trigger bad systemd dbus behavior
if info["LoadState"].(string) == "not-found" {
c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
}
}
// without this, ignore would grow unboundedly over time
func (c *Conn) cleanIgnore() {
now := time.Now().UnixNano()
if c.subscriber.cleanIgnore < now {
c.subscriber.cleanIgnore = now + cleanIgnoreInterval
for p, t := range c.subscriber.ignore {
if t < now {
delete(c.subscriber.ignore, p)
}
}
}
}