diff --git a/integration/h1/config.xml b/integration/h1/config.xml
index cee29c1e8..b9b970ac7 100644
--- a/integration/h1/config.xml
+++ b/integration/h1/config.xml
@@ -9,6 +9,9 @@
127.0.0.1:22003
+
+ 127.0.0.1:22004
+
diff --git a/integration/h4/cert.pem b/integration/h4/cert.pem
new file mode 100644
index 000000000..eea071930
--- /dev/null
+++ b/integration/h4/cert.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIID3jCCAkigAwIBAgIBADALBgkqhkiG9w0BAQswFDESMBAGA1UEAxMJc3luY3Ro
+aW5nMB4XDTE0MDUxMDAwNTM0N1oXDTQ5MTIzMTIzNTk1OVowFDESMBAGA1UEAxMJ
+c3luY3RoaW5nMIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA9MRyBtAr
+Sjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4brfUJLslcQHowqaAprOU1NP+
+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtInJksk4Kp6nJgfyR7TqeQgqxtv
++skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaGm3qOt81YvSg37R75HiOCzv+h
+FcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfnH/2204YNiQUne/8h2fgtkpxy
+OjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZmE+i/AsZcTsk0fgefe+bshWG/
+hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsShGqVuwfJkTqkjAXX5wcH+PgJ
+ewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM1arwT0STcMc+4fdrVB09lX6q
+NJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2Ko7e2GnWCQeijGmnAgMBAAGj
+PzA9MA4GA1UdDwEB/wQEAwIAoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUH
+AwIwDAYDVR0TAQH/BAIwADALBgkqhkiG9w0BAQsDggGBANFiHcATP5Lm11o65wbh
+sKk7yteTapRohMoLNdW44YNyM8ZkELnrdNY8pe3CWSGy3spBH01+4jbUT+gSltQr
+KTLVxSZ7f91696Og5ag4BQCeFY6ghKD/G9+PlBSj6yb3Y98NZsx8huLfylH+XuJw
+2gP5Nqov4uXaKgYylx2gdaeCb2M+wM/br1DO2HCPCmgbZE5g8RM5JxzojGn/41Le
+IbCd39zdI6NKj9c7T1Bxmt20uzca4nRgXVVzJymedEoF+//sBRk6PQzqgjgn/r3S
+h9vrqo5j8ly/+ojFjBaVY7gq2XHM6/q0LTjeKkv2MUQw+vEEZX65GpBOgBZ8U0Wb
+/NMUUhhDjGE/0G6TCJgq/HdkjmsNaWjO5sWjhnwXNImYXBdH4OenhXIrHcLhcnxN
+2n5sPkDc6n0LVVV7VAjBPXcTmu2uOSK02yqNZLLWJygp1Wl6lbiqLS3bJgYrUv2m
+YkRaR+IqVPw5EPs/QlH0qLBeCyIasaSWUVZeitVwRmqIUA==
+-----END CERTIFICATE-----
diff --git a/integration/h4/config.xml b/integration/h4/config.xml
new file mode 100644
index 000000000..f36d9fd98
--- /dev/null
+++ b/integration/h4/config.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+ 127.0.0.1:22001
+
+
+ 127.0.0.1:22002
+
+
+ 127.0.0.1:22003
+
+
+ dynamic
+
+
+ 127.0.0.1:8084
+
+
+ :22004
+ announce.syncthing.net:22025
+ false
+ true
+ 16
+ 0
+ 60
+ 10
+ 1000
+ false
+ false
+
+
diff --git a/integration/h4/key.pem b/integration/h4/key.pem
new file mode 100644
index 000000000..73f2c05df
--- /dev/null
+++ b/integration/h4/key.pem
@@ -0,0 +1,39 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG5AIBAAKCAYEA9MRyBtArSjt29azNoCWxx5xZF3RodBcQu+wv5sRR8lWozrr4
+brfUJLslcQHowqaAprOU1NP+BH12P5CSymsUrwAmCwSQ54CimXrNi5RiNMl7dtIn
+Jksk4Kp6nJgfyR7TqeQgqxtv+skVWdJY7ptxqpVuDfkf1JnNr68dbANw8hEJpPaG
+m3qOt81YvSg37R75HiOCzv+hFcSjKpPyFMvPARMCOHuZS0fYRJtI5nwmR0mWtKfn
+H/2204YNiQUne/8h2fgtkpxyOjxKOs2KJxbmpV6Uur/YyGyinb5+Aa0df3KCBuZm
+E+i/AsZcTsk0fgefe+bshWG/hzrNfV0wsX3TYjYOSBJ04+f/uQW00G1GGSxPwTsS
+hGqVuwfJkTqkjAXX5wcH+PgJewG/dyMzKklMg19Y65WkhpWa/19o2KSZNw6TO8YM
+1arwT0STcMc+4fdrVB09lX6qNJA8UL8hUX+jbKBzatDY64h1d9E8PE0ODHYgYFO2
+Ko7e2GnWCQeijGmnAgMBAAECggGBAIjKaLdqC2d3CCqQonJH3q0hsaCsC9wlL9L2
+UmbzfKCkQq0WTNUDo2nLtUcMvBpclzWS0zCGMUYtH7Kyh3bclTigKqKpsJnQiA6i
+VNEW4jOCDp//HqYGBNwSKmftlIX/1mbx+VfnA5PyYR5LsivXb5TX4iOpAKL+Obdf
+dF/zJGIEJ5GrvNqTicMq3dcI7Qh18N9pFSe+MTZLKK0Y9Yetx0hgaTNL0AYEZtcg
+uYMmCvZ4J+Namo6EanKYTmQvHzvq/tZVMvud9Gcr6uKKtVBcgex9S/R7IicaKg78
+oDTgH0nDrpI55pZCX8vuVGk8nVTXXLTsMR1XojOpiYjS6ucfTkPEw3fOW/YRhHg5
+93TrdDiWkqSWube5LNUF87q65t/aw/y2EH2aTNqcPD5OQ+EZRS8OGYPqOrJ4Ycbp
+j6CMSE+LX2IDMQyJ+9J0vPHtFsAviBKQkPoQ1L6mvhJuw6ksy34NQGykNDHz7nQK
+SeqvCJ6XCtaWNkq+00lC3UFaGsjuUQKBwQD8+y370co5G7G5GDLbLE3i+pguUN7L
+5YfDj5qqsM9hOJNqeKAHrKFP2ii0F9WxGw/ruY0k8k7zUt6LepgwkCI5BYfckRKJ
+g8YsNTizjqPLRGtiqL9Garjo+xPxFGj+TkTg9fYD4xTWFa1I15zzCu7Ye7xObeEH
+LRtcm3R4fU54JDrKtKDccoQmTEAzsxRdNXi9ifc7qgjGBH9W02guuGPY4ltT1aZR
+bcO5vpi44Fnl2h6d7N6iwCtFJ0CaT1pAZ4UCgcEA97Asf5DTDWKByZBhk+VvuT1b
+6nMYjqKxDNMmCaomCmk8Mif0w9SEJmAg0b/gbs/H6T78a+9WjbN5q9xHcDU91uax
+TdCenTq7H981AjgUG7OA7XwYn+AKy+hGSnsTJglMJzJm6TGt+Sq0oO9EahBRDlsP
+PiQRot2gyQfubwcl3rhdErRwaCM92BUyPkC2fy2OppAeZOOxxuzxrvHflDOuDGCZ
+KPCmy6U9HV0JOAO2FSNJeZdNLBixXa1Pk8TgbLY7AoHBAPG7lhn9Qg3Fz9H9NINH
+13jfWdFQB0SwJEWTEAiwgMj2ha6Eau5KX63s2V4VNGVSZakqmZtHSneppOuEjq5A
+2+K+zS7PFPaACzos9OxmjU7rJu2UL4m66sv9NvXzOcxev+RyQs0+DKfw+K8VEG0Q
+8l+8BJiw2AjCalXYWbfUjMmyXNdbOCbN6kaqL+L26KuUL7Z1gd/qPw3wODmgMvoJ
+yabxzLDUA2PlzdPMMyTdhCllfkILmEXN+MrQkiOhVa0a/QKBwGZjAhH9ePD4fnQm
+5d8wIb3uGlfRGh6kLBIEGp42IqF9HPASykBFUhdW91odOhY0eAv4CHpJpnrO7QXY
++gLtT1HNbQ+gpGCUTZQAPbZcHhvRWQNSoA8+mtftfVj+hUzc3Qj68cWFzsfIGoDI
+R3ycoBUSGTvzxwKPIQ7Y43wr9UCa74Zy5mB16POw12MadxYda/F4c8f6w5taiRFr
+VKO7tT/Skp101U4rURcZRV1NU3BrdMz5eWI4FuGFafbIlIj7zwKBwHCt3VQt+JmZ
+OhCJR+8Q+jT0JvnMu1zi4CcMRiT8FbNdZDY/3B0wG4ySTNrEikFzIjihF4zIp2nv
+nD3qKQs+THl51GA8AnP9bNk7hknD7rXUuScndccTW58+PGrjqfwJp/1MEeOJQpoX
+0JML1w+dIKHzsKN0X6UL7Gyq8m+0SJKmQQguan3d3M8CMpnW0srgqOfJ+q1+bz8b
+6FuJeijoaN8+zyKkN+9R91Erw5pk+7vJRzEpDtkhprEE5tLNDKrXJw==
+-----END RSA PRIVATE KEY-----
diff --git a/integration/test.sh b/integration/test.sh
index dd6599539..c04526b67 100755
--- a/integration/test.sh
+++ b/integration/test.sh
@@ -14,7 +14,7 @@ go build json.go
start() {
echo "Starting..."
- for i in 1 2 3 ; do
+ for i in 1 2 3 4 ; do
STPROFILER=":909$i" syncthing -home "h$i" &
done
}
diff --git a/protocol/protocol.go b/protocol/protocol.go
index 501574b6b..fd60d6501 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -9,7 +9,6 @@ import (
"sync"
"time"
- "github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/xdr"
)
@@ -72,16 +71,18 @@ type rawConnection struct {
xr *xdr.Reader
writer io.WriteCloser
- cw *countingWriter
- wb *bufio.Writer
- xw *xdr.Writer
- wmut sync.Mutex
- closed bool
+ cw *countingWriter
+ wb *bufio.Writer
+ xw *xdr.Writer
+ wmut sync.Mutex
- awaiting map[int]chan asyncResult
- nextID int
indexSent map[string]map[string][2]int64
+ awaiting []chan asyncResult
imut sync.Mutex
+
+ nextID chan int
+ outbox chan []encodable
+ closed chan struct{}
}
type asyncResult struct {
@@ -115,12 +116,17 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
cw: cw,
wb: wb,
xw: xdr.NewWriter(wb),
- awaiting: make(map[int]chan asyncResult),
+ awaiting: make([]chan asyncResult, 0x1000),
indexSent: make(map[string]map[string][2]int64),
+ outbox: make(chan []encodable),
+ nextID: make(chan int),
+ closed: make(chan struct{}),
}
go c.readerLoop()
+ go c.writerLoop()
go c.pingerLoop()
+ go c.idGenerator()
return wireFormatConnection{&c}
}
@@ -131,10 +137,6 @@ func (c *rawConnection) ID() string {
// Index writes the list of file information to the connected peer node
func (c *rawConnection) Index(repo string, idx []FileInfo) {
- if c.isClosed() {
- return
- }
-
c.imut.Lock()
var msgType int
if c.indexSent[repo] == nil {
@@ -157,48 +159,32 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) {
}
idx = diff
}
-
- id := c.nextID
- c.nextID = (c.nextID + 1) & 0xfff
c.imut.Unlock()
- c.wmut.Lock()
- header{0, id, msgType}.encodeXDR(c.xw)
- IndexMessage{repo, idx}.encodeXDR(c.xw)
- err := c.flush()
- c.wmut.Unlock()
-
- if err != nil {
- c.close(err)
- return
- }
+ c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
- if c.isClosed() {
+ var id int
+ select {
+ case id = <-c.nextID:
+ case <-c.closed:
return nil, ErrClosed
}
c.imut.Lock()
- id := c.nextID
- c.nextID = (c.nextID + 1) & 0xfff
- rc := make(chan asyncResult)
- if _, ok := c.awaiting[id]; ok {
+ if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
+ rc := make(chan asyncResult)
c.awaiting[id] = rc
c.imut.Unlock()
- c.wmut.Lock()
- header{0, id, messageTypeRequest}.encodeXDR(c.xw)
- RequestMessage{repo, name, uint64(offset), uint32(size)}.encodeXDR(c.xw)
- err := c.flush()
- c.wmut.Unlock()
-
- if err != nil {
- c.close(err)
- return nil, err
+ ok := c.send(header{0, id, messageTypeRequest},
+ RequestMessage{repo, name, uint64(offset), uint32(size)})
+ if !ok {
+ return nil, ErrClosed
}
res, ok := <-rc
@@ -210,45 +196,24 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
// ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
- if c.isClosed() {
- return
- }
-
- c.imut.Lock()
- id := c.nextID
- c.nextID = (c.nextID + 1) & 0xfff
- c.imut.Unlock()
-
- c.wmut.Lock()
- header{0, id, messageTypeClusterConfig}.encodeXDR(c.xw)
- config.encodeXDR(c.xw)
- err := c.flush()
- c.wmut.Unlock()
-
- if err != nil {
- c.close(err)
- }
+ c.send(header{0, -1, messageTypeClusterConfig}, config)
}
func (c *rawConnection) ping() bool {
- if c.isClosed() {
+ var id int
+ select {
+ case id = <-c.nextID:
+ case <-c.closed:
return false
}
- c.imut.Lock()
- id := c.nextID
- c.nextID = (c.nextID + 1) & 0xfff
rc := make(chan asyncResult, 1)
+ c.imut.Lock()
c.awaiting[id] = rc
c.imut.Unlock()
- c.wmut.Lock()
- header{0, id, messageTypePing}.encodeXDR(c.xw)
- err := c.flush()
- c.wmut.Unlock()
-
- if err != nil {
- c.close(err)
+ ok := c.send(header{0, id, messageTypePing})
+ if !ok {
return false
}
@@ -256,6 +221,196 @@ func (c *rawConnection) ping() bool {
return ok && res.err == nil
}
+func (c *rawConnection) readerLoop() (err error) {
+ defer func() {
+ c.close(err)
+ }()
+
+ for {
+ select {
+ case <-c.closed:
+ return ErrClosed
+ default:
+ }
+
+ var hdr header
+ hdr.decodeXDR(c.xr)
+ if err := c.xr.Error(); err != nil {
+ return err
+ }
+ if hdr.version != 0 {
+ return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
+ }
+
+ switch hdr.msgType {
+ case messageTypeIndex:
+ if err := c.handleIndex(); err != nil {
+ return err
+ }
+
+ case messageTypeIndexUpdate:
+ if err := c.handleIndexUpdate(); err != nil {
+ return err
+ }
+
+ case messageTypeRequest:
+ if err := c.handleRequest(hdr); err != nil {
+ return err
+ }
+
+ case messageTypeResponse:
+ if err := c.handleResponse(hdr); err != nil {
+ return err
+ }
+
+ case messageTypePing:
+ c.send(header{0, hdr.msgID, messageTypePong})
+
+ case messageTypePong:
+ c.handlePong(hdr)
+
+ case messageTypeClusterConfig:
+ if err := c.handleClusterConfig(); err != nil {
+ return err
+ }
+
+ default:
+ return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
+ }
+ }
+}
+
+func (c *rawConnection) handleIndex() error {
+ var im IndexMessage
+ im.decodeXDR(c.xr)
+ if err := c.xr.Error(); err != nil {
+ return err
+ } else {
+
+ // We run this (and the corresponding one for update, below)
+ // in a separate goroutine to avoid blocking the read loop.
+ // There is otherwise a potential deadlock where both sides
+ // has the model locked because it's sending a large index
+ // update and can't receive the large index update from the
+ // other side.
+
+ go c.receiver.Index(c.id, im.Repository, im.Files)
+ }
+ return nil
+}
+
+func (c *rawConnection) handleIndexUpdate() error {
+ var im IndexMessage
+ im.decodeXDR(c.xr)
+ if err := c.xr.Error(); err != nil {
+ return err
+ } else {
+ go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
+ }
+ return nil
+}
+
+func (c *rawConnection) handleRequest(hdr header) error {
+ var req RequestMessage
+ req.decodeXDR(c.xr)
+ if err := c.xr.Error(); err != nil {
+ return err
+ }
+ go c.processRequest(hdr.msgID, req)
+ return nil
+}
+
+func (c *rawConnection) handleResponse(hdr header) error {
+ data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
+
+ if err := c.xr.Error(); err != nil {
+ return err
+ }
+
+ go func(hdr header, err error) {
+ c.imut.Lock()
+ rc := c.awaiting[hdr.msgID]
+ c.awaiting[hdr.msgID] = nil
+ c.imut.Unlock()
+
+ if rc != nil {
+ rc <- asyncResult{data, err}
+ close(rc)
+ }
+ }(hdr, c.xr.Error())
+
+ return nil
+}
+
+func (c *rawConnection) handlePong(hdr header) {
+ c.imut.Lock()
+ if rc := c.awaiting[hdr.msgID]; rc != nil {
+ go func() {
+ rc <- asyncResult{}
+ close(rc)
+ }()
+
+ c.awaiting[hdr.msgID] = nil
+ }
+ c.imut.Unlock()
+}
+
+func (c *rawConnection) handleClusterConfig() error {
+ var cm ClusterConfigMessage
+ cm.decodeXDR(c.xr)
+ if err := c.xr.Error(); err != nil {
+ return err
+ } else {
+ go c.receiver.ClusterConfig(c.id, cm)
+ }
+ return nil
+}
+
+type encodable interface {
+ encodeXDR(*xdr.Writer) (int, error)
+}
+type encodableBytes []byte
+
+func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
+ return xw.WriteBytes(e)
+}
+
+func (c *rawConnection) send(h header, es ...encodable) bool {
+ if h.msgID < 0 {
+ select {
+ case id := <-c.nextID:
+ h.msgID = id
+ case <-c.closed:
+ return false
+ }
+ }
+ msg := append([]encodable{h}, es...)
+
+ select {
+ case c.outbox <- msg:
+ return true
+ case <-c.closed:
+ return false
+ }
+}
+
+func (c *rawConnection) writerLoop() {
+ var err error
+ for es := range c.outbox {
+ c.wmut.Lock()
+ for _, e := range es {
+ e.encodeXDR(c.xw)
+ }
+
+ if err = c.flush(); err != nil {
+ c.wmut.Unlock()
+ c.close(err)
+ return
+ }
+ c.wmut.Unlock()
+ }
+}
+
type flusher interface {
Flush() error
}
@@ -282,154 +437,35 @@ func (c *rawConnection) close(err error) {
defer c.imut.Unlock()
defer c.wmut.Unlock()
- if c.closed {
+ select {
+ case <-c.closed:
return
- }
+ default:
+ close(c.closed)
- c.closed = true
-
- for _, ch := range c.awaiting {
- close(ch)
- }
- c.awaiting = nil
- c.writer.Close()
- c.reader.Close()
-
- c.receiver.Close(c.id, err)
-}
-
-func (c *rawConnection) isClosed() bool {
- c.wmut.Lock()
- defer c.wmut.Unlock()
- return c.closed
-}
-
-func (c *rawConnection) readerLoop() {
-loop:
- for !c.isClosed() {
- var hdr header
- hdr.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- }
- if hdr.version != 0 {
- c.close(fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version))
- break loop
+ for i, ch := range c.awaiting {
+ if ch != nil {
+ close(ch)
+ c.awaiting[i] = nil
+ }
}
- switch hdr.msgType {
- case messageTypeIndex:
- var im IndexMessage
- im.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- } else {
+ c.writer.Close()
+ c.reader.Close()
- // We run this (and the corresponding one for update, below)
- // in a separate goroutine to avoid blocking the read loop.
- // There is otherwise a potential deadlock where both sides
- // has the model locked because it's sending a large index
- // update and can't receive the large index update from the
- // other side.
-
- go c.receiver.Index(c.id, im.Repository, im.Files)
- }
-
- case messageTypeIndexUpdate:
- var im IndexMessage
- im.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- } else {
- go c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
- }
-
- case messageTypeRequest:
- var req RequestMessage
- req.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- }
- go c.processRequest(hdr.msgID, req)
-
- case messageTypeResponse:
- data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
-
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- }
-
- go func(hdr header, err error) {
- c.imut.Lock()
- rc, ok := c.awaiting[hdr.msgID]
- delete(c.awaiting, hdr.msgID)
- c.imut.Unlock()
-
- if ok {
- rc <- asyncResult{data, err}
- close(rc)
- }
- }(hdr, c.xr.Error())
-
- case messageTypePing:
- c.wmut.Lock()
- header{0, hdr.msgID, messageTypePong}.encodeXDR(c.xw)
- err := c.flush()
- c.wmut.Unlock()
- if err != nil {
- c.close(err)
- break loop
- }
-
- case messageTypePong:
- c.imut.Lock()
- rc, ok := c.awaiting[hdr.msgID]
-
- if ok {
- go func() {
- rc <- asyncResult{}
- close(rc)
- }()
-
- delete(c.awaiting, hdr.msgID)
- }
- c.imut.Unlock()
-
- case messageTypeClusterConfig:
- var cm ClusterConfigMessage
- cm.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- c.close(err)
- break loop
- } else {
- go c.receiver.ClusterConfig(c.id, cm)
- }
-
- default:
- c.close(fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType))
- break loop
- }
+ c.receiver.Close(c.id, err)
}
}
-func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
- data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
-
- c.wmut.Lock()
- header{0, msgID, messageTypeResponse}.encodeXDR(c.xw)
- c.xw.WriteBytes(data)
- err := c.flush()
- c.wmut.Unlock()
-
- buffers.Put(data)
-
- if err != nil {
- c.close(err)
+func (c *rawConnection) idGenerator() {
+ nextID := 0
+ for {
+ nextID = (nextID + 1) & 0xfff
+ select {
+ case c.nextID <- nextID:
+ case <-c.closed:
+ return
+ }
}
}
@@ -437,9 +473,6 @@ func (c *rawConnection) pingerLoop() {
var rc = make(chan bool, 1)
ticker := time.Tick(pingIdleTime / 2)
for {
- if c.isClosed() {
- return
- }
select {
case <-ticker:
go func() {
@@ -452,11 +485,23 @@ func (c *rawConnection) pingerLoop() {
}
case <-time.After(pingTimeout):
c.close(fmt.Errorf("ping timeout"))
+ case <-c.closed:
+ return
}
+
+ case <-c.closed:
+ return
}
}
}
+func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
+ data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
+
+ c.send(header{0, msgID, messageTypeResponse},
+ encodableBytes(data))
+}
+
type Statistics struct {
At time.Time
InBytesTotal int
diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go
index 80c4f83a3..aaf8b0e66 100644
--- a/protocol/protocol_test.go
+++ b/protocol/protocol_test.go
@@ -174,9 +174,7 @@ func TestClose(t *testing.T) {
c0.close(nil)
- if !c0.isClosed() {
- t.Fatal("Connection should be closed")
- }
+ <-c0.closed
if !m0.isClosed() {
t.Fatal("Connection should be closed")
}