syncthing/lib/protocol/protocol.go
Jakob Borg 76af9ba53d Implement facility based logger, debugging via REST API
This implements a new debug/trace infrastructure based on a slightly
hacked up logger. Instead of the traditional "if debug { ... }" I've
rewritten the logger to have no-op Debugln and Debugf, unless debugging
has been enabled for a given "facility". The "facility" is just a
string, typically a package name.

This will be slightly slower than before; but not that much as it's
mostly a function call that returns immediately. For the cases where it
matters (the Debugln takes a hex.Dump() of something for example, and
it's not in a very occasional "if err != nil" branch) there is an
l.ShouldDebug(facility) that is fast enough to be used like the old "if
debug".

The point of all this is that we can now toggle debugging for the
various packages on and off at runtime. There's a new method
/rest/system/debug that can be POSTed a set of facilities to enable and
disable debug for, or GET from to get a list of facilities with
descriptions and their current debug status.

Similarly a /rest/system/log?since=... can grab the latest log entries,
up to 250 of them (hardcoded constant in main.go) plus the initial few.

Not implemented in this commit (but planned) is a simple debug GUI
available on /debug that shows the current log in an easily pasteable
format and has checkboxes to enable the various debug facilities.

The debug instructions to a user then becomes "visit this URL, check
these boxes, reproduce your problem, copy and paste the log". The actual
log viewer on the hypothetical /debug URL can poll regularly for new log
entries and this bypass the 250 line limit.

The existing STTRACE=foo variable is still obeyed and just sets the
start state of the system.
2015-10-03 18:09:53 +02:00

757 lines
18 KiB
Go

// Copyright (C) 2014 The Protocol Authors.
package protocol
import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"sync"
"time"
lz4 "github.com/bkaradzic/go-lz4"
)
const (
// BlockSize is the standard ata block size (128 KiB)
BlockSize = 128 << 10
// MaxMessageLen is the largest message size allowed on the wire. (64 MiB)
MaxMessageLen = 64 << 20
)
const (
messageTypeClusterConfig = 0
messageTypeIndex = 1
messageTypeRequest = 2
messageTypeResponse = 3
messageTypePing = 4
messageTypeIndexUpdate = 6
messageTypeClose = 7
)
const (
stateInitial = iota
stateReady
)
// FileInfo flags
const (
FlagDeleted uint32 = 1 << 12
FlagInvalid = 1 << 13
FlagDirectory = 1 << 14
FlagNoPermBits = 1 << 15
FlagSymlink = 1 << 16
FlagSymlinkMissingTarget = 1 << 17
FlagsAll = (1 << 18) - 1
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
)
// IndexMessage message flags (for IndexUpdate)
const (
FlagIndexTemporary uint32 = 1 << iota
)
// Request message flags
const (
FlagRequestTemporary uint32 = 1 << iota
)
// ClusterConfigMessage.Folders.Devices flags
const (
FlagShareTrusted uint32 = 1 << 0
FlagShareReadOnly = 1 << 1
FlagIntroducer = 1 << 2
FlagShareBits = 0x000000ff
)
var (
ErrClosed = errors.New("connection closed")
ErrTimeout = errors.New("read timeout")
)
// Specific variants of empty messages...
type pingMessage struct{ EmptyMessage }
type Model interface {
// An index was received from the peer device
Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// An index update was received from the peer device
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// A request was made by the peer device
Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
// A cluster configuration message was received
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection
Close(deviceID DeviceID, err error)
}
type Connection interface {
Start()
ID() DeviceID
Name() string
Index(folder string, files []FileInfo, flags uint32, options []Option) error
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
ClusterConfig(config ClusterConfigMessage)
Statistics() Statistics
}
type rawConnection struct {
id DeviceID
name string
receiver Model
cr *countingReader
cw *countingWriter
awaiting [4096]chan asyncResult
awaitingMut sync.Mutex
idxMut sync.Mutex // ensures serialization of Index calls
nextID chan int
outbox chan hdrMsg
closed chan struct{}
once sync.Once
pool sync.Pool
compression Compression
rdbuf0 []byte // used & reused by readMessage
rdbuf1 []byte // used & reused by readMessage
}
type asyncResult struct {
val []byte
err error
}
type hdrMsg struct {
hdr header
msg encodable
done chan struct{}
}
type encodable interface {
AppendXDR([]byte) ([]byte, error)
}
type isEofer interface {
IsEOF() bool
}
const (
// PingSendInterval is how often we make sure to send a message, by
// triggering pings if necessary.
PingSendInterval = 90 * time.Second
// ReceiveTimeout is the longest we'll wait for a message from the other
// side before closing the connection.
ReceiveTimeout = 300 * time.Second
)
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
cr := &countingReader{Reader: reader}
cw := &countingWriter{Writer: writer}
c := rawConnection{
id: deviceID,
name: name,
receiver: nativeModel{receiver},
cr: cr,
cw: cw,
outbox: make(chan hdrMsg),
nextID: make(chan int),
closed: make(chan struct{}),
pool: sync.Pool{
New: func() interface{} {
return make([]byte, BlockSize)
},
},
compression: compress,
}
return wireFormatConnection{&c}
}
// Start creates the goroutines for sending and receiving of messages. It must
// be called exactly once after creating a connection.
func (c *rawConnection) Start() {
go c.readerLoop()
go c.writerLoop()
go c.pingSender()
go c.pingReceiver()
go c.idGenerator()
}
func (c *rawConnection) ID() DeviceID {
return c.id
}
func (c *rawConnection) Name() string {
return c.name
}
// Index writes the list of file information to the connected peer device
func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndex, IndexMessage{
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
}, nil)
c.idxMut.Unlock()
return nil
}
// IndexUpdate writes the list of file information to the connected peer device as an update
func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndexUpdate, IndexMessage{
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
}, nil)
c.idxMut.Unlock()
return nil
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return nil, ErrClosed
}
c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
rc := make(chan asyncResult, 1)
c.awaiting[id] = rc
c.awaitingMut.Unlock()
ok := c.send(id, messageTypeRequest, RequestMessage{
Folder: folder,
Name: name,
Offset: offset,
Size: int32(size),
Hash: hash,
Flags: flags,
Options: options,
}, nil)
if !ok {
return nil, ErrClosed
}
res, ok := <-rc
if !ok {
return nil, ErrClosed
}
return res.val, res.err
}
// ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
c.send(-1, messageTypeClusterConfig, config, nil)
}
func (c *rawConnection) ping() bool {
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return false
}
return c.send(id, messageTypePing, nil, nil)
}
func (c *rawConnection) readerLoop() (err error) {
defer func() {
c.close(err)
}()
state := stateInitial
for {
select {
case <-c.closed:
return ErrClosed
default:
}
hdr, msg, err := c.readMessage()
if err != nil {
return err
}
switch msg := msg.(type) {
case ClusterConfigMessage:
if state != stateInitial {
return fmt.Errorf("protocol error: cluster config message in state %d", state)
}
go c.receiver.ClusterConfig(c.id, msg)
state = stateReady
case IndexMessage:
switch hdr.msgType {
case messageTypeIndex:
if state != stateReady {
return fmt.Errorf("protocol error: index message in state %d", state)
}
c.handleIndex(msg)
state = stateReady
case messageTypeIndexUpdate:
if state != stateReady {
return fmt.Errorf("protocol error: index update message in state %d", state)
}
c.handleIndexUpdate(msg)
state = stateReady
}
case RequestMessage:
if state != stateReady {
return fmt.Errorf("protocol error: request message in state %d", state)
}
// Requests are handled asynchronously
go c.handleRequest(hdr.msgID, msg)
case ResponseMessage:
if state != stateReady {
return fmt.Errorf("protocol error: response message in state %d", state)
}
c.handleResponse(hdr.msgID, msg)
case pingMessage:
if state != stateReady {
return fmt.Errorf("protocol error: ping message in state %d", state)
}
// Nothing
case CloseMessage:
return errors.New(msg.Reason)
default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
}
}
func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
if cap(c.rdbuf0) < 8 {
c.rdbuf0 = make([]byte, 8)
} else {
c.rdbuf0 = c.rdbuf0[:8]
}
_, err = io.ReadFull(c.cr, c.rdbuf0)
if err != nil {
return
}
hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
l.Debugf("read header %v (msglen=%d)", hdr, msglen)
if msglen > MaxMessageLen {
err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
return
}
if hdr.version != 0 {
err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
return
}
if cap(c.rdbuf0) < msglen {
c.rdbuf0 = make([]byte, msglen)
} else {
c.rdbuf0 = c.rdbuf0[:msglen]
}
_, err = io.ReadFull(c.cr, c.rdbuf0)
if err != nil {
return
}
l.Debugf("read %d bytes", len(c.rdbuf0))
msgBuf := c.rdbuf0
if hdr.compression && msglen > 0 {
c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
if err != nil {
return
}
msgBuf = c.rdbuf1
l.Debugf("decompressed to %d bytes", len(msgBuf))
}
if len(msgBuf) > 1024 {
l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
} else {
l.Debugf("message data:\n%s", hex.Dump(msgBuf))
}
// We check each returned error for the XDRError.IsEOF() method.
// IsEOF()==true here means that the message contained fewer fields than
// expected. It does not signify an EOF on the socket, because we've
// successfully read a size value and that many bytes already. New fields
// we expected but the other peer didn't send should be interpreted as
// zero/nil, and if that's not valid we'll verify it somewhere else.
switch hdr.msgType {
case messageTypeIndex, messageTypeIndexUpdate:
var idx IndexMessage
err = idx.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = idx
case messageTypeRequest:
var req RequestMessage
err = req.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = req
case messageTypeResponse:
var resp ResponseMessage
err = resp.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = resp
case messageTypePing:
msg = pingMessage{}
case messageTypeClusterConfig:
var cc ClusterConfigMessage
err = cc.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = cc
case messageTypeClose:
var cm CloseMessage
err = cm.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = cm
default:
err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
return
}
func (c *rawConnection) handleIndex(im IndexMessage) {
l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
var out []FileInfo
for i, f := range fs {
switch f.Name {
case "", ".", "..", "/": // A few obviously invalid filenames
l.Infof("Dropping invalid filename %q from incoming index", f.Name)
if out == nil {
// Most incoming updates won't contain anything invalid, so we
// delay the allocation and copy to output slice until we
// really need to do it, then copy all the so var valid files
// to it.
out = make([]FileInfo, i, len(fs)-1)
copy(out, fs)
}
default:
if out != nil {
out = append(out, f)
}
}
}
if out != nil {
return out
}
return fs
}
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
size := int(req.Size)
usePool := size <= BlockSize
var buf []byte
var done chan struct{}
if usePool {
buf = c.pool.Get().([]byte)[:size]
done = make(chan struct{})
} else {
buf = make([]byte, size)
}
err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
if err != nil {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: nil,
Code: errorToCode(err),
}, done)
} else {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: buf,
Code: errorToCode(err),
}, done)
}
if usePool {
<-done
c.pool.Put(buf)
}
}
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
c.awaitingMut.Lock()
if rc := c.awaiting[msgID]; rc != nil {
c.awaiting[msgID] = nil
rc <- asyncResult{resp.Data, codeToError(resp.Code)}
close(rc)
}
c.awaitingMut.Unlock()
}
func (c *rawConnection) handlePong(msgID int) {
c.awaitingMut.Lock()
if rc := c.awaiting[msgID]; rc != nil {
c.awaiting[msgID] = nil
rc <- asyncResult{}
close(rc)
}
c.awaitingMut.Unlock()
}
func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
if msgID < 0 {
select {
case id := <-c.nextID:
msgID = id
case <-c.closed:
return false
}
}
hdr := header{
version: 0,
msgID: msgID,
msgType: msgType,
}
select {
case c.outbox <- hdrMsg{hdr, msg, done}:
return true
case <-c.closed:
return false
}
}
func (c *rawConnection) writerLoop() {
var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
var uncBuf []byte // buffer for uncompressed message, kept and reused
for {
var tempBuf []byte
var err error
select {
case hm := <-c.outbox:
if hm.msg != nil {
// Uncompressed message in uncBuf
uncBuf, err = hm.msg.AppendXDR(uncBuf[:0])
if hm.done != nil {
close(hm.done)
}
if err != nil {
c.close(err)
return
}
compress := false
switch c.compression {
case CompressAlways:
compress = true
case CompressMetadata:
compress = hm.hdr.msgType != messageTypeResponse
}
if compress && len(uncBuf) >= compressionThreshold {
// Use compression for large messages
hm.hdr.compression = true
// Make sure we have enough space for the compressed message plus header in msgBug
msgBuf = msgBuf[:cap(msgBuf)]
if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
msgBuf = make([]byte, maxLen)
}
// Compressed is written to msgBuf, we keep tb for the length only
tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
msgBuf = msgBuf[0 : len(tempBuf)+8]
l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
} else {
// No point in compressing very short messages
hm.hdr.compression = false
msgBuf = msgBuf[:cap(msgBuf)]
if l := len(uncBuf) + 8; l > len(msgBuf) {
msgBuf = make([]byte, l)
}
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
msgBuf = msgBuf[0 : len(uncBuf)+8]
copy(msgBuf[8:], uncBuf)
l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
}
} else {
l.Debugf("write empty message; %v", hm.hdr)
binary.BigEndian.PutUint32(msgBuf[4:8], 0)
msgBuf = msgBuf[:8]
}
binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
if err == nil {
var n int
n, err = c.cw.Write(msgBuf)
l.Debugf("wrote %d bytes on the wire", n)
}
if err != nil {
c.close(err)
return
}
case <-c.closed:
return
}
}
}
func (c *rawConnection) close(err error) {
c.once.Do(func() {
close(c.closed)
c.awaitingMut.Lock()
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
c.awaitingMut.Unlock()
go c.receiver.Close(c.id, err)
})
}
func (c *rawConnection) idGenerator() {
nextID := 0
for {
nextID = (nextID + 1) & 0xfff
select {
case c.nextID <- nextID:
case <-c.closed:
return
}
}
}
// The pingSender makes sure that we've sent a message within the last
// PingSendInterval. If we already have something sent in the last
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
// results in an effecting ping interval of somewhere between
// PingSendInterval/2 and PingSendInterval.
func (c *rawConnection) pingSender() {
ticker := time.Tick(PingSendInterval / 2)
for {
select {
case <-ticker:
d := time.Since(c.cw.Last())
if d < PingSendInterval/2 {
l.Debugln(c.id, "ping skipped after wr", d)
continue
}
l.Debugln(c.id, "ping -> after", d)
c.ping()
case <-c.closed:
return
}
}
}
// The pingReciever checks that we've received a message (any message will do,
// but we expect pings in the absence of other messages) within the last
// ReceiveTimeout. If not, we close the connection with an ErrTimeout.
func (c *rawConnection) pingReceiver() {
ticker := time.Tick(ReceiveTimeout / 2)
for {
select {
case <-ticker:
d := time.Since(c.cr.Last())
if d > ReceiveTimeout {
l.Debugln(c.id, "ping timeout", d)
c.close(ErrTimeout)
}
l.Debugln(c.id, "last read within", d)
case <-c.closed:
return
}
}
}
type Statistics struct {
At time.Time
InBytesTotal int64
OutBytesTotal int64
}
func (c *rawConnection) Statistics() Statistics {
return Statistics{
At: time.Now(),
InBytesTotal: c.cr.Tot(),
OutBytesTotal: c.cw.Tot(),
}
}