lib/rc: Fix hangups when Syncthing process ends unexpectedly (#5383)

This commit is contained in:
Simon Frei 2018-12-21 11:49:04 +01:00 committed by Jakob Borg
parent ae0dfcd7ca
commit 2626143fc5
1 changed file with 51 additions and 45 deletions

View File

@ -22,7 +22,6 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strconv" "strconv"
stdsync "sync"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
@ -40,14 +39,14 @@ type Process struct {
addr string addr string
// Set by eventLoop() // Set by eventLoop()
eventMut sync.Mutex eventMut sync.Mutex
id protocol.DeviceID id protocol.DeviceID
folders []string folders []string
startComplete bool startComplete chan struct{}
startCompleteCond *stdsync.Cond stopped chan struct{}
stop bool stopErr error
sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
done map[string]bool // Folder ID => 100% done map[string]bool // Folder ID => 100%
cmd *exec.Cmd cmd *exec.Cmd
logfd *os.File logfd *os.File
@ -57,12 +56,13 @@ type Process struct {
// Example: NewProcess("127.0.0.1:8082") // Example: NewProcess("127.0.0.1:8082")
func NewProcess(addr string) *Process { func NewProcess(addr string) *Process {
p := &Process{ p := &Process{
addr: addr, addr: addr,
sequence: make(map[string]map[string]int64), sequence: make(map[string]map[string]int64),
done: make(map[string]bool), done: make(map[string]bool),
eventMut: sync.NewMutex(), eventMut: sync.NewMutex(),
startComplete: make(chan struct{}),
stopped: make(chan struct{}),
} }
p.startCompleteCond = stdsync.NewCond(p.eventMut)
return p return p
} }
@ -108,19 +108,30 @@ func (p *Process) Start(bin string, args ...string) error {
p.cmd = cmd p.cmd = cmd
go p.eventLoop() go p.eventLoop()
go p.wait()
return nil return nil
} }
func (p *Process) wait() {
p.cmd.Wait()
if p.logfd != nil {
p.stopErr = p.checkForProblems(p.logfd)
}
close(p.stopped)
}
// AwaitStartup waits for the Syncthing process to start and perform initial // AwaitStartup waits for the Syncthing process to start and perform initial
// scans of all folders. // scans of all folders.
func (p *Process) AwaitStartup() { func (p *Process) AwaitStartup() {
p.eventMut.Lock() fmt.Println("awaiting startup")
for !p.startComplete { select {
p.startCompleteCond.Wait() case <-p.startComplete:
case <-p.stopped:
} }
p.eventMut.Unlock() fmt.Println("awaited startup")
return
} }
// Stop stops the running Syncthing process. If the process was logging to a // Stop stops the running Syncthing process. If the process was logging to a
@ -128,27 +139,21 @@ func (p *Process) AwaitStartup() {
// panics and data races. The presence of either will be signalled in the form // panics and data races. The presence of either will be signalled in the form
// of a returned error. // of a returned error.
func (p *Process) Stop() (*os.ProcessState, error) { func (p *Process) Stop() (*os.ProcessState, error) {
p.eventMut.Lock() select {
if p.stop { case <-p.stopped:
p.eventMut.Unlock() return p.cmd.ProcessState, p.stopErr
return p.cmd.ProcessState, nil default:
} }
p.stop = true
p.eventMut.Unlock()
if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF { if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
// Unexpected EOF is somewhat expected here, as we may exit before // Unexpected EOF is somewhat expected here, as we may exit before
// returning something sensible. // returning something sensible.
return nil, err return nil, err
} }
p.cmd.Wait()
var err error <-p.stopped
if p.logfd != nil {
err = p.checkForProblems(p.logfd)
}
return p.cmd.ProcessState, err return p.cmd.ProcessState, p.stopErr
} }
// Get performs an HTTP GET and returns the bytes and/or an error. Any non-200 // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
@ -403,7 +408,11 @@ func (p *Process) checkForProblems(logfd *os.File) error {
raceConditionStart := []byte("WARNING: DATA RACE") raceConditionStart := []byte("WARNING: DATA RACE")
raceConditionSep := []byte("==================") raceConditionSep := []byte("==================")
panicConditionStart := []byte("panic:") panicConditionStart := []byte("panic:")
panicConditionSep := []byte(p.id.String()[:5]) p.eventMut.Lock()
panicConditionSep := []byte("[") // fallback if we don't already know our ID
if p.id.String() != "" {
panicConditionSep = []byte(p.id.String()[:5])
}
sc := bufio.NewScanner(fd) sc := bufio.NewScanner(fd)
race := false race := false
_panic := false _panic := false
@ -442,12 +451,11 @@ func (p *Process) eventLoop() {
notScanned := make(map[string]struct{}) notScanned := make(map[string]struct{})
start := time.Now() start := time.Now()
for { for {
p.eventMut.Lock() select {
if p.stop { case <-p.stopped:
p.eventMut.Unlock()
return return
default:
} }
p.eventMut.Unlock()
events, err := p.Events(since) events, err := p.Events(since)
if err != nil { if err != nil {
@ -457,12 +465,11 @@ func (p *Process) eventLoop() {
} }
// If we're stopping, no need to print the error. // If we're stopping, no need to print the error.
p.eventMut.Lock() select {
if p.stop { case <-p.stopped:
p.eventMut.Unlock()
return return
default:
} }
p.eventMut.Unlock()
log.Println("eventLoop: events:", err) log.Println("eventLoop: events:", err)
continue continue
@ -511,17 +518,16 @@ func (p *Process) eventLoop() {
panic("race, or lost startup event") panic("race, or lost startup event")
} }
if !p.startComplete { select {
case <-p.startComplete:
default:
data := ev.Data.(map[string]interface{}) data := ev.Data.(map[string]interface{})
to := data["to"].(string) to := data["to"].(string)
if to == "idle" { if to == "idle" {
folder := data["folder"].(string) folder := data["folder"].(string)
delete(notScanned, folder) delete(notScanned, folder)
if len(notScanned) == 0 { if len(notScanned) == 0 {
p.eventMut.Lock() close(p.startComplete)
p.startComplete = true
p.startCompleteCond.Broadcast()
p.eventMut.Unlock()
} }
} }
} }