Make sure connection is added to m.protoConn and m.rawConn before it's Start()ed (fixes #2034)

This commit is contained in:
Jakob Borg 2015-07-10 16:37:57 +10:00
parent 5bb8ea7449
commit 0c28216ee5
6 changed files with 27 additions and 3 deletions

2
Godeps/Godeps.json generated
View File

@ -35,7 +35,7 @@
}, },
{ {
"ImportPath": "github.com/syncthing/protocol", "ImportPath": "github.com/syncthing/protocol",
"Rev": "9dd6f848bdcd3550606158a33d6aa98de6ea0cdc" "Rev": "b29cfce29e9af56b07b311d27eedceff805f92e2"
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",

View File

@ -89,6 +89,7 @@ type Model interface {
} }
type Connection interface { type Connection interface {
Start()
ID() DeviceID ID() DeviceID
Name() string Name() string
Index(folder string, files []FileInfo, flags uint32, options []Option) error Index(folder string, files []FileInfo, flags uint32, options []Option) error
@ -161,12 +162,16 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
compression: compress, compression: compress,
} }
return wireFormatConnection{&c}
}
// Start creates the goroutines for sending a receiving of messages. It must
// be called exactly once after creating a connection.
func (c *rawConnection) Start() {
go c.readerLoop() go c.readerLoop()
go c.writerLoop() go c.writerLoop()
go c.pingerLoop() go c.pingerLoop()
go c.idGenerator() go c.idGenerator()
return wireFormatConnection{&c}
} }
func (c *rawConnection) ID() DeviceID { func (c *rawConnection) ID() DeviceID {

View File

@ -68,7 +68,9 @@ func TestPing(t *testing.T) {
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c0 := NewConnection(c0ID, ar, bw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c1 := NewConnection(c1ID, br, aw, newTestModel(), "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{}) c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{}) c1.ClusterConfig(ClusterConfigMessage{})
@ -94,7 +96,9 @@ func TestPingErr(t *testing.T) {
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
c0 := NewConnection(c0ID, ar, ebw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c0 := NewConnection(c0ID, ar, ebw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, eaw, m1, "name", CompressAlways) c1 := NewConnection(c1ID, br, eaw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{}) c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{}) c1.ClusterConfig(ClusterConfigMessage{})
@ -174,7 +178,9 @@ func TestVersionErr(t *testing.T) {
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{}) c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{}) c1.ClusterConfig(ClusterConfigMessage{})
@ -199,7 +205,9 @@ func TestTypeErr(t *testing.T) {
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{}) c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{}) c1.ClusterConfig(ClusterConfigMessage{})
@ -224,7 +232,9 @@ func TestClose(t *testing.T) {
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection) c0 := NewConnection(c0ID, ar, bw, m0, "name", CompressAlways).(wireFormatConnection).next.(*rawConnection)
c0.Start()
c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways) c1 := NewConnection(c1ID, br, aw, m1, "name", CompressAlways)
c1.Start()
c0.ClusterConfig(ClusterConfigMessage{}) c0.ClusterConfig(ClusterConfigMessage{})
c1.ClusterConfig(ClusterConfigMessage{}) c1.ClusterConfig(ClusterConfigMessage{})

View File

@ -12,6 +12,10 @@ type wireFormatConnection struct {
next Connection next Connection
} }
func (c wireFormatConnection) Start() {
c.next.Start()
}
func (c wireFormatConnection) ID() DeviceID { func (c wireFormatConnection) ID() DeviceID {
return c.next.ID() return c.next.ID()
} }

View File

@ -983,6 +983,8 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)
} }
m.rawConn[deviceID] = rawConn m.rawConn[deviceID] = rawConn
protoConn.Start()
cm := m.clusterConfig(deviceID) cm := m.clusterConfig(deviceID)
protoConn.ClusterConfig(cm) protoConn.ClusterConfig(cm)

View File

@ -242,6 +242,9 @@ func (FakeConnection) Close() error {
return nil return nil
} }
func (f FakeConnection) Start() {
}
func (f FakeConnection) ID() protocol.DeviceID { func (f FakeConnection) ID() protocol.DeviceID {
return f.id return f.id
} }