From 632bcae856cf8781580887a9a6e82ca22cc35414 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 30 Dec 2013 22:10:54 -0500 Subject: [PATCH] Mostly lock free receive loop --- protocol/protocol.go | 70 ++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 80e93d55b..2ee21b6ff 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -59,9 +59,6 @@ type Connection struct { lastStatistics Statistics statisticsLock sync.Mutex - - lastReceive time.Time - lastReceiveLock sync.RWMutex } var ErrClosed = errors.New("Connection closed") @@ -71,8 +68,10 @@ type asyncResult struct { err error } -const pingTimeout = 2 * time.Minute -const pingIdleTime = 5 * time.Minute +const ( + pingTimeout = 2 * time.Minute + pingIdleTime = 5 * time.Minute +) func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { flrd := flate.NewReader(reader) @@ -88,7 +87,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M writer: flwr, mwriter: &marshalWriter{w: flwr}, awaiting: make(map[int]chan asyncResult), - lastReceive: time.Now(), ID: nodeID, lastStatistics: Statistics{At: time.Now()}, } @@ -102,7 +100,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M // Index writes the list of file information to the connected peer node func (c *Connection) Index(idx []FileInfo) { c.Lock() - var msgType int if c.indexSent == nil { // This is the first time we send an index. @@ -234,26 +231,24 @@ func (c *Connection) isClosed() bool { } func (c *Connection) readerLoop() { - for !c.isClosed() { +loop: + for { hdr := c.mreader.readHeader() if c.mreader.err != nil { c.Close(c.mreader.err) - break + break loop } if hdr.version != 0 { c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version)) - break + break loop } - c.lastReceiveLock.Lock() - c.lastReceive = time.Now() - c.lastReceiveLock.Unlock() - switch hdr.msgType { case messageTypeIndex: files := c.mreader.readIndex() if c.mreader.err != nil { c.Close(c.mreader.err) + break loop } else { c.receiver.Index(c.ID, files) } @@ -262,18 +257,25 @@ func (c *Connection) readerLoop() { files := c.mreader.readIndex() if c.mreader.err != nil { c.Close(c.mreader.err) + break loop } else { c.receiver.IndexUpdate(c.ID, files) } case messageTypeRequest: - c.processRequest(hdr.msgID) + req := c.mreader.readRequest() + if c.mreader.err != nil { + c.Close(c.mreader.err) + break loop + } + go c.processRequest(hdr.msgID, req) case messageTypeResponse: data := c.mreader.readResponse() if c.mreader.err != nil { c.Close(c.mreader.err) + break loop } else { c.Lock() rc, ok := c.awaiting[hdr.msgID] @@ -293,8 +295,10 @@ func (c *Connection) readerLoop() { c.Unlock() if err != nil { c.Close(err) + break loop } else if c.mwriter.err != nil { c.Close(c.mwriter.err) + break loop } case messageTypePong: @@ -313,35 +317,31 @@ func (c *Connection) readerLoop() { default: c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) + break loop } } } -func (c *Connection) processRequest(msgID int) { - req := c.mreader.readRequest() - if c.mreader.err != nil { - c.Close(c.mreader.err) - } else { - go func() { - data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) - c.Lock() - c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) - c.mwriter.writeResponse(data) - err := c.flush() - c.Unlock() - buffers.Put(data) - if err != nil { - c.Close(err) - } else if c.mwriter.err != nil { - c.Close(c.mwriter.err) - } - }() +func (c *Connection) processRequest(msgID int, req request) { + data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) + + c.Lock() + c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) + c.mwriter.writeResponse(data) + err := c.flush() + c.Unlock() + + buffers.Put(data) + if err != nil { + c.Close(err) + } else if c.mwriter.err != nil { + c.Close(c.mwriter.err) } } func (c *Connection) pingerLoop() { var rc = make(chan bool, 1) - for !c.isClosed() { + for { time.Sleep(pingIdleTime / 2) go func() { rc <- c.Ping()