lib/protocol: Revert unreleased changes related to closing connections (#5688)

This reverts commits:
    ec7c88ca55
    19b51c9b92
    5da41f75fa
    04b927104f
This commit is contained in:
Simon Frei 2019-05-08 08:08:26 +02:00 committed by Jakob Borg
parent 59e1349499
commit 283f39ae5f
1 changed files with 51 additions and 101 deletions

View File

@ -182,13 +182,11 @@ type rawConnection struct {
nextID int32 nextID int32
nextIDMut sync.Mutex nextIDMut sync.Mutex
sentClusterConfig chan struct{} outbox chan asyncMessage
outbox chan asyncMessage closed chan struct{}
closed chan struct{} closeOnce sync.Once
closeOnce sync.Once sendCloseOnce sync.Once
sendCloseOnce sync.Once compression Compression
wg sync.WaitGroup
compression Compression
} }
type asyncResult struct { type asyncResult struct {
@ -222,16 +220,15 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
cw := &countingWriter{Writer: writer} cw := &countingWriter{Writer: writer}
c := rawConnection{ c := rawConnection{
id: deviceID, id: deviceID,
name: name, name: name,
receiver: nativeModel{receiver}, receiver: nativeModel{receiver},
cr: cr, cr: cr,
cw: cw, cw: cw,
awaiting: make(map[int32]chan asyncResult), awaiting: make(map[int32]chan asyncResult),
sentClusterConfig: make(chan struct{}), outbox: make(chan asyncMessage),
outbox: make(chan asyncMessage), closed: make(chan struct{}),
closed: make(chan struct{}), compression: compress,
compression: compress,
} }
return wireFormatConnection{&c} return wireFormatConnection{&c}
@ -240,21 +237,13 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
// Start creates the goroutines for sending and receiving of messages. It must // Start creates the goroutines for sending and receiving of messages. It must
// be called exactly once after creating a connection. // be called exactly once after creating a connection.
func (c *rawConnection) Start() { func (c *rawConnection) Start() {
c.startGoroutine(c.readerLoop)
c.startGoroutine(c.writerLoop)
c.startGoroutine(c.pingSender)
c.startGoroutine(c.pingReceiver)
}
func (c *rawConnection) startGoroutine(loop func() error) {
c.wg.Add(1)
go func() { go func() {
err := loop() err := c.readerLoop()
c.wg.Done() c.internalClose(err)
if err != nil && err != ErrClosed {
c.internalClose(err)
}
}() }()
go c.writerLoop()
go c.pingSender()
go c.pingReceiver()
} }
func (c *rawConnection) ID() DeviceID { func (c *rawConnection) ID() DeviceID {
@ -333,20 +322,9 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
return res.val, res.err return res.val, res.err
} }
// ClusterConfig sends the cluster configuration message to the peer. // ClusterConfig send the cluster configuration message to the peer and returns any error
// It must be called just once (as per BEP).
func (c *rawConnection) ClusterConfig(config ClusterConfig) { func (c *rawConnection) ClusterConfig(config ClusterConfig) {
select { c.send(&config, nil)
case <-c.sentClusterConfig:
return
case <-c.closed:
return
default:
}
if err := c.writeMessage(&config); err != nil {
c.internalClose(err)
}
close(c.sentClusterConfig)
} }
func (c *rawConnection) Closed() bool { func (c *rawConnection) Closed() bool {
@ -370,44 +348,26 @@ func (c *rawConnection) ping() bool {
return c.send(&Ping{}, nil) return c.send(&Ping{}, nil)
} }
type messageWithError struct { func (c *rawConnection) readerLoop() (err error) {
msg message
err error
}
func (c *rawConnection) readerLoop() error {
fourByteBuf := make([]byte, 4) fourByteBuf := make([]byte, 4)
inbox := make(chan messageWithError)
// Reading from the wire may block until the underlying connection is closed.
go func() {
for {
msg, err := c.readMessage(fourByteBuf)
select {
case inbox <- messageWithError{msg: msg, err: err}:
case <-c.closed:
return
}
}
}()
state := stateInitial state := stateInitial
var msgWithErr messageWithError
for { for {
select { select {
case msgWithErr = <-inbox:
case <-c.closed: case <-c.closed:
return ErrClosed return ErrClosed
} default:
if msgWithErr.err != nil {
if msgWithErr.err == errUnknownMessage {
// Unknown message types are skipped, for future extensibility.
continue
}
return msgWithErr.err
} }
switch msg := msgWithErr.msg.(type) { msg, err := c.readMessage(fourByteBuf)
if err == errUnknownMessage {
// Unknown message types are skipped, for future extensibility.
continue
}
if err != nil {
return err
}
switch msg := msg.(type) {
case *ClusterConfig: case *ClusterConfig:
l.Debugln("read ClusterConfig message") l.Debugln("read ClusterConfig message")
if state != stateInitial { if state != stateInitial {
@ -667,26 +627,19 @@ func (c *rawConnection) handleResponse(resp Response) {
c.awaitingMut.Unlock() c.awaitingMut.Unlock()
} }
func (c *rawConnection) send(msg message, done chan struct{}) (sent bool) { func (c *rawConnection) send(msg message, done chan struct{}) bool {
defer func() {
if !sent && done != nil {
close(done)
}
}()
select {
case <-c.sentClusterConfig:
case <-c.closed:
return false
}
select { select {
case c.outbox <- asyncMessage{msg, done}: case c.outbox <- asyncMessage{msg, done}:
return true return true
case <-c.closed: case <-c.closed:
if done != nil {
close(done)
}
return false return false
} }
} }
func (c *rawConnection) writerLoop() error { func (c *rawConnection) writerLoop() {
for { for {
select { select {
case hm := <-c.outbox: case hm := <-c.outbox:
@ -695,11 +648,12 @@ func (c *rawConnection) writerLoop() error {
close(hm.done) close(hm.done)
} }
if err != nil { if err != nil {
return err c.internalClose(err)
return
} }
case <-c.closed: case <-c.closed:
return ErrClosed return
} }
} }
} }
@ -872,7 +826,10 @@ func (c *rawConnection) Close(err error) {
} }
}) })
c.internalClose(err) // No more sends are necessary, therefore further steps to close the
// connection outside of this package can proceed immediately.
// And this prevents a potential deadlock due to calling c.receiver.Closed
go c.internalClose(err)
} }
// internalClose is called if there is an unexpected error during normal operation. // internalClose is called if there is an unexpected error during normal operation.
@ -890,14 +847,7 @@ func (c *rawConnection) internalClose(err error) {
} }
c.awaitingMut.Unlock() c.awaitingMut.Unlock()
// Wait for all our operations to terminate before signaling c.receiver.Closed(c, err)
// to the receiver that the connection was closed.
c.wg.Wait()
// No more sends are necessary, therefore further steps to close the
// connection outside of this package can proceed immediately.
// And this prevents a potential deadlock.
go c.receiver.Closed(c, err)
}) })
} }
@ -906,7 +856,7 @@ func (c *rawConnection) internalClose(err error) {
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
// results in an effecting ping interval of somewhere between // results in an effecting ping interval of somewhere between
// PingSendInterval/2 and PingSendInterval. // PingSendInterval/2 and PingSendInterval.
func (c *rawConnection) pingSender() error { func (c *rawConnection) pingSender() {
ticker := time.NewTicker(PingSendInterval / 2) ticker := time.NewTicker(PingSendInterval / 2)
defer ticker.Stop() defer ticker.Stop()
@ -923,7 +873,7 @@ func (c *rawConnection) pingSender() error {
c.ping() c.ping()
case <-c.closed: case <-c.closed:
return ErrClosed return
} }
} }
} }
@ -931,7 +881,7 @@ func (c *rawConnection) pingSender() error {
// The pingReceiver checks that we've received a message (any message will do, // The pingReceiver checks that we've received a message (any message will do,
// but we expect pings in the absence of other messages) within the last // but we expect pings in the absence of other messages) within the last
// ReceiveTimeout. If not, we close the connection with an ErrTimeout. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
func (c *rawConnection) pingReceiver() error { func (c *rawConnection) pingReceiver() {
ticker := time.NewTicker(ReceiveTimeout / 2) ticker := time.NewTicker(ReceiveTimeout / 2)
defer ticker.Stop() defer ticker.Stop()
@ -941,13 +891,13 @@ func (c *rawConnection) pingReceiver() error {
d := time.Since(c.cr.Last()) d := time.Since(c.cr.Last())
if d > ReceiveTimeout { if d > ReceiveTimeout {
l.Debugln(c.id, "ping timeout", d) l.Debugln(c.id, "ping timeout", d)
return ErrTimeout c.internalClose(ErrTimeout)
} }
l.Debugln(c.id, "last read within", d) l.Debugln(c.id, "last read within", d)
case <-c.closed: case <-c.closed:
return ErrClosed return
} }
} }
} }