lib/syncthing: Refactor to use util.AsService (#5858)

This commit is contained in:
Simon Frei 2019-07-23 10:50:37 +02:00 committed by GitHub
parent 942659fb06
commit 7b3d9a8dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 73 deletions

View File

@ -10,42 +10,38 @@ import (
"encoding/json"
"io"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The auditService subscribes to events and writes these in JSON format, one
// event per line, to the specified writer.
type auditService struct {
w io.Writer // audit destination
stop chan struct{} // signals time to stop
started chan struct{} // signals startup complete
stopped chan struct{} // signals stop complete
suture.Service
w io.Writer // audit destination
sub *events.Subscription
}
func newAuditService(w io.Writer) *auditService {
return &auditService{
w: w,
stop: make(chan struct{}),
started: make(chan struct{}),
stopped: make(chan struct{}),
s := &auditService{
w: w,
sub: events.Default.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
return s
}
// Serve runs the audit service.
func (s *auditService) Serve() {
defer close(s.stopped)
sub := events.Default.Subscribe(events.AllEvents)
defer events.Default.Unsubscribe(sub)
// serve runs the audit service.
func (s *auditService) serve(stop chan struct{}) {
enc := json.NewEncoder(s.w)
// We're ready to start processing events.
close(s.started)
for {
select {
case ev := <-sub.C():
case ev := <-s.sub.C():
enc.Encode(ev)
case <-s.stop:
case <-stop:
return
}
}
@ -53,17 +49,6 @@ func (s *auditService) Serve() {
// Stop stops the audit service.
func (s *auditService) Stop() {
close(s.stop)
}
// WaitForStart returns once the audit service is ready to receive events, or
// immediately if it's already running.
func (s *auditService) WaitForStart() {
<-s.started
}
// WaitForStop returns once the audit service has stopped.
// (Needed by the tests.)
func (s *auditService) WaitForStop() {
<-s.stopped
s.Service.Stop()
events.Default.Unsubscribe(s.sub)
}

View File

@ -17,13 +17,12 @@ import (
func TestAuditService(t *testing.T) {
buf := new(bytes.Buffer)
service := newAuditService(buf)
// Event sent before start, will not be logged
// Event sent before construction, will not be logged
events.Default.Log(events.ConfigSaved, "the first event")
service := newAuditService(buf)
go service.Serve()
service.WaitForStart()
// Event that should end up in the audit log
events.Default.Log(events.ConfigSaved, "the second event")
@ -32,7 +31,6 @@ func TestAuditService(t *testing.T) {
time.Sleep(10 * time.Millisecond)
service.Stop()
service.WaitForStop()
// This event should not be logged, since we have stopped.
events.Default.Log(events.ConfigSaved, "the third event")

View File

@ -126,7 +126,7 @@ func (a *App) startup() error {
l.SetPrefix("[start] ")
if a.opts.AuditWriter != nil {
a.startAuditing()
a.mainService.Add(newAuditService(a.opts.AuditWriter))
}
if a.opts.Verbose {
@ -418,15 +418,6 @@ func (a *App) Stop(stopReason ExitStatus) ExitStatus {
return a.exitStatus
}
func (a *App) startAuditing() {
auditService := newAuditService(a.opts.AuditWriter)
a.mainService.Add(auditService)
// We wait for the audit service to fully start before we return, to
// ensure we capture all events from the start.
auditService.WaitForStart()
}
func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
guiCfg := a.cfg.GUI()

View File

@ -9,45 +9,37 @@ package syncthing
import (
"fmt"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/util"
)
// The verbose logging service subscribes to events and prints these in
// verbose format to the console using INFO level.
type verboseService struct {
stop chan struct{} // signals time to stop
started chan struct{} // signals startup complete
suture.Service
sub *events.Subscription
}
func newVerboseService() *verboseService {
return &verboseService{
stop: make(chan struct{}),
started: make(chan struct{}),
s := &verboseService{
sub: events.Default.Subscribe(events.AllEvents),
}
s.Service = util.AsService(s.serve)
return s
}
// Serve runs the verbose logging service.
func (s *verboseService) Serve() {
sub := events.Default.Subscribe(events.AllEvents)
defer events.Default.Unsubscribe(sub)
select {
case <-s.started:
// The started channel has already been closed; do nothing.
default:
// This is the first time around. Indicate that we're ready to start
// processing events.
close(s.started)
}
// serve runs the verbose logging service.
func (s *verboseService) serve(stop chan struct{}) {
for {
select {
case ev := <-sub.C():
case ev := <-s.sub.C():
formatted := s.formatEvent(ev)
if formatted != "" {
l.Verboseln(formatted)
}
case <-s.stop:
case <-stop:
return
}
}
@ -55,13 +47,9 @@ func (s *verboseService) Serve() {
// Stop stops the verbose logging service.
func (s *verboseService) Stop() {
close(s.stop)
}
s.Service.Stop()
events.Default.Unsubscribe(s.sub)
// WaitForStart returns once the verbose logging service is ready to receive
// events, or immediately if it's already running.
func (s *verboseService) WaitForStart() {
<-s.started
}
func (s *verboseService) formatEvent(ev events.Event) string {