diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go
index 4356998f7..b0e043034 100644
--- a/cmd/syncthing/model.go
+++ b/cmd/syncthing/model.go
@@ -423,16 +423,14 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
 	cm := m.clusterConfig(nodeID)
 	protoConn.ClusterConfig(cm)
 
-	var idxToSend = make(map[string][]protocol.FileInfo)
-
-	m.rmut.RLock()
-	for _, repo := range m.nodeRepos[nodeID] {
-		idxToSend[repo] = m.protocolIndex(repo)
-	}
-	m.rmut.RUnlock()
-
 	go func() {
-		for repo, idx := range idxToSend {
+		m.rmut.RLock()
+		repos := m.nodeRepos[nodeID]
+		m.rmut.RUnlock()
+		for _, repo := range repos {
+			m.rmut.RLock()
+			idx := m.protocolIndex(repo)
+			m.rmut.RUnlock()
 			if debugNet {
 				dlog.Printf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
 			}
diff --git a/protocol/protocol.go b/protocol/protocol.go
index 157597c99..889199591 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -64,26 +64,24 @@ type Connection interface {
 }
 
 type rawConnection struct {
-	id       string
-	receiver Model
-
-	reader io.ReadCloser
-	cr     *countingReader
-	xr     *xdr.Reader
-	writer io.WriteCloser
-
-	cw   *countingWriter
-	wb   *bufio.Writer
-	xw   *xdr.Writer
-	wmut sync.Mutex
-
-	close  chan error
-	closed chan struct{}
+	sync.RWMutex
+	id       string
+	receiver Model
+	reader   io.ReadCloser
+	cr       *countingReader
+	xr       *xdr.Reader
+	writer   io.WriteCloser
+	cw       *countingWriter
+	wb       *bufio.Writer
+	xw       *xdr.Writer
+	closed   chan struct{}
 
 	awaiting  map[int]chan asyncResult
 	nextID    int
 	indexSent map[string]map[string][2]int64
-	imut      sync.Mutex
+
+	hasSentIndex  bool
+	hasRecvdIndex bool
 }
 
 type asyncResult struct {
@@ -117,13 +115,11 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 		cw:        cw,
 		wb:        wb,
 		xw:        xdr.NewWriter(wb),
-		close:     make(chan error),
 		closed:    make(chan struct{}),
 		awaiting:  make(map[int]chan asyncResult),
 		indexSent: make(map[string]map[string][2]int64),
 	}
 
-	go c.closer()
 	go c.readerLoop()
 	go c.pingerLoop()
 
@@ -136,11 +132,11 @@ func (c *rawConnection) ID() string {
 
 // Index writes the list of file information to the connected peer node
 func (c *rawConnection) Index(repo string, idx []FileInfo) {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return
 	}
-
-	c.imut.Lock()
 	var msgType int
 	if c.indexSent[repo] == nil {
 		// This is the first time we send an index.
@@ -163,48 +159,45 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
 		idx = diff
 	}
 
-	id := c.nextID
+	header{0, c.nextID, msgType}.encodeXDR(c.xw)
+	_, err := IndexMessage{repo, idx}.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 	c.nextID = (c.nextID + 1) & 0xfff
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, msgType}.encodeXDR(c.xw)
-	IndexMessage{repo, idx}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
+	c.hasSentIndex = true
+	c.Unlock()
 
 	if err != nil {
-		c.close <- err
+		c.close(err)
 		return
 	}
 }
 
 // Request returns the bytes for the specified block after fetching them from the connected peer.
 func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return nil, ErrClosed
 	}
-
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	rc := make(chan asyncResult)
-	if _, ok := c.awaiting[id]; ok {
+	if _, ok := c.awaiting[c.nextID]; ok {
 		panic("id taken")
 	}
-	c.awaiting[id] = rc
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypeRequest}.encodeXDR(c.xw)
-	RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
-
+	c.awaiting[c.nextID] = rc
+	header{0, c.nextID, messageTypeRequest}.encodeXDR(c.xw)
+	_, err := RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 	if err != nil {
-		c.close <- err
+		c.Unlock()
+		c.close(err)
 		return nil, err
 	}
+	c.nextID = (c.nextID + 1) & 0xfff
+	c.Unlock()
 
 	res, ok := <-rc
 	if !ok {
@@ -215,47 +208,46 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
 
 // ClusterConfig send the cluster configuration message to the peer and returns any error
 func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
+	c.Lock()
+	defer c.Unlock()
+
 	if c.isClosed() {
 		return
 	}
 
-	c.imut.Lock()
-	id := c.nextID
+	header{0, c.nextID, messageTypeClusterConfig}.encodeXDR(c.xw)
 	c.nextID = (c.nextID + 1) & 0xfff
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypeClusterConfig}.encodeXDR(c.xw)
-	config.encodeXDR(c.xw)
-	err := c.flush()
-	c.wmut.Unlock()
+	_, err := config.encodeXDR(c.xw)
+	if err == nil {
+		err = c.flush()
+	}
 
 	if err != nil {
-		c.close <- err
+		c.close(err)
	}
 }
 
 func (c *rawConnection) ping() bool {
+	c.Lock()
 	if c.isClosed() {
+		c.Unlock()
 		return false
 	}
-
-	c.imut.Lock()
-	id := c.nextID
-	c.nextID = (c.nextID + 1) & 0xfff
 	rc := make(chan asyncResult, 1)
-	c.awaiting[id] = rc
-	c.imut.Unlock()
-
-	c.wmut.Lock()
-	header{0, id, messageTypePing}.encodeXDR(c.xw)
+	c.awaiting[c.nextID] = rc
+	header{0, c.nextID, messageTypePing}.encodeXDR(c.xw)
 	err := c.flush()
-	c.wmut.Unlock()
-
 	if err != nil {
-		c.close <- err
+		c.Unlock()
+		c.close(err)
+		return false
+	} else if c.xw.Error() != nil {
+		c.Unlock()
+		c.close(c.xw.Error())
 		return false
 	}
+	c.nextID = (c.nextID + 1) & 0xfff
+	c.Unlock()
 
 	res, ok := <-rc
 	return ok && res.err == nil
@@ -266,24 +258,21 @@ type flusher interface {
 }
 
 func (c *rawConnection) flush() error {
-	if err := c.xw.Error(); err != nil {
-		return err
-	}
-
-	if err := c.wb.Flush(); err != nil {
-		return err
-	}
-
+	c.wb.Flush()
 	if f, ok := c.writer.(flusher); ok {
 		return f.Flush()
 	}
-
 	return nil
 }
 
-func (c *rawConnection) closer() {
-	err := <-c.close
-
+func (c *rawConnection) close(err error) {
+	c.Lock()
+	select {
+	case <-c.closed:
+		c.Unlock()
+		return
+	default:
+	}
 	close(c.closed)
 	for _, ch := range c.awaiting {
 		close(ch)
@@ -291,6 +280,7 @@
 	c.awaiting = nil
 	c.writer.Close()
 	c.reader.Close()
+	c.Unlock()
 
 	c.receiver.Close(c.id, err)
 }
@@ -309,12 +299,12 @@
 loop:
 	for !c.isClosed() {
 		var hdr header
 		hdr.decodeXDR(c.xr)
-		if err := c.xr.Error(); err != nil {
-			c.close <- err
+		if c.xr.Error() != nil {
+			c.close(c.xr.Error())
 			break loop
 		}
 		if hdr.version != 0 {
-			c.close <- fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
+			c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
 			break loop
 		}
@@ -322,8 +312,8 @@
 
 		case messageTypeIndex:
 			var im IndexMessage
 			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
@@ -336,12 +326,15 @@
 
 				go c.receiver.Index(c.id, im.Repository, im.Files)
 			}
+			c.Lock()
+			c.hasRecvdIndex = true
+			c.Unlock()
 
 		case messageTypeIndexUpdate:
 			var im IndexMessage
 			im.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
 				go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
@@ -350,8 +343,8 @@
 		case messageTypeRequest:
 			var req RequestMessage
 			req.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			}
 			go c.processRequest(hdr.msgID, req)
@@ -359,16 +352,16 @@
 		case messageTypeResponse:
 			data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
 
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			}
 
 			go func(hdr header, err error) {
-				c.imut.Lock()
+				c.Lock()
 				rc, ok := c.awaiting[hdr.msgID]
 				delete(c.awaiting, hdr.msgID)
-				c.imut.Unlock()
+				c.Unlock()
 
 				if ok {
 					rc <- asyncResult{data, err}
@@ -377,41 +370,44 @@
 			}(hdr, c.xr.Error())
 
 		case messageTypePing:
-			c.wmut.Lock()
+			c.Lock()
 			header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
 			err := c.flush()
-			c.wmut.Unlock()
+			c.Unlock()
 			if err != nil {
-				c.close <- err
+				c.close(err)
+				break loop
+			} else if c.xw.Error() != nil {
+				c.close(c.xw.Error())
 				break loop
 			}
 
 		case messageTypePong:
-			c.imut.Lock()
+			c.RLock()
 			rc, ok := c.awaiting[hdr.msgID]
+			c.RUnlock()
 
 			if ok {
-				go func() {
-					rc <- asyncResult{}
-					close(rc)
-				}()
+				rc <- asyncResult{}
+				close(rc)
 
+				c.Lock()
 				delete(c.awaiting, hdr.msgID)
+				c.Unlock()
 			}
-			c.imut.Unlock()
 
 		case messageTypeClusterConfig:
 			var cm ClusterConfigMessage
 			cm.decodeXDR(c.xr)
-			if err := c.xr.Error(); err != nil {
-				c.close <- err
+			if c.xr.Error() != nil {
+				c.close(c.xr.Error())
 				break loop
 			} else {
 				go c.receiver.ClusterConfig(c.id, cm)
 			}
 
 		default:
-			c.close <- fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+			c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
 			break loop
 		}
 	}
@@ -420,16 +416,17 @@
 func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
 	data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
 
-	c.wmut.Lock()
+	c.Lock()
 	header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
-	c.xw.WriteBytes(data)
-	err := c.flush()
-	c.wmut.Unlock()
+	_, err := c.xw.WriteBytes(data)
+	if err == nil {
+		err = c.flush()
+	}
+	c.Unlock()
 
 	buffers.Put(data)
-
 	if err != nil {
-		c.close <- err
+		c.close(err)
 	}
 }
 
@@ -439,16 +436,22 @@ func (c *rawConnection) pingerLoop() {
 	for {
 		select {
 		case <-ticker:
-			go func() {
-				rc <- c.ping()
-			}()
-			select {
-			case ok := <-rc:
-				if !ok {
-					c.close <- fmt.Errorf("ping failure")
+			c.RLock()
+			ready := c.hasRecvdIndex && c.hasSentIndex
+			c.RUnlock()
+
+			if ready {
+				go func() {
+					rc <- c.ping()
+				}()
+				select {
+				case ok := <-rc:
+					if !ok {
+						c.close(fmt.Errorf("ping failure"))
+					}
+				case <-time.After(pingTimeout):
+					c.close(fmt.Errorf("ping timeout"))
 				}
-			case <-time.After(pingTimeout):
-				c.close <- fmt.Errorf("ping timeout")
 			}
 		case <-c.closed:
 			return
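
A note on the shutdown pattern the patch settles on: instead of pushing errors into a close channel consumed by a dedicated closer() goroutine, every goroutine that hits an error now calls close(err) directly, and the method guards itself with a select on the already-closed channel so the teardown runs exactly once. The sketch below is a minimal, self-contained illustration of that close-once idiom only; the conn type, its field names, and the onClosed callback are made up for the example and are not part of the syncthing code.

	package main

	import (
		"fmt"
		"sync"
	)

	// conn models only the shutdown half of a connection: a mutex, a
	// closed channel that doubles as the "done" signal, and a callback
	// standing in for receiver.Close in the patch above (illustrative).
	type conn struct {
		mut      sync.Mutex
		closed   chan struct{}
		onClosed func(err error)
	}

	// close tears the connection down at most once. A second caller sees
	// the closed channel already closed and returns without side effects,
	// which is what makes it safe to call from a reader loop, a pinger
	// loop, and any writer that fails, without a double-close panic.
	func (c *conn) close(err error) {
		c.mut.Lock()
		select {
		case <-c.closed:
			c.mut.Unlock()
			return
		default:
		}
		close(c.closed)
		c.mut.Unlock()
		c.onClosed(err)
	}

	func main() {
		c := &conn{
			closed:   make(chan struct{}),
			onClosed: func(err error) { fmt.Println("closed:", err) },
		}
		c.close(fmt.Errorf("read error"))  // performs the teardown
		c.close(fmt.Errorf("write error")) // no-op: already closed
	}

The same guard is what lets readerLoop, pingerLoop, and the write paths in the patch all report errors through close without racing to tear the connection down twice or calling into the receiver more than once.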