Clean up protocol locking and closing

This commit is contained in:
Jakob Borg 2014-07-03 13:37:20 +02:00
parent 2f5a822ca4
commit 4a6b43bcae
1 changed files with 27 additions and 47 deletions

View File

@ -81,15 +81,14 @@ type rawConnection struct {
reader io.ReadCloser reader io.ReadCloser
cr *countingReader cr *countingReader
xr *xdr.Reader xr *xdr.Reader
writer io.WriteCloser writer io.WriteCloser
cw *countingWriter
wb *bufio.Writer
xw *xdr.Writer
cw *countingWriter awaiting []chan asyncResult
wb *bufio.Writer awaitingMut sync.Mutex
xw *xdr.Writer
wmut sync.Mutex
awaiting []chan asyncResult
imut sync.Mutex
idxSent map[string]map[string]uint64 idxSent map[string]map[string]uint64
idxMut sync.Mutex // ensures serialization of Index calls idxMut sync.Mutex // ensures serialization of Index calls
@ -97,6 +96,7 @@ type rawConnection struct {
nextID chan int nextID chan int
outbox chan []encodable outbox chan []encodable
closed chan struct{} closed chan struct{}
once sync.Once
} }
type asyncResult struct { type asyncResult struct {
@ -192,13 +192,13 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int
return nil, ErrClosed return nil, ErrClosed
} }
c.imut.Lock() c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil { if ch := c.awaiting[id]; ch != nil {
panic("id taken") panic("id taken")
} }
rc := make(chan asyncResult) rc := make(chan asyncResult, 1)
c.awaiting[id] = rc c.awaiting[id] = rc
c.imut.Unlock() c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypeRequest}, ok := c.send(header{0, id, messageTypeRequest},
RequestMessage{repo, name, uint64(offset), uint32(size)}) RequestMessage{repo, name, uint64(offset), uint32(size)})
@ -227,9 +227,9 @@ func (c *rawConnection) ping() bool {
} }
rc := make(chan asyncResult, 1) rc := make(chan asyncResult, 1)
c.imut.Lock() c.awaitingMut.Lock()
c.awaiting[id] = rc c.awaiting[id] = rc
c.imut.Unlock() c.awaitingMut.Unlock()
ok := c.send(header{0, id, messageTypePing}) ok := c.send(header{0, id, messageTypePing})
if !ok { if !ok {
@ -388,32 +388,25 @@ func (c *rawConnection) handleResponse(hdr header) error {
return err return err
} }
go func(hdr header, err error) { c.awaitingMut.Lock()
c.imut.Lock() if rc := c.awaiting[hdr.msgID]; rc != nil {
rc := c.awaiting[hdr.msgID]
c.awaiting[hdr.msgID] = nil c.awaiting[hdr.msgID] = nil
c.imut.Unlock() rc <- asyncResult{data, nil}
close(rc)
if rc != nil { }
rc <- asyncResult{data, err} c.awaitingMut.Unlock()
close(rc)
}
}(hdr, c.xr.Error())
return nil return nil
} }
func (c *rawConnection) handlePong(hdr header) { func (c *rawConnection) handlePong(hdr header) {
c.imut.Lock() c.awaitingMut.Lock()
if rc := c.awaiting[hdr.msgID]; rc != nil { if rc := c.awaiting[hdr.msgID]; rc != nil {
go func() {
rc <- asyncResult{}
close(rc)
}()
c.awaiting[hdr.msgID] = nil c.awaiting[hdr.msgID] = nil
rc <- asyncResult{}
close(rc)
} }
c.imut.Unlock() c.awaitingMut.Unlock()
} }
func (c *rawConnection) handleClusterConfig() error { func (c *rawConnection) handleClusterConfig() error {
@ -458,17 +451,14 @@ func (c *rawConnection) send(h header, es ...encodable) bool {
func (c *rawConnection) writerLoop() { func (c *rawConnection) writerLoop() {
var err error var err error
for es := range c.outbox { for es := range c.outbox {
c.wmut.Lock()
for _, e := range es { for _, e := range es {
e.encodeXDR(c.xw) e.encodeXDR(c.xw)
} }
if err = c.flush(); err != nil { if err = c.flush(); err != nil {
c.wmut.Unlock()
c.close(err) c.close(err)
return return
} }
c.wmut.Unlock()
} }
} }
@ -493,29 +483,20 @@ func (c *rawConnection) flush() error {
} }
func (c *rawConnection) close(err error) { func (c *rawConnection) close(err error) {
c.imut.Lock() c.once.Do(func() {
c.wmut.Lock()
defer c.imut.Unlock()
defer c.wmut.Unlock()
select {
case <-c.closed:
return
default:
close(c.closed) close(c.closed)
c.awaitingMut.Lock()
for i, ch := range c.awaiting { for i, ch := range c.awaiting {
if ch != nil { if ch != nil {
close(ch) close(ch)
c.awaiting[i] = nil c.awaiting[i] = nil
} }
} }
c.awaitingMut.Unlock()
c.writer.Close()
c.reader.Close()
go c.receiver.Close(c.id, err) go c.receiver.Close(c.id, err)
} })
} }
func (c *rawConnection) idGenerator() { func (c *rawConnection) idGenerator() {
@ -577,8 +558,7 @@ func (c *rawConnection) pingerLoop() {
func (c *rawConnection) processRequest(msgID int, req RequestMessage) { func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
c.send(header{0, msgID, messageTypeResponse}, c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
encodableBytes(data))
} }
type Statistics struct { type Statistics struct {