From 3775a64d5c36e34e6b2330ad81ed4aae407cb271 Mon Sep 17 00:00:00 2001 From: Simon Frei Date: Mon, 27 May 2019 12:15:34 +0200 Subject: [PATCH] lib/protocol: Don't send anything else before cluster config (#5741) --- lib/protocol/protocol.go | 18 +++++++++++++-- lib/protocol/protocol_test.go | 42 +++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 37e75a41b..76565f05b 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -184,6 +184,7 @@ type rawConnection struct { inbox chan message outbox chan asyncMessage + clusterConfigBox chan *ClusterConfig dispatcherLoopStopped chan struct{} closed chan struct{} closeOnce sync.Once @@ -230,6 +231,7 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv awaiting: make(map[int32]chan asyncResult), inbox: make(chan message), outbox: make(chan asyncMessage), + clusterConfigBox: make(chan *ClusterConfig), dispatcherLoopStopped: make(chan struct{}), closed: make(chan struct{}), compression: compress, @@ -327,9 +329,11 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i return res.val, res.err } -// ClusterConfig send the cluster configuration message to the peer and returns any error +// ClusterConfig sends the cluster configuration message to the peer. +// It must be called just once (as per BEP), otherwise it will panic. func (c *rawConnection) ClusterConfig(config ClusterConfig) { - c.send(&config, nil) + c.clusterConfigBox <- &config + close(c.clusterConfigBox) } func (c *rawConnection) Closed() bool { @@ -657,6 +661,16 @@ func (c *rawConnection) send(msg message, done chan struct{}) bool { } func (c *rawConnection) writerLoop() { + select { + case cc := <-c.clusterConfigBox: + err := c.writeMessage(cc) + if err != nil { + c.internalClose(err) + return + } + case <-c.closed: + return + } for { select { case hm := <-c.outbox: diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 6a4e2156a..0783f2fda 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -172,6 +172,48 @@ func TestCloseRace(t *testing.T) { } } +func TestClusterConfigFirst(t *testing.T) { + m := newTestModel() + + c := NewConnection(c0ID, &testutils.BlockingRW{}, &testutils.NoopRW{}, m, "name", CompressAlways).(wireFormatConnection).Connection.(*rawConnection) + c.Start() + + select { + case c.outbox <- asyncMessage{&Ping{}, nil}: + t.Fatal("able to send ping before cluster config") + case <-time.After(100 * time.Millisecond): + // Allow some time for c.writerLoop to setup after c.Start + } + + c.ClusterConfig(ClusterConfig{}) + + done := make(chan struct{}) + if ok := c.send(&Ping{}, done); !ok { + t.Fatal("send ping after cluster config returned false") + } + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timed out before ping was sent") + } + + done = make(chan struct{}) + go func() { + c.internalClose(errManual) + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("Close didn't return before timeout") + } + + if err := m.closedError(); err != errManual { + t.Fatal("Connection should be closed") + } +} + func TestMarshalIndexMessage(t *testing.T) { if testing.Short() { quickCfg.MaxCount = 10