Update protocol package

This commit is contained in:
Audrius Butkevicius 2015-03-26 21:49:49 +00:00
parent fc0cb704f2
commit 941f637bca
11 changed files with 154 additions and 92 deletions

2
Godeps/Godeps.json generated
View File

@ -31,7 +31,7 @@
},
{
"ImportPath": "github.com/syncthing/protocol",
"Rev": "f9132cae85dcda1caba2f4ba78996d348b00ac6c"
"Rev": "6277c0595c18d42e9db75dfe900463ef093a82d2"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",

View File

@ -13,6 +13,9 @@ type TestModel struct {
name string
offset int64
size int
hash []byte
flags uint32
options []Option
closedCh chan bool
}
@ -22,17 +25,20 @@ func newTestModel() *TestModel {
}
}
func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
func (t *TestModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
}
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
func (t *TestModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
}
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int) ([]byte, error) {
func (t *TestModel) Request(deviceID DeviceID, folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
t.folder = folder
t.name = name
t.offset = offset
t.size = size
t.hash = hash
t.flags = flags
t.options = options
return t.data, nil
}

View File

@ -0,0 +1,51 @@
// Copyright (C) 2014 The Protocol Authors.
package protocol
import (
"errors"
)
const (
ecNoError int32 = iota
ecGeneric
ecNoSuchFile
ecInvalid
)
var (
ErrNoError error = nil
ErrGeneric = errors.New("generic error")
ErrNoSuchFile = errors.New("no such file")
ErrInvalid = errors.New("file is invalid")
)
var lookupError = map[int32]error{
ecNoError: ErrNoError,
ecGeneric: ErrGeneric,
ecNoSuchFile: ErrNoSuchFile,
ecInvalid: ErrInvalid,
}
var lookupCode = map[error]int32{
ErrNoError: ecNoError,
ErrGeneric: ecGeneric,
ErrNoSuchFile: ecNoSuchFile,
ErrInvalid: ecInvalid,
}
func codeToError(errcode int32) error {
err, ok := lookupError[errcode]
if !ok {
return ErrGeneric
}
return err
}
func errorToCode(err error) int32 {
code, ok := lookupCode[err]
if !ok {
return ecGeneric
}
return code
}

View File

@ -1,5 +1,6 @@
// Copyright (C) 2014 The Protocol Authors.
//go:generate -command genxdr go run ../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate genxdr -o message_xdr.go message.go
package protocol
@ -78,8 +79,8 @@ type RequestMessage struct {
}
type ResponseMessage struct {
Data []byte
Error int32
Data []byte
Code int32
}
type ClusterConfigMessage struct {

View File

@ -465,13 +465,13 @@ ResponseMessage Structure:
\ Data (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Error |
| Code |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct ResponseMessage {
opaque Data<>;
int Error;
int Code;
}
*/
@ -502,7 +502,7 @@ func (o ResponseMessage) AppendXDR(bs []byte) ([]byte, error) {
func (o ResponseMessage) EncodeXDRInto(xw *xdr.Writer) (int, error) {
xw.WriteBytes(o.Data)
xw.WriteUint32(uint32(o.Error))
xw.WriteUint32(uint32(o.Code))
return xw.Tot(), xw.Error()
}
@ -519,7 +519,7 @@ func (o *ResponseMessage) UnmarshalXDR(bs []byte) error {
func (o *ResponseMessage) DecodeXDRFrom(xr *xdr.Reader) error {
o.Data = xr.ReadBytes()
o.Error = int32(xr.ReadUint32())
o.Code = int32(xr.ReadUint32())
return xr.Error()
}

View File

@ -12,23 +12,23 @@ type nativeModel struct {
next Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files {
files[i].Name = norm.NFD.String(files[i].Name)
}
m.next.Index(deviceID, folder, files)
m.next.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
for i := range files {
files[i].Name = norm.NFD.String(files[i].Name)
}
m.next.IndexUpdate(deviceID, folder, files)
m.next.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int) ([]byte, error) {
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
name = norm.NFD.String(name)
return m.next.Request(deviceID, folder, name, offset, size)
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {

View File

@ -10,16 +10,16 @@ type nativeModel struct {
next Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
m.next.Index(deviceID, folder, files)
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
m.next.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
m.next.IndexUpdate(deviceID, folder, files)
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
m.next.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int) ([]byte, error) {
return m.next.Request(deviceID, folder, name, offset, size)
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {

View File

@ -24,23 +24,30 @@ type nativeModel struct {
next Model
}
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo) {
for i, f := range files {
if strings.ContainsAny(f.Name, disallowedCharacters) {
if f.IsDeleted() {
// Don't complain if the file is marked as deleted, since it
// can't possibly exist here anyway.
continue
}
files[i].Flags |= FlagInvalid
l.Warnf("File name %q contains invalid characters; marked as invalid.", f.Name)
}
files[i].Name = filepath.FromSlash(f.Name)
}
m.next.Index(deviceID, folder, files)
func (m nativeModel) Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(files)
m.next.Index(deviceID, folder, files, flags, options)
}
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) {
func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option) {
fixupFiles(files)
m.next.IndexUpdate(deviceID, folder, files, flags, options)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
name = filepath.FromSlash(name)
return m.next.Request(deviceID, folder, name, offset, size, hash, flags, options)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
m.next.ClusterConfig(deviceID, config)
}
func (m nativeModel) Close(deviceID DeviceID, err error) {
m.next.Close(deviceID, err)
}
func fixupFiles(files []FileInfo) {
for i, f := range files {
if strings.ContainsAny(f.Name, disallowedCharacters) {
if f.IsDeleted() {
@ -53,18 +60,4 @@ func (m nativeModel) IndexUpdate(deviceID DeviceID, folder string, files []FileI
}
files[i].Name = filepath.FromSlash(files[i].Name)
}
m.next.IndexUpdate(deviceID, folder, files)
}
func (m nativeModel) Request(deviceID DeviceID, folder string, name string, offset int64, size int) ([]byte, error) {
name = filepath.FromSlash(name)
return m.next.Request(deviceID, folder, name, offset, size)
}
func (m nativeModel) ClusterConfig(deviceID DeviceID, config ClusterConfigMessage) {
m.next.ClusterConfig(deviceID, config)
}
func (m nativeModel) Close(deviceID DeviceID, err error) {
m.next.Close(deviceID, err)
}

View File

@ -35,6 +35,7 @@ const (
stateIdxRcvd
)
// FileInfo flags
const (
FlagDeleted uint32 = 1 << 12
FlagInvalid = 1 << 13
@ -48,6 +49,17 @@ const (
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
)
// IndexMessage message flags (for IndexUpdate)
const (
FlagIndexTemporary uint32 = 1 << iota
)
// Request message flags
const (
FlagRequestTemporary uint32 = 1 << iota
)
// ClusterConfigMessage.Folders.Devices flags
const (
FlagShareTrusted uint32 = 1 << 0
FlagShareReadOnly = 1 << 1
@ -66,11 +78,11 @@ type pongMessage struct{ EmptyMessage }
type Model interface {
// An index was received from the peer device
Index(deviceID DeviceID, folder string, files []FileInfo)
Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// An index update was received from the peer device
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo)
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// A request was made by the peer device
Request(deviceID DeviceID, folder string, name string, offset int64, size int) ([]byte, error)
Request(deviceID DeviceID, folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
// A cluster configuration message was received
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection
@ -80,9 +92,9 @@ type Model interface {
type Connection interface {
ID() DeviceID
Name() string
Index(folder string, files []FileInfo) error
IndexUpdate(folder string, files []FileInfo) error
Request(folder string, name string, offset int64, size int) ([]byte, error)
Index(folder string, files []FileInfo, flags uint32, options []Option) error
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
ClusterConfig(config ClusterConfigMessage)
Statistics() Statistics
}
@ -169,7 +181,7 @@ func (c *rawConnection) Name() string {
}
// Index writes the list of file information to the connected peer device
func (c *rawConnection) Index(folder string, idx []FileInfo) error {
func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
select {
case <-c.closed:
return ErrClosed
@ -177,15 +189,17 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error {
}
c.idxMut.Lock()
c.send(-1, messageTypeIndex, IndexMessage{
Folder: folder,
Files: idx,
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
})
c.idxMut.Unlock()
return nil
}
// IndexUpdate writes the list of file information to the connected peer device as an update
func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
select {
case <-c.closed:
return ErrClosed
@ -193,15 +207,17 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
}
c.idxMut.Lock()
c.send(-1, messageTypeIndexUpdate, IndexMessage{
Folder: folder,
Files: idx,
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
})
c.idxMut.Unlock()
return nil
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(folder string, name string, offset int64, size int) ([]byte, error) {
func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
var id int
select {
case id = <-c.nextID:
@ -218,10 +234,13 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i
c.awaitingMut.Unlock()
ok := c.send(id, messageTypeRequest, RequestMessage{
Folder: folder,
Name: name,
Offset: offset,
Size: int32(size),
Folder: folder,
Name: name,
Offset: offset,
Size: int32(size),
Hash: hash,
Flags: flags,
Options: options,
})
if !ok {
return nil, ErrClosed
@ -280,11 +299,6 @@ func (c *rawConnection) readerLoop() (err error) {
switch msg := msg.(type) {
case IndexMessage:
if msg.Flags != 0 {
// We don't currently support or expect any flags.
return fmt.Errorf("protocol error: unknown flags 0x%x in Index(Update) message", msg.Flags)
}
switch hdr.msgType {
case messageTypeIndex:
if c.state < stateCCRcvd {
@ -301,10 +315,6 @@ func (c *rawConnection) readerLoop() (err error) {
}
case RequestMessage:
if msg.Flags != 0 {
// We don't currently support or expect any flags.
return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", msg.Flags)
}
if c.state < stateIdxRcvd {
return fmt.Errorf("protocol error: request message in state %d", c.state)
}
@ -460,16 +470,16 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
func (c *rawConnection) handleIndex(im IndexMessage) {
if debug {
l.Debugf("Index(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
}
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files))
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
if debug {
l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
}
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files))
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
@ -499,10 +509,11 @@ func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
}
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
data, _ := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size))
data, err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size), req.Hash, req.Flags, req.Options)
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: data,
Code: errorToCode(err),
})
}
@ -510,7 +521,7 @@ func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
c.awaitingMut.Lock()
if rc := c.awaiting[msgID]; rc != nil {
c.awaiting[msgID] = nil
rc <- asyncResult{resp.Data, nil}
rc <- asyncResult{resp.Data, codeToError(resp.Code)}
close(rc)
}
c.awaitingMut.Unlock()

View File

@ -229,10 +229,10 @@ func TestClose(t *testing.T) {
t.Error("Ping should not return true")
}
c0.Index("default", nil)
c0.Index("default", nil)
c0.Index("default", nil, 0, nil)
c0.Index("default", nil, 0, nil)
if _, err := c0.Request("default", "foo", 0, 0); err == nil {
if _, err := c0.Request("default", "foo", 0, 0, nil, 0, nil); err == nil {
t.Error("Request should return an error")
}
}

View File

@ -20,7 +20,7 @@ func (c wireFormatConnection) Name() string {
return c.next.Name()
}
func (c wireFormatConnection) Index(folder string, fs []FileInfo) error {
func (c wireFormatConnection) Index(folder string, fs []FileInfo, flags uint32, options []Option) error {
var myFs = make([]FileInfo, len(fs))
copy(myFs, fs)
@ -28,10 +28,10 @@ func (c wireFormatConnection) Index(folder string, fs []FileInfo) error {
myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
}
return c.next.Index(folder, myFs)
return c.next.Index(folder, myFs, flags, options)
}
func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error {
func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo, flags uint32, options []Option) error {
var myFs = make([]FileInfo, len(fs))
copy(myFs, fs)
@ -39,12 +39,12 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error {
myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name))
}
return c.next.IndexUpdate(folder, myFs)
return c.next.IndexUpdate(folder, myFs, flags, options)
}
func (c wireFormatConnection) Request(folder, name string, offset int64, size int) ([]byte, error) {
func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
name = norm.NFC.String(filepath.ToSlash(name))
return c.next.Request(folder, name, offset, size)
return c.next.Request(folder, name, offset, size, hash, flags, options)
}
func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) {