From 1a1f118f1a1fe9e40d43921ce234c0aa64fcfe54 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 11 May 2014 14:30:29 -0300 Subject: [PATCH] Restructure protocol code with less locking --- integration/h1/config.xml | 3 + integration/h4/cert.pem | 23 ++ integration/h4/config.xml | 36 +++ integration/h4/key.pem | 39 ++++ integration/test.sh | 2 +- protocol/protocol.go | 469 +++++++++++++++++++++----------------- protocol/protocol_test.go | 4 +- 7 files changed, 360 insertions(+), 216 deletions(-) create mode 100644 integration/h4/cert.pem create mode 100644 integration/h4/config.xml create mode 100644 integration/h4/key.pem diff --git a/integration/h1/config.xml b/integration/h1/config.xml index cee29c1e8..b9b970ac7 100644 --- a/integration/h1/config.xml +++ b/integration/h1/config.xml @@ -9,6 +9,9 @@
127.0.0.1:22003
+ +
127.0.0.1:22004
+
diff --git a/integration/h4/cert.pem b/integration/h4/cert.pem new file mode 100644 index 000000000..eea071930 --- /dev/null +++ b/integration/h4/cert.pem @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID3jCCAkigAwIBAgIBADALBgkqhkiG9w0BAQswFDESMBAGA1UEAxMJc3luY3Ro +aW5nMB4XDTE0MDUxMDAwNTM0N1oXDTQ5MTIzMTIzNTk1OVowFDESMBAGA1UEAxMJ +c3luY3RoaW5nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA9MRyBtAr +Sjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4brfUJLslcQHowqaAprOU1NP+ +BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtInJksk4Kp6nJgfyR7TqeQgqxtv ++skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaGm3qOt81YvSg37R75HiOCzv+h +FcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfnH/2204YNiQUne/8h2fgtkpxy +OjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZmE+i/AsZcTsk0fgefe+bshWG/ +hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsShGqVuwfJkTqkjAXX5wcH+PgJ +ewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM1arwT0STcMc+4fdrVB09lX6q +NJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2Ko7e2GnWCQeijGmnAgMBAAGj +PzA9MA4GA1UdDwEB/wQEAwIAoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUH +AwIwDAYDVR0TAQH/BAIwADALBgkqhkiG9w0BAQsDggGBANFiHcATP5Lm11o65wbh +sKk7yteTapRohMoLNdW44YNyM8ZkELnrdNY8pe3CWSGy3spBH01+4jbUT+gSltQr +KTLVxSZ7f91696Og5ag4BQCeFY6ghKD/G9+PlBSj6yb3Y98NZsx8huLfylH+XuJw +2gP5Nqov4uXaKgYylx2gdaeCb2M+wM/br1DO2HCPCmgbZE5g8RM5JxzojGn/41Le +IbCd39zdI6NKj9c7T1Bxmt20uzca4nRgXVVzJymedEoF+//sBRk6PQzqgjgn/r3S +h9vrqo5j8ly/+ojFjBaVY7gq2XHM6/q0LTjeKkv2MUQw+vEEZX65GpBOgBZ8U0Wb +/NMUUhhDjGE/0G6TCJgq/HdkjmsNaWjO5sWjhnwXNImYXBdH4OenhXIrHcLhcnxN +2n5sPkDc6n0LVVV7VAjBPXcTmu2uOSK02yqNZLLWJygp1Wl6lbiqLS3bJgYrUv2m +YkRaR+IqVPw5EPs/QlH0qLBeCyIasaSWUVZeitVwRmqIUA== +-----END CERTIFICATE----- diff --git a/integration/h4/config.xml b/integration/h4/config.xml new file mode 100644 index 000000000..f36d9fd98 --- /dev/null +++ b/integration/h4/config.xml @@ -0,0 +1,36 @@ + + + + + + + + +
127.0.0.1:22001
+
+ +
127.0.0.1:22002
+
+ +
127.0.0.1:22003
+
+ +
dynamic
+
+ +
127.0.0.1:8084
+
+ + :22004 + announce.syncthing.net:22025 + false + true + 16 + 0 + 60 + 10 + 1000 + false + false + +
diff --git a/integration/h4/key.pem b/integration/h4/key.pem new file mode 100644 index 000000000..73f2c05df --- /dev/null +++ b/integration/h4/key.pem @@ -0,0 +1,39 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIG5AIBAAKCAYEA9MRyBtArSjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4 +brfUJLslcQHowqaAprOU1NP+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtIn +Jksk4Kp6nJgfyR7TqeQgqxtv+skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaG +m3qOt81YvSg37R75HiOCzv+hFcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfn +H/2204YNiQUne/8h2fgtkpxyOjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZm +E+i/AsZcTsk0fgefe+bshWG/hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsS +hGqVuwfJkTqkjAXX5wcH+PgJewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM +1arwT0STcMc+4fdrVB09lX6qNJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2 +Ko7e2GnWCQeijGmnAgMBAAECggGBAIjKaLdqC2d3CCqQonJH3q0hsaCsC9wlL9L2 +UmbzfKCkQq0WTNUDo2nLtUcMvBpclzWS0zCGMUYtH7Kyh3bclTigKqKpsJnQiA6i +VNEW4jOCDp//HqYGBNwSKmftlIX/1mbx+VfnA5PyYR5LsivXb5TX4iOpAKL+Obdf +dF/zJGIEJ5GrvNqTicMq3dcI7Qh18N9pFSe+MTZLKK0Y9Yetx0hgaTNL0AYEZtcg +uYMmCvZ4J+Namo6EanKYTmQvHzvq/tZVMvud9Gcr6uKKtVBcgex9S/R7IicaKg78 +oDTgH0nDrpI55pZCX8vuVGk8nVTXXLTsMR1XojOpiYjS6ucfTkPEw3fOW/YRhHg5 +93TrdDiWkqSWube5LNUF87q65t/aw/y2EH2aTNqcPD5OQ+EZRS8OGYPqOrJ4Ycbp +j6CMSE+LX2IDMQyJ+9J0vPHtFsAviBKQkPoQ1L6mvhJuw6ksy34NQGykNDHz7nQK +SeqvCJ6XCtaWNkq+00lC3UFaGsjuUQKBwQD8+y370co5G7G5GDLbLE3i+pguUN7L +5YfDj5qqsM9hOJNqeKAHrKFP2ii0F9WxGw/ruY0k8k7zUt6LepgwkCI5BYfckRKJ +g8YsNTizjqPLRGtiqL9Garjo+xPxFGj+TkTg9fYD4xTWFa1I15zzCu7Ye7xObeEH +LRtcm3R4fU54JDrKtKDccoQmTEAzsxRdNXi9ifc7qgjGBH9W02guuGPY4ltT1aZR +bcO5vpi44Fnl2h6d7N6iwCtFJ0CaT1pAZ4UCgcEA97Asf5DTDWKByZBhk+VvuT1b +6nMYjqKxDNMmCaomCmk8Mif0w9SEJmAg0b/gbs/H6T78a+9WjbN5q9xHcDU91uax +TdCenTq7H981AjgUG7OA7XwYn+AKy+hGSnsTJglMJzJm6TGt+Sq0oO9EahBRDlsP +PiQRot2gyQfubwcl3rhdErRwaCM92BUyPkC2fy2OppAeZOOxxuzxrvHflDOuDGCZ +KPCmy6U9HV0JOAO2FSNJeZdNLBixXa1Pk8TgbLY7AoHBAPG7lhn9Qg3Fz9H9NINH +13jfWdFQB0SwJEWTEAiwgMj2ha6Eau5KX63s2V4VNGVSZakqmZtHSneppOuEjq5A +2+K+zS7PFPaACzos9OxmjU7rJu2UL4m66sv9NvXzOcxev+RyQs0+DKfw+K8VEG0Q +8l+8BJiw2AjCalXYWbfUjMmyXNdbOCbN6kaqL+L26KuUL7Z1gd/qPw3wODmgMvoJ +yabxzLDUA2PlzdPMMyTdhCllfkILmEXN+MrQkiOhVa0a/QKBwGZjAhH9ePD4fnQm +5d8wIb3uGlfRGh6kLBIEGp42IqF9HPASykBFUhdW91odOhY0eAv4CHpJpnrO7QXY ++gLtT1HNbQ+gpGCUTZQAPbZcHhvRWQNSoA8+mtftfVj+hUzc3Qj68cWFzsfIGoDI +R3ycoBUSGTvzxwKPIQ7Y43wr9UCa74Zy5mB16POw12MadxYda/F4c8f6w5taiRFr +VKO7tT/Skp101U4rURcZRV1NU3BrdMz5eWI4FuGFafbIlIj7zwKBwHCt3VQt+JmZ +OhCJR+8Q+jT0JvnMu1zi4CcMRiT8FbNdZDY/3B0wG4ySTNrEikFzIjihF4zIp2nv +nD3qKQs+THl51GA8AnP9bNk7hknD7rXUuScndccTW58+PGrjqfwJp/1MEeOJQpoX +0JML1w+dIKHzsKN0X6UL7Gyq8m+0SJKmQQguan3d3M8CMpnW0srgqOfJ+q1+bz8b +6FuJeijoaN8+zyKkN+9R91Erw5pk+7vJRzEpDtkhprEE5tLNDKrXJw== +-----END RSA PRIVATE KEY----- diff --git a/integration/test.sh b/integration/test.sh index dd6599539..c04526b67 100755 --- a/integration/test.sh +++ b/integration/test.sh @@ -14,7 +14,7 @@ go build json.go start() { echo "Starting..." - for i in 1 2 3 ; do + for i in 1 2 3 4 ; do STPROFILER=":909$i" syncthing -home "h$i" & done } diff --git a/protocol/protocol.go b/protocol/protocol.go index 501574b6b..fd60d6501 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/calmh/syncthing/buffers" "github.com/calmh/syncthing/xdr" ) @@ -72,16 +71,18 @@ type rawConnection struct { xr *xdr.Reader writer io.WriteCloser - cw *countingWriter - wb *bufio.Writer - xw *xdr.Writer - wmut sync.Mutex - closed bool + cw *countingWriter + wb *bufio.Writer + xw *xdr.Writer + wmut sync.Mutex - awaiting map[int]chan asyncResult - nextID int indexSent map[string]map[string][2]int64 + awaiting []chan asyncResult imut sync.Mutex + + nextID chan int + outbox chan []encodable + closed chan struct{} } type asyncResult struct { @@ -115,12 +116,17 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M cw: cw, wb: wb, xw: xdr.NewWriter(wb), - awaiting: make(map[int]chan asyncResult), + awaiting: make([]chan asyncResult, 0x1000), indexSent: make(map[string]map[string][2]int64), + outbox: make(chan []encodable), + nextID: make(chan int), + closed: make(chan struct{}), } go c.readerLoop() + go c.writerLoop() go c.pingerLoop() + go c.idGenerator() return wireFormatConnection{&c} } @@ -131,10 +137,6 @@ func (c *rawConnection) ID() string { // Index writes the list of file information to the connected peer node func (c *rawConnection) Index(repo string, idx []FileInfo) { - if c.isClosed() { - return - } - c.imut.Lock() var msgType int if c.indexSent[repo] == nil { @@ -157,48 +159,32 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) { } idx = diff } - - id := c.nextID - c.nextID = (c.nextID + 1) & 0xfff c.imut.Unlock() - c.wmut.Lock() - header{0, id, msgType}.encodeXDR(c.xw) - IndexMessage{repo, idx}.encodeXDR(c.xw) - err := c.flush() - c.wmut.Unlock() - - if err != nil { - c.close(err) - return - } + c.send(header{0, -1, msgType}, IndexMessage{repo, idx}) } // Request returns the bytes for the specified block after fetching them from the connected peer. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) { - if c.isClosed() { + var id int + select { + case id = <-c.nextID: + case <-c.closed: return nil, ErrClosed } c.imut.Lock() - id := c.nextID - c.nextID = (c.nextID + 1) & 0xfff - rc := make(chan asyncResult) - if _, ok := c.awaiting[id]; ok { + if ch := c.awaiting[id]; ch != nil { panic("id taken") } + rc := make(chan asyncResult) c.awaiting[id] = rc c.imut.Unlock() - c.wmut.Lock() - header{0, id, messageTypeRequest}.encodeXDR(c.xw) - RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw) - err := c.flush() - c.wmut.Unlock() - - if err != nil { - c.close(err) - return nil, err + ok := c.send(header{0, id, messageTypeRequest}, + RequestMessage{repo, name, uint64(offset), uint32(size)}) + if !ok { + return nil, ErrClosed } res, ok := <-rc @@ -210,45 +196,24 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int // ClusterConfig send the cluster configuration message to the peer and returns any error func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { - if c.isClosed() { - return - } - - c.imut.Lock() - id := c.nextID - c.nextID = (c.nextID + 1) & 0xfff - c.imut.Unlock() - - c.wmut.Lock() - header{0, id, messageTypeClusterConfig}.encodeXDR(c.xw) - config.encodeXDR(c.xw) - err := c.flush() - c.wmut.Unlock() - - if err != nil { - c.close(err) - } + c.send(header{0, -1, messageTypeClusterConfig}, config) } func (c *rawConnection) ping() bool { - if c.isClosed() { + var id int + select { + case id = <-c.nextID: + case <-c.closed: return false } - c.imut.Lock() - id := c.nextID - c.nextID = (c.nextID + 1) & 0xfff rc := make(chan asyncResult, 1) + c.imut.Lock() c.awaiting[id] = rc c.imut.Unlock() - c.wmut.Lock() - header{0, id, messageTypePing}.encodeXDR(c.xw) - err := c.flush() - c.wmut.Unlock() - - if err != nil { - c.close(err) + ok := c.send(header{0, id, messageTypePing}) + if !ok { return false } @@ -256,6 +221,196 @@ func (c *rawConnection) ping() bool { return ok && res.err == nil } +func (c *rawConnection) readerLoop() (err error) { + defer func() { + c.close(err) + }() + + for { + select { + case <-c.closed: + return ErrClosed + default: + } + + var hdr header + hdr.decodeXDR(c.xr) + if err := c.xr.Error(); err != nil { + return err + } + if hdr.version != 0 { + return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version) + } + + switch hdr.msgType { + case messageTypeIndex: + if err := c.handleIndex(); err != nil { + return err + } + + case messageTypeIndexUpdate: + if err := c.handleIndexUpdate(); err != nil { + return err + } + + case messageTypeRequest: + if err := c.handleRequest(hdr); err != nil { + return err + } + + case messageTypeResponse: + if err := c.handleResponse(hdr); err != nil { + return err + } + + case messageTypePing: + c.send(header{0, hdr.msgID, messageTypePong}) + + case messageTypePong: + c.handlePong(hdr) + + case messageTypeClusterConfig: + if err := c.handleClusterConfig(); err != nil { + return err + } + + default: + return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + } +} + +func (c *rawConnection) handleIndex() error { + var im IndexMessage + im.decodeXDR(c.xr) + if err := c.xr.Error(); err != nil { + return err + } else { + + // We run this (and the corresponding one for update, below) + // in a separate goroutine to avoid blocking the read loop. + // There is otherwise a potential deadlock where both sides + // has the model locked because it's sending a large index + // update and can't receive the large index update from the + // other side. + + go c.receiver.Index(c.id, im.Repository, im.Files) + } + return nil +} + +func (c *rawConnection) handleIndexUpdate() error { + var im IndexMessage + im.decodeXDR(c.xr) + if err := c.xr.Error(); err != nil { + return err + } else { + go c.receiver.IndexUpdate(c.id, im.Repository, im.Files) + } + return nil +} + +func (c *rawConnection) handleRequest(hdr header) error { + var req RequestMessage + req.decodeXDR(c.xr) + if err := c.xr.Error(); err != nil { + return err + } + go c.processRequest(hdr.msgID, req) + return nil +} + +func (c *rawConnection) handleResponse(hdr header) error { + data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size + + if err := c.xr.Error(); err != nil { + return err + } + + go func(hdr header, err error) { + c.imut.Lock() + rc := c.awaiting[hdr.msgID] + c.awaiting[hdr.msgID] = nil + c.imut.Unlock() + + if rc != nil { + rc <- asyncResult{data, err} + close(rc) + } + }(hdr, c.xr.Error()) + + return nil +} + +func (c *rawConnection) handlePong(hdr header) { + c.imut.Lock() + if rc := c.awaiting[hdr.msgID]; rc != nil { + go func() { + rc <- asyncResult{} + close(rc) + }() + + c.awaiting[hdr.msgID] = nil + } + c.imut.Unlock() +} + +func (c *rawConnection) handleClusterConfig() error { + var cm ClusterConfigMessage + cm.decodeXDR(c.xr) + if err := c.xr.Error(); err != nil { + return err + } else { + go c.receiver.ClusterConfig(c.id, cm) + } + return nil +} + +type encodable interface { + encodeXDR(*xdr.Writer) (int, error) +} +type encodableBytes []byte + +func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) { + return xw.WriteBytes(e) +} + +func (c *rawConnection) send(h header, es ...encodable) bool { + if h.msgID < 0 { + select { + case id := <-c.nextID: + h.msgID = id + case <-c.closed: + return false + } + } + msg := append([]encodable{h}, es...) + + select { + case c.outbox <- msg: + return true + case <-c.closed: + return false + } +} + +func (c *rawConnection) writerLoop() { + var err error + for es := range c.outbox { + c.wmut.Lock() + for _, e := range es { + e.encodeXDR(c.xw) + } + + if err = c.flush(); err != nil { + c.wmut.Unlock() + c.close(err) + return + } + c.wmut.Unlock() + } +} + type flusher interface { Flush() error } @@ -282,154 +437,35 @@ func (c *rawConnection) close(err error) { defer c.imut.Unlock() defer c.wmut.Unlock() - if c.closed { + select { + case <-c.closed: return - } + default: + close(c.closed) - c.closed = true - - for _, ch := range c.awaiting { - close(ch) - } - c.awaiting = nil - c.writer.Close() - c.reader.Close() - - c.receiver.Close(c.id, err) -} - -func (c *rawConnection) isClosed() bool { - c.wmut.Lock() - defer c.wmut.Unlock() - return c.closed -} - -func (c *rawConnection) readerLoop() { -loop: - for !c.isClosed() { - var hdr header - hdr.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } - if hdr.version != 0 { - c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)) - break loop + for i, ch := range c.awaiting { + if ch != nil { + close(ch) + c.awaiting[i] = nil + } } - switch hdr.msgType { - case messageTypeIndex: - var im IndexMessage - im.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } else { + c.writer.Close() + c.reader.Close() - // We run this (and the corresponding one for update, below) - // in a separate goroutine to avoid blocking the read loop. - // There is otherwise a potential deadlock where both sides - // has the model locked because it's sending a large index - // update and can't receive the large index update from the - // other side. - - go c.receiver.Index(c.id, im.Repository, im.Files) - } - - case messageTypeIndexUpdate: - var im IndexMessage - im.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } else { - go c.receiver.IndexUpdate(c.id, im.Repository, im.Files) - } - - case messageTypeRequest: - var req RequestMessage - req.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } - go c.processRequest(hdr.msgID, req) - - case messageTypeResponse: - data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size - - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } - - go func(hdr header, err error) { - c.imut.Lock() - rc, ok := c.awaiting[hdr.msgID] - delete(c.awaiting, hdr.msgID) - c.imut.Unlock() - - if ok { - rc <- asyncResult{data, err} - close(rc) - } - }(hdr, c.xr.Error()) - - case messageTypePing: - c.wmut.Lock() - header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw) - err := c.flush() - c.wmut.Unlock() - if err != nil { - c.close(err) - break loop - } - - case messageTypePong: - c.imut.Lock() - rc, ok := c.awaiting[hdr.msgID] - - if ok { - go func() { - rc <- asyncResult{} - close(rc) - }() - - delete(c.awaiting, hdr.msgID) - } - c.imut.Unlock() - - case messageTypeClusterConfig: - var cm ClusterConfigMessage - cm.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - c.close(err) - break loop - } else { - go c.receiver.ClusterConfig(c.id, cm) - } - - default: - c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)) - break loop - } + c.receiver.Close(c.id, err) } } -func (c *rawConnection) processRequest(msgID int, req RequestMessage) { - data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) - - c.wmut.Lock() - header{0, msgID, messageTypeResponse}.encodeXDR(c.xw) - c.xw.WriteBytes(data) - err := c.flush() - c.wmut.Unlock() - - buffers.Put(data) - - if err != nil { - c.close(err) +func (c *rawConnection) idGenerator() { + nextID := 0 + for { + nextID = (nextID + 1) & 0xfff + select { + case c.nextID <- nextID: + case <-c.closed: + return + } } } @@ -437,9 +473,6 @@ func (c *rawConnection) pingerLoop() { var rc = make(chan bool, 1) ticker := time.Tick(pingIdleTime / 2) for { - if c.isClosed() { - return - } select { case <-ticker: go func() { @@ -452,11 +485,23 @@ func (c *rawConnection) pingerLoop() { } case <-time.After(pingTimeout): c.close(fmt.Errorf("ping timeout")) + case <-c.closed: + return } + + case <-c.closed: + return } } } +func (c *rawConnection) processRequest(msgID int, req RequestMessage) { + data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) + + c.send(header{0, msgID, messageTypeResponse}, + encodableBytes(data)) +} + type Statistics struct { At time.Time InBytesTotal int diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 80c4f83a3..aaf8b0e66 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -174,9 +174,7 @@ func TestClose(t *testing.T) { c0.close(nil) - if !c0.isClosed() { - t.Fatal("Connection should be closed") - } + <-c0.closed if !m0.isClosed() { t.Fatal("Connection should be closed") }