From 7ac00e189b8ddaa7654c61becdd1c3391e6df50a Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 1 Jul 2014 11:58:53 +0200 Subject: [PATCH 01/11] Tone down UPnP not found message (fixes #406) --- cmd/syncthing/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index d5fc32aae..ba64fd384 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -483,7 +483,10 @@ func setupUPnP(r rand.Source) int { l.Warnln("Failed to create UPnP port mapping") } } else { - l.Infof("No UPnP IGD device found, no port mapping created (%v)", err) + l.Infof("No UPnP gateway detected") + if debugNet { + l.Debugf("UPnP: %v", err) + } } } } else { From 5353659f9fcb840142ba4c8d83f48654d43d4bdc Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 1 Jul 2014 13:22:45 +0200 Subject: [PATCH 02/11] Add temporary debug logging for #344 (revert later) --- files/set.go | 5 +++++ files/set_debug.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++ model/model.go | 3 +++ 3 files changed, 59 insertions(+) create mode 100644 files/set_debug.go diff --git a/files/set.go b/files/set.go index bcb7048df..0f0d657f2 100644 --- a/files/set.go +++ b/files/set.go @@ -49,6 +49,7 @@ func (m *Set) Replace(id uint, fs []scanner.File) { } m.Lock() + log("Replace", id, len(fs)) if len(fs) == 0 || !m.equals(id, fs) { m.changes[id]++ m.replace(id, fs) @@ -65,6 +66,7 @@ func (m *Set) ReplaceWithDelete(id uint, fs []scanner.File) { } m.Lock() + log("ReplaceWithDelete", id, len(fs)) if len(fs) == 0 || !m.equals(id, fs) { m.changes[id]++ @@ -102,7 +104,9 @@ func (m *Set) Update(id uint, fs []scanner.File) { if debug { l.Debugf("Update(%d, [%d])", id, len(fs)) } + m.Lock() + log("Update", id, len(fs)) m.update(id, fs) m.changes[id]++ m.Unlock() @@ -220,6 +224,7 @@ func (m *Set) equals(id uint, fs []scanner.File) bool { func (m *Set) update(cid uint, fs []scanner.File) { remFiles := m.remoteKey[cid] if remFiles == nil { + printLog() l.Fatalln("update before replace for cid", cid) } for _, f := range fs { diff --git a/files/set_debug.go b/files/set_debug.go new file mode 100644 index 000000000..76dac1e28 --- /dev/null +++ b/files/set_debug.go @@ -0,0 +1,51 @@ +package files + +import ( + "fmt" + "time" + + "github.com/calmh/syncthing/cid" +) + +type logEntry struct { + time time.Time + method string + cid uint + node string + nfiles int +} + +func (l logEntry) String() string { + return fmt.Sprintf("%v: %s cid:%d node:%s nfiles:%d", l.time, l.method, l.cid, l.node, l.nfiles) +} + +var ( + debugLog [10]logEntry + debugNext int + cm *cid.Map +) + +func SetCM(m *cid.Map) { + cm = m +} + +func log(method string, id uint, nfiles int) { + e := logEntry{ + time: time.Now(), + method: method, + cid: id, + nfiles: nfiles, + } + if cm != nil { + e.node = cm.Name(id) + } + debugLog[debugNext] = e + debugNext = (debugNext + 1) % len(debugLog) +} + +func printLog() { + l.Debugln("--- Consistency error ---") + for _, e := range debugLog { + l.Debugln(e) + } +} diff --git a/model/model.go b/model/model.go index a05de8afd..01519f267 100644 --- a/model/model.go +++ b/model/model.go @@ -98,6 +98,9 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)}, } + // TEMP: #344 + files.SetCM(m.cm) + var timeout = 20 * 60 // seconds if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 { it, err := strconv.Atoi(t) From 0aa067a726cf7cdf46a021d8d55307d7f0bce831 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 07:40:27 +0200 Subject: [PATCH 03/11] Avoid deadlock during initial scan (fixes #389) --- model/model.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/model/model.go b/model/model.go index 01519f267..79245281b 100644 --- a/model/model.go +++ b/model/model.go @@ -454,19 +454,6 @@ func (m *Model) ReplaceLocal(repo string, fs []scanner.File) { m.rmut.RUnlock() } -func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) { - var sfs = make([]scanner.File, len(fs)) - for i := 0; i < len(fs); i++ { - lamport.Default.Tick(fs[i].Version) - sfs[i] = fileFromFileInfo(fs[i]) - sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup - } - - m.rmut.RLock() - m.repoFiles[repo].Replace(cid.LocalID, sfs) - m.rmut.RUnlock() -} - func (m *Model) CurrentRepoFile(repo string, file string) scanner.File { m.rmut.RLock() f := m.repoFiles[repo].Get(cid.LocalID, file) @@ -736,7 +723,15 @@ func (m *Model) LoadIndexes(dir string) { m.rmut.RLock() for repo := range m.repoCfgs { fs := m.loadIndex(repo, dir) - m.SeedLocal(repo, fs) + + var sfs = make([]scanner.File, len(fs)) + for i := 0; i < len(fs); i++ { + lamport.Default.Tick(fs[i].Version) + sfs[i] = fileFromFileInfo(fs[i]) + sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup + } + + m.repoFiles[repo].Replace(cid.LocalID, sfs) } m.rmut.RUnlock() } From 91c4ff6009a08900454aa5dd92767af421ae9e64 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 20:28:03 +0200 Subject: [PATCH 04/11] Handle query parameters in UPnP control URL (fixes #211) --- upnp/upnp.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/upnp/upnp.go b/upnp/upnp.go index b4afebad9..df9e73fd7 100644 --- a/upnp/upnp.go +++ b/upnp/upnp.go @@ -203,14 +203,26 @@ func getServiceURL(rootURL string) (string, error) { } u, _ := url.Parse(rootURL) - if svc.ControlURL[0] == '/' { - u.Path = svc.ControlURL - } else { - u.Path += svc.ControlURL - } + replaceRawPath(u, svc.ControlURL) return u.String(), nil } +func replaceRawPath(u *url.URL, rp string) { + var p, q string + fs := strings.Split(rp, "?") + p = fs[0] + if len(fs) > 1 { + q = fs[1] + } + + if p[0] == '/' { + u.Path = p + } else { + u.Path += p + } + u.RawQuery = q +} + func soapRequest(url, function, message string) error { tpl := ` From 53898d2c6065ba33219796ed2928ee2d3b7e75ce Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 20:43:16 +0200 Subject: [PATCH 05/11] Log client version on connect --- model/model.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/model/model.go b/model/model.go index 79245281b..2536b339a 100644 --- a/model/model.go +++ b/model/model.go @@ -372,6 +372,8 @@ func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessag m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion } m.pmut.Unlock() + + l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion) } // Close removes the peer from the model and closes the underlying connection if possible. From 6ade27641dd4c89061b2b30e4f248a1ceee26ed1 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 21:33:30 +0200 Subject: [PATCH 06/11] Protocol state machine on receiving side --- protocol/protocol.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/protocol/protocol.go b/protocol/protocol.go index 20ef89959..9e5a6d3cd 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -28,6 +28,12 @@ const ( messageTypeIndexUpdate = 6 ) +const ( + stateInitial = iota + stateCCRcvd + stateIdxRcvd +) + const ( FlagDeleted uint32 = 1 << 12 FlagInvalid = 1 << 13 @@ -70,6 +76,7 @@ type Connection interface { type rawConnection struct { id string receiver Model + state int reader io.ReadCloser cr *countingReader @@ -116,6 +123,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M c := rawConnection{ id: nodeID, receiver: nativeModel{receiver}, + state: stateInitial, reader: flrd, cr: cr, xr: xdr.NewReader(flrd), @@ -257,21 +265,34 @@ func (c *rawConnection) readerLoop() (err error) { switch hdr.msgType { case messageTypeIndex: + if c.state < stateCCRcvd { + return fmt.Errorf("protocol error: index message in state %d", c.state) + } if err := c.handleIndex(); err != nil { return err } + c.state = stateIdxRcvd case messageTypeIndexUpdate: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: index update message in state %d", c.state) + } if err := c.handleIndexUpdate(); err != nil { return err } case messageTypeRequest: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: request message in state %d", c.state) + } if err := c.handleRequest(hdr); err != nil { return err } case messageTypeResponse: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: response message in state %d", c.state) + } if err := c.handleResponse(hdr); err != nil { return err } @@ -283,9 +304,13 @@ func (c *rawConnection) readerLoop() (err error) { c.handlePong(hdr) case messageTypeClusterConfig: + if c.state != stateInitial { + return fmt.Errorf("protocol error: cluster config message in state %d", c.state) + } if err := c.handleClusterConfig(); err != nil { return err } + c.state = stateCCRcvd default: return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) From 381795d6d00c222df85bdf604fd2e464febdeefa Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 21:49:24 +0200 Subject: [PATCH 07/11] Simplify locking in protocol.Index --- protocol/protocol.go | 50 +++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 9e5a6d3cd..d3bbd5a69 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -88,11 +88,11 @@ type rawConnection struct { xw *xdr.Writer wmut sync.Mutex - indexSent map[string]map[string]uint64 - awaiting []chan asyncResult - imut sync.Mutex + awaiting []chan asyncResult + imut sync.Mutex - idxMut sync.Mutex // ensures serialization of Index calls + idxSent map[string]map[string]uint64 + idxMut sync.Mutex // ensures serialization of Index calls nextID chan int outbox chan []encodable @@ -121,21 +121,21 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M wb := bufio.NewWriter(flwr) c := rawConnection{ - id: nodeID, - receiver: nativeModel{receiver}, - state: stateInitial, - reader: flrd, - cr: cr, - xr: xdr.NewReader(flrd), - writer: flwr, - cw: cw, - wb: wb, - xw: xdr.NewWriter(wb), - awaiting: make([]chan asyncResult, 0x1000), - indexSent: make(map[string]map[string]uint64), - outbox: make(chan []encodable), - nextID: make(chan int), - closed: make(chan struct{}), + id: nodeID, + receiver: nativeModel{receiver}, + state: stateInitial, + reader: flrd, + cr: cr, + xr: xdr.NewReader(flrd), + writer: flwr, + cw: cw, + wb: wb, + xw: xdr.NewWriter(wb), + awaiting: make([]chan asyncResult, 0x1000), + idxSent: make(map[string]map[string]uint64), + outbox: make(chan []encodable), + nextID: make(chan int), + closed: make(chan struct{}), } go c.indexSerializerLoop() @@ -156,29 +156,27 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) { c.idxMut.Lock() defer c.idxMut.Unlock() - c.imut.Lock() var msgType int - if c.indexSent[repo] == nil { + if c.idxSent[repo] == nil { // This is the first time we send an index. msgType = messageTypeIndex - c.indexSent[repo] = make(map[string]uint64) + c.idxSent[repo] = make(map[string]uint64) for _, f := range idx { - c.indexSent[repo][f.Name] = f.Version + c.idxSent[repo][f.Name] = f.Version } } else { // We have sent one full index. Only send updates now. msgType = messageTypeIndexUpdate var diff []FileInfo for _, f := range idx { - if vs, ok := c.indexSent[repo][f.Name]; !ok || f.Version != vs { + if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs { diff = append(diff, f) - c.indexSent[repo][f.Name] = f.Version + c.idxSent[repo][f.Name] = f.Version } } idx = diff } - c.imut.Unlock() if len(idx) > 0 { c.send(header{0, -1, msgType}, IndexMessage{repo, idx}) From bc1d04f0b92c0359a399cd38924cd6c58f82eb6e Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 2 Jul 2014 21:49:51 +0200 Subject: [PATCH 08/11] Always send initial index, even if empty (ref #344) --- protocol/protocol.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index d3bbd5a69..564220695 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -178,7 +178,7 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) { idx = diff } - if len(idx) > 0 { + if msgType == messageTypeIndex || len(idx) > 0 { c.send(header{0, -1, msgType}, IndexMessage{repo, idx}) } } From 2f5a822ca4c04847676fe9168e4d26decc6c1970 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 3 Jul 2014 12:30:10 +0200 Subject: [PATCH 09/11] Send initial index in batches --- model/model.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/model/model.go b/model/model.go index 2536b339a..424018196 100644 --- a/model/model.go +++ b/model/model.go @@ -520,7 +520,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) if debug { l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx)) } - protoConn.Index(repo, idx) + const batchSize = 1000 + for i := 0; i < len(idx); i += batchSize { + if len(idx[i:]) < batchSize { + protoConn.Index(repo, idx[i:]) + } else { + protoConn.Index(repo, idx[i:i+batchSize]) + } + } } }() } From 4a6b43bcaec3f6deaf823e96746f725ef3e77a6b Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 3 Jul 2014 13:37:20 +0200 Subject: [PATCH 10/11] Clean up protocol locking and closing --- protocol/protocol.go | 74 ++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 47 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 564220695..9977737d3 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -81,15 +81,14 @@ type rawConnection struct { reader io.ReadCloser cr *countingReader xr *xdr.Reader + writer io.WriteCloser + cw *countingWriter + wb *bufio.Writer + xw *xdr.Writer - cw *countingWriter - wb *bufio.Writer - xw *xdr.Writer - wmut sync.Mutex - - awaiting []chan asyncResult - imut sync.Mutex + awaiting []chan asyncResult + awaitingMut sync.Mutex idxSent map[string]map[string]uint64 idxMut sync.Mutex // ensures serialization of Index calls @@ -97,6 +96,7 @@ type rawConnection struct { nextID chan int outbox chan []encodable closed chan struct{} + once sync.Once } type asyncResult struct { @@ -192,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int return nil, ErrClosed } - c.imut.Lock() + c.awaitingMut.Lock() if ch := c.awaiting[id]; ch != nil { panic("id taken") } - rc := make(chan asyncResult) + rc := make(chan asyncResult, 1) c.awaiting[id] = rc - c.imut.Unlock() + c.awaitingMut.Unlock() ok := c.send(header{0, id, messageTypeRequest}, RequestMessage{repo, name, uint64(offset), uint32(size)}) @@ -227,9 +227,9 @@ func (c *rawConnection) ping() bool { } rc := make(chan asyncResult, 1) - c.imut.Lock() + c.awaitingMut.Lock() c.awaiting[id] = rc - c.imut.Unlock() + c.awaitingMut.Unlock() ok := c.send(header{0, id, messageTypePing}) if !ok { @@ -388,32 +388,25 @@ func (c *rawConnection) handleResponse(hdr header) error { return err } - go func(hdr header, err error) { - c.imut.Lock() - rc := c.awaiting[hdr.msgID] + c.awaitingMut.Lock() + if rc := c.awaiting[hdr.msgID]; rc != nil { c.awaiting[hdr.msgID] = nil - c.imut.Unlock() - - if rc != nil { - rc <- asyncResult{data, err} - close(rc) - } - }(hdr, c.xr.Error()) + rc <- asyncResult{data, nil} + close(rc) + } + c.awaitingMut.Unlock() return nil } func (c *rawConnection) handlePong(hdr header) { - c.imut.Lock() + c.awaitingMut.Lock() if rc := c.awaiting[hdr.msgID]; rc != nil { - go func() { - rc <- asyncResult{} - close(rc) - }() - c.awaiting[hdr.msgID] = nil + rc <- asyncResult{} + close(rc) } - c.imut.Unlock() + c.awaitingMut.Unlock() } func (c *rawConnection) handleClusterConfig() error { @@ -458,17 +451,14 @@ func (c *rawConnection) send(h header, es ...encodable) bool { func (c *rawConnection) writerLoop() { var err error for es := range c.outbox { - c.wmut.Lock() for _, e := range es { e.encodeXDR(c.xw) } if err = c.flush(); err != nil { - c.wmut.Unlock() c.close(err) return } - c.wmut.Unlock() } } @@ -493,29 +483,20 @@ func (c *rawConnection) flush() error { } func (c *rawConnection) close(err error) { - c.imut.Lock() - c.wmut.Lock() - defer c.imut.Unlock() - defer c.wmut.Unlock() - - select { - case <-c.closed: - return - default: + c.once.Do(func() { close(c.closed) + c.awaitingMut.Lock() for i, ch := range c.awaiting { if ch != nil { close(ch) c.awaiting[i] = nil } } - - c.writer.Close() - c.reader.Close() + c.awaitingMut.Unlock() go c.receiver.Close(c.id, err) - } + }) } func (c *rawConnection) idGenerator() { @@ -577,8 +558,7 @@ func (c *rawConnection) pingerLoop() { func (c *rawConnection) processRequest(msgID int, req RequestMessage) { data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) - c.send(header{0, msgID, messageTypeResponse}, - encodableBytes(data)) + c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data)) } type Statistics struct { From a720f90a70f3ca21ab8eacb98788aa04e7aca704 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Fri, 4 Jul 2014 15:16:33 +0200 Subject: [PATCH 11/11] Don't leak writer and index goroutines on close --- protocol/protocol.go | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 9977737d3..e82fa756b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -332,11 +332,16 @@ func (c *rawConnection) indexSerializerLoop() { // large index update from the other side. But we must also ensure to // process the indexes in the order they are received, hence the separate // routine and buffered channel. - for ii := range incomingIndexes { - if ii.update { - c.receiver.IndexUpdate(ii.id, ii.repo, ii.files) - } else { - c.receiver.Index(ii.id, ii.repo, ii.files) + for { + select { + case ii := <-incomingIndexes: + if ii.update { + c.receiver.IndexUpdate(ii.id, ii.repo, ii.files) + } else { + c.receiver.Index(ii.id, ii.repo, ii.files) + } + case <-c.closed: + return } } } @@ -450,13 +455,18 @@ func (c *rawConnection) send(h header, es ...encodable) bool { func (c *rawConnection) writerLoop() { var err error - for es := range c.outbox { - for _, e := range es { - e.encodeXDR(c.xw) - } + for { + select { + case es := <-c.outbox: + for _, e := range es { + e.encodeXDR(c.xw) + } - if err = c.flush(); err != nil { - c.close(err) + if err = c.flush(); err != nil { + c.close(err) + return + } + case <-c.closed: return } }