diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index bed05c3a9..57a2661ed 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -540,7 +540,7 @@ func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) ( // ... and is then unmarshalled - msg, err := c.newMessage(hdr.Type) + msg, err := newMessage(hdr.Type) if err != nil { BufferPool.Put(buf) return nil, err @@ -747,7 +747,7 @@ func (c *rawConnection) writeMessage(msg message) error { size := msg.ProtoSize() hdr := Header{ - Type: c.typeOf(msg), + Type: typeOf(msg), } hdrSize := hdr.ProtoSize() if hdrSize > 1<<16-1 { @@ -765,7 +765,7 @@ func (c *rawConnection) writeMessage(msg message) error { } if c.shouldCompressMessage(msg) { - ok, err := c.writeCompressedMessage(msg, buf[overhead:], overhead) + ok, err := c.writeCompressedMessage(msg, buf[overhead:]) if ok { return err } @@ -789,13 +789,13 @@ func (c *rawConnection) writeMessage(msg message) error { return nil } -// Write msg out compressed, given its uncompressed marshaled payload and overhead. +// Write msg out compressed, given its uncompressed marshaled payload. // // The first return value indicates whether compression succeeded. // If not, the caller should retry without compression. -func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, overhead int) (ok bool, err error) { +func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte) (ok bool, err error) { hdr := Header{ - Type: c.typeOf(msg), + Type: typeOf(msg), Compression: MessageCompressionLZ4, } hdrSize := hdr.ProtoSize() @@ -804,13 +804,16 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, ov } cOverhead := 2 + hdrSize + 4 - maxCompressed := cOverhead + lz4.CompressBlockBound(len(marshaled)) + // The compressed size may be at most n-n/32 = .96875*n bytes, + // I.e., if we can't save at least 3.125% bandwidth, we forgo compression. + // This number is arbitrary but cheap to compute. + maxCompressed := cOverhead + len(marshaled) - len(marshaled)/32 buf := BufferPool.Get(maxCompressed) defer BufferPool.Put(buf) compressedSize, err := lz4Compress(marshaled, buf[cOverhead:]) totSize := compressedSize + cOverhead - if err != nil || totSize >= len(marshaled)+overhead { + if err != nil { return false, nil } @@ -831,7 +834,7 @@ func (c *rawConnection) writeCompressedMessage(msg message, marshaled []byte, ov return true, nil } -func (c *rawConnection) typeOf(msg message) MessageType { +func typeOf(msg message) MessageType { switch msg.(type) { case *ClusterConfig: return MessageTypeClusterConfig @@ -854,7 +857,7 @@ func (c *rawConnection) typeOf(msg message) MessageType { } } -func (c *rawConnection) newMessage(t MessageType) (message, error) { +func newMessage(t MessageType) (message, error) { switch t { case MessageTypeClusterConfig: return new(ClusterConfig), nil @@ -1014,16 +1017,16 @@ func (c *rawConnection) Statistics() Statistics { } func lz4Compress(src, buf []byte) (int, error) { - // The compressed block is prefixed by the size of the uncompressed data. - binary.BigEndian.PutUint32(buf, uint32(len(src))) - n, err := lz4.CompressBlock(src, buf[4:], nil) if err != nil { return -1, err - } else if len(src) > 0 && n == 0 { + } else if n == 0 { return -1, errNotCompressible } + // The compressed block is prefixed by the size of the uncompressed data. + binary.BigEndian.PutUint32(buf, uint32(len(src))) + return n + 4, nil } diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 85e73011e..54e3ec70e 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -466,7 +466,7 @@ func TestWriteCompressed(t *testing.T) { t.Error("received the wrong message") } - hdr := Header{Type: c.typeOf(msg)} + hdr := Header{Type: typeOf(msg)} size := int64(2 + hdr.ProtoSize() + 4 + msg.ProtoSize()) if c.cr.tot > size { t.Errorf("compression enlarged message from %d to %d",