From abb3fb8a3194c3c0f1c22517bfc7b5c65da0f8ff Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 13 Dec 2018 13:42:28 +0100 Subject: [PATCH] lib/events: Become a service (fixes #5372) (#5373) Here the event Logger is rewritten as a service with a main loop instead of mutexes. This loop has a select with essentially two legs: incoming events, and subscription changes. When both are possible select will chose one randomly, thus ensuring that in practice unsubscribes will happen timely and not block the system. --- lib/events/events.go | 146 +++++++++++++++++++++++++------------- lib/events/events_test.go | 117 ++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 51 deletions(-) diff --git a/lib/events/events.go b/lib/events/events.go index 63d9b59cf..b954b6b50 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -204,7 +204,9 @@ type Logger struct { nextSubscriptionIDs []int nextGlobalID int timeout *time.Timer - mutex sync.Mutex + events chan Event + funcs chan func() + stop chan struct{} } type Event struct { @@ -225,6 +227,13 @@ type Subscription struct { var Default = NewLogger() +func init() { + // The default logger never stops. To ensure this we nil out the stop + // channel so any attempt to stop it will panic. + Default.stop = nil + go Default.Serve() +} + var ( ErrTimeout = errors.New("timeout") ErrClosed = errors.New("closed") @@ -232,8 +241,10 @@ var ( func NewLogger() *Logger { l := &Logger{ - mutex: sync.NewMutex(), timeout: time.NewTimer(time.Second), + events: make(chan Event, BufferSize), + funcs: make(chan func()), + stop: make(chan struct{}), } // Make sure the timer is in the stopped state and hasn't fired anything // into the channel. @@ -243,20 +254,52 @@ func NewLogger() *Logger { return l } -func (l *Logger) Log(t EventType, data interface{}) { - l.mutex.Lock() - l.nextGlobalID++ - dl.Debugln("log", l.nextGlobalID, t, data) +func (l *Logger) Serve() { +loop: + for { + select { + case e := <-l.events: + // Incoming events get sent + l.sendEvent(e) - e := Event{ - GlobalID: l.nextGlobalID, - Time: time.Now(), - Type: t, - Data: data, + case fn := <-l.funcs: + // Subscriptions etc are handled here. + fn() + + case <-l.stop: + break loop + } } + // Closing the event channels corresponds to what happens when a + // subscription is unsubscribed; this stops any BufferedSubscription, + // makes Poll() return ErrClosed, etc. + for _, s := range l.subs { + close(s.events) + } +} + +func (l *Logger) Stop() { + close(l.stop) +} + +func (l *Logger) Log(t EventType, data interface{}) { + l.events <- Event{ + Time: time.Now(), + Type: t, + Data: data, + // SubscriptionID and GlobalID are set in sendEvent + } +} + +func (l *Logger) sendEvent(e Event) { + l.nextGlobalID++ + dl.Debugln("log", l.nextGlobalID, e.Type, e.Data) + + e.GlobalID = l.nextGlobalID + for i, s := range l.subs { - if s.mask&t != 0 { + if s.mask&e.Type != 0 { e.SubscriptionID = l.nextSubscriptionIDs[i] l.nextSubscriptionIDs[i]++ @@ -278,59 +321,60 @@ func (l *Logger) Log(t EventType, data interface{}) { } } } - l.mutex.Unlock() } func (l *Logger) Subscribe(mask EventType) *Subscription { - l.mutex.Lock() - dl.Debugln("subscribe", mask) + res := make(chan *Subscription) + l.funcs <- func() { + dl.Debugln("subscribe", mask) - s := &Subscription{ - mask: mask, - events: make(chan Event, BufferSize), - timeout: time.NewTimer(0), - } + s := &Subscription{ + mask: mask, + events: make(chan Event, BufferSize), + timeout: time.NewTimer(0), + } - // We need to create the timeout timer in the stopped, non-fired state so - // that Subscription.Poll() can safely reset it and select on the timeout - // channel. This ensures the timer is stopped and the channel drained. - if runningTests { - // Make the behavior stable when running tests to avoid randomly - // varying test coverage. This ensures, in practice if not in - // theory, that the timer fires and we take the true branch of the - // next if. - runtime.Gosched() - } - if !s.timeout.Stop() { - <-s.timeout.C - } + // We need to create the timeout timer in the stopped, non-fired state so + // that Subscription.Poll() can safely reset it and select on the timeout + // channel. This ensures the timer is stopped and the channel drained. + if runningTests { + // Make the behavior stable when running tests to avoid randomly + // varying test coverage. This ensures, in practice if not in + // theory, that the timer fires and we take the true branch of the + // next if. + runtime.Gosched() + } + if !s.timeout.Stop() { + <-s.timeout.C + } - l.subs = append(l.subs, s) - l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1) - l.mutex.Unlock() - return s + l.subs = append(l.subs, s) + l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1) + res <- s + } + return <-res } func (l *Logger) Unsubscribe(s *Subscription) { - l.mutex.Lock() - dl.Debugln("unsubscribe") - for i, ss := range l.subs { - if s == ss { - last := len(l.subs) - 1 + l.funcs <- func() { + dl.Debugln("unsubscribe") + for i, ss := range l.subs { + if s == ss { + last := len(l.subs) - 1 - l.subs[i] = l.subs[last] - l.subs[last] = nil - l.subs = l.subs[:last] + l.subs[i] = l.subs[last] + l.subs[last] = nil + l.subs = l.subs[:last] - l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] - l.nextSubscriptionIDs[last] = 0 - l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] + l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] + l.nextSubscriptionIDs[last] = 0 + l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] - break + break + } } + close(s.events) } - close(s.events) - l.mutex.Unlock() } // Poll returns an event from the subscription or an error if the poll times diff --git a/lib/events/events_test.go b/lib/events/events_test.go index a310c03fe..33804490b 100644 --- a/lib/events/events_test.go +++ b/lib/events/events_test.go @@ -9,6 +9,7 @@ package events import ( "encoding/json" "fmt" + "sync" "testing" "time" ) @@ -28,6 +29,9 @@ func TestNewLogger(t *testing.T) { func TestSubscriber(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() + s := l.Subscribe(0) defer l.Unsubscribe(s) if s == nil { @@ -37,6 +41,9 @@ func TestSubscriber(t *testing.T) { func TestTimeout(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() + s := l.Subscribe(0) defer l.Unsubscribe(s) _, err := s.Poll(timeout) @@ -47,6 +54,8 @@ func TestTimeout(t *testing.T) { func TestEventBeforeSubscribe(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() l.Log(DeviceConnected, "foo") s := l.Subscribe(0) @@ -60,6 +69,8 @@ func TestEventBeforeSubscribe(t *testing.T) { func TestEventAfterSubscribe(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) defer l.Unsubscribe(s) @@ -85,6 +96,8 @@ func TestEventAfterSubscribe(t *testing.T) { func TestEventAfterSubscribeIgnoreMask(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(DeviceDisconnected) defer l.Unsubscribe(s) @@ -98,6 +111,8 @@ func TestEventAfterSubscribeIgnoreMask(t *testing.T) { func TestBufferOverflow(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) defer l.Unsubscribe(s) @@ -121,6 +136,8 @@ func TestBufferOverflow(t *testing.T) { func TestUnsubscribe(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) l.Log(DeviceConnected, "foo") @@ -141,6 +158,8 @@ func TestUnsubscribe(t *testing.T) { func TestGlobalIDs(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) defer l.Unsubscribe(s) @@ -171,6 +190,8 @@ func TestGlobalIDs(t *testing.T) { func TestSubscriptionIDs(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(DeviceConnected) defer l.Unsubscribe(s) @@ -211,6 +232,8 @@ func TestSubscriptionIDs(t *testing.T) { func TestBufferedSub(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) defer l.Unsubscribe(s) @@ -240,6 +263,8 @@ func TestBufferedSub(t *testing.T) { func BenchmarkBufferedSub(b *testing.B) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(AllEvents) defer l.Unsubscribe(s) @@ -294,6 +319,8 @@ func BenchmarkBufferedSub(b *testing.B) { func TestSinceUsesSubscriptionId(t *testing.T) { l := NewLogger() + defer l.Stop() + go l.Serve() s := l.Subscribe(DeviceConnected) defer l.Unsubscribe(s) @@ -339,3 +366,93 @@ func TestUnmarshalEvent(t *testing.T) { t.Fatal("Failed to unmarshal event:", err) } } + +func TestUnsubscribeContention(t *testing.T) { + // Check that we can unsubscribe without blocking the whole system. + + const ( + listeners = 50 + senders = 1000 + ) + + l := NewLogger() + defer l.Stop() + go l.Serve() + + // Start listeners. These will poll until the stop channel is closed, + // then exit and unsubscribe. + + stopListeners := make(chan struct{}) + var listenerWg sync.WaitGroup + listenerWg.Add(listeners) + for i := 0; i < listeners; i++ { + go func() { + defer listenerWg.Done() + + s := l.Subscribe(AllEvents) + defer l.Unsubscribe(s) + + for { + select { + case <-s.C(): + + case <-stopListeners: + return + } + } + }() + } + + // Start senders. These send pointless events until the stop channel is + // closed. + + stopSenders := make(chan struct{}) + defer close(stopSenders) + var senderWg sync.WaitGroup + senderWg.Add(senders) + for i := 0; i < senders; i++ { + go func() { + defer senderWg.Done() + + t := time.NewTicker(time.Millisecond) + + for { + select { + case <-t.C: + l.Log(StateChanged, nil) + + case <-stopSenders: + return + } + } + }() + } + + // Give everything time to start up. + + time.Sleep(time.Second) + + // Stop the listeners and wait for them to exit. This should happen in a + // reasonable time frame. + + t0 := time.Now() + close(stopListeners) + listenerWg.Wait() + if d := time.Since(t0); d > time.Minute { + t.Error("It should not take", d, "to unsubscribe from an event stream") + } +} + +func BenchmarkLogEvent(b *testing.B) { + l := NewLogger() + defer l.Stop() + go l.Serve() + + s := l.Subscribe(AllEvents) + defer l.Unsubscribe(s) + NewBufferedSubscription(s, 1) // runs in the background + + for i := 0; i < b.N; i++ { + l.Log(StateChanged, nil) + } +}