Merge pull request #1052 from AudriusButkevicius/disco4

Change to URL based announce server addresses (fixes #943)
This commit is contained in:
Jakob Borg 2014-12-01 17:53:48 +01:00
commit c734e48ad0
10 changed files with 809 additions and 246 deletions

View File

@ -41,6 +41,10 @@ identicon {
margin-top: 0px;
}
.popover {
max-width: none;
}
.identicon {
width: 30px;
height: 30px;

View File

@ -12,7 +12,7 @@ func Assets() map[string][]byte {
var bs []byte
var gr *gzip.Reader
bs, _ = base64.StdEncoding.DecodeString("H4sIAAAJbogA/5xWaW/bOBD97l8x28UCSdZXnAtrL4r1Bk0boE0Dx0GRj5REWUQoUiApHy363/dRVxxHToMN4NiiyDdvZt7McHDUGQzoUmcbIxaJo4PLQxoNj09pnnC626jQJUItaJq7RBvbx2a/f54IS5nRC8NSws/YcE5Wx27FDB/TRucUMkWGR8I6I4LccRKOmIoG2lCqIxFvsOChchVxQw7WHDepJR0XDx9v7ukjV9wwSbd5IEVIn0XIleXEYNqv2IRHFGyK7Vcg4NHuKg50pQHMnNCqS1xgi6ElNxbPdFLbqAC7BE4HzHnahnTmDx16MKY2JJl7OrrP/ScvIxKqwE50Bo8SoMLvlZCSAk655XEuu4Sd9O16/unr/dzDTW8e6Nt0NpvezB8m2IxQYwNf8hJKpJkUQIZfhim38fS/fJhdfsL+6b/Xn6/nD/DAA11dz28+3N3R1dcZTel2OptfX95/ns7o9n52+/XuQ5/uOP9VeOMSK9WIYsQdE7JJ+wPyakFORpSwJUd+Qy6WoMYohILekjup1cJDeS+x+SmOfbqOSWnXJQuOfyfOZePBYLVa9Rcq72uzGMgSww7e9ztHg04n0NGGfnQIfxmLIsi0F2jndDqmi2G2nhRvYq1cL2apkJsxvfvE5ZI7ETK64Tl/16VmoUtTIxhyY5myPcuNiCedn51OctylZITPCT6n+JxVJp8Dz5jkK7YB4v+wkcs/G9iUmYVQPaezMR33z3ha7BARV0CAeMtdEFwmGQwLJYXivUDq8LF0ONNWeAWPkR2IF+kp11cicgkwPaJ/Triv962FAujFqob0Y6lXY1oKKwLJCz79MOHhY6DXLayL0Ps9u6QrBidNbmpjJ82RjCkuQYL5bNILhMbtVn9ZYLVEDZbrBZfeaW1L8hiWen+1W/rxauieYpCICJR23UMZuFoVqPQx/X5+fl5ucnzteqlW2mYs5G3S+cKV1F36ohUL8X2pFbxgFkK6RDsSaD43fAUVNSAVMEMuerAe+ZqI3jvvyntn8KNLL9/6Wtn/Ntba7X/bIEevIre+bZCjyvdAG/T7UilKq8obqRf6mZQgo8l2ZTfPZVrP6rS2pawAREfuKd0Lcil5nZxi0boNKLpNxksC1XaQlnn6hvpKofNKyRd1kVTyNzu1E7DwcWH8IPLo2oDkIjgYnZx3aXQ69P+ODyfbUTFQY25REI17tffH2ZoaLdchGmFxVAp6ywOIRPUx+tjz2iwLYFi2lB1Hjof1YqFXJsXCR9S7M3ktHj506Lqh1OxxN3I+tvQbZpc2DlNrS7Vo+3VHSITjvULV/sDKsGyra6+q9nA6HD47He00/dKzEZrIPntR36ZMyt5WVFpN40hxYlfH1ZlyFd5hPI9R6mse+TPtRxqaL7tHE+inV1xKkVlhJ6+R+yfFfYrRQcrWVeYuzi+y9eE2vf08/N/gyI939JJAIC5ZghxZlBTGMlgUEyAwnD1af4vxkxpVhfGMMxLyHNgEN6uoGN0N4hGVsrN9wkyuV3ccMIh+6dnPUjNsGTBT1kvFDswKHlBeeTcoGnSNWae6rrCz5228uAnsSVol//pmUEkpQu/XRnyHzpj0TwqXK4Qgcs+H1WkzmvaeiNrq7Lw+Nzg66iBIt/6Sw61FTzC2dBCXGVwtfYYghY73FO7U29q5/6rrNceLHhCbgmutjq3KLk2/Pj/bZm3TLoZ/VPWV4N5dNZ6aNOo2eBSuWsVXrvyMLxEwC7+3v3mxWOCv9JvQR3vRR23oo+ratcNfii75xS2jUrQ2m602WsRVeM35+8WwvrL9BwAA//8BAAD//zISXddYDQAA")
bs, _ = base64.StdEncoding.DecodeString("H4sIAAAJbogA/5xW7W7buBL976eY24sLJLmW7ThfWGdRrDdo2gBtGjgOivykJMoiQpECScV2i777HurLjiOnxQZwbFHDwzMzZ2Y4POoNh3Sl87URi9TRwdUhjUfHpzRPOd2vVeRSoRY0LVyqjR3A2NvPU2EpN3phWEb4mRjOyerELZnhE1rrgiKmyPBYWGdEWDhOwhFT8VAbynQskjUWPFShYm7I4TTHTWZJJ+XDx9sH+sgVN0zSXRFKEdFnEXFlOTEc7VdsymMK16X5NQh4tPuaA11rADMntOoTFzAx9MyNxTOdNGfUgH0CpwPmPG1DOvebDj0YU2uSzG227nN/42VMQpXYqc7hUQpU+L0UUlLIqbA8KWSfYEnfbuafvj7MPdz09pG+TWez6e388RLGCDUM+DOvoESWSwFk+GWYcmtP/8uH2dUn2E//vvl8M3+EBx7o+mZ+++H+nq6/zmhKd9PZ/Obq4fN0RncPs7uv9x8GdM/5r8KbVFiZRhRj7piQbdofkVcLcjKmlD1z5Dfi4hnUGEVQ0O/kTmq18FDeSxhv4jigm4SUdn2y4Phn6lw+GQ6Xy+VgoYqBNouhrDDs8P2gdzTs9UIdr+lHj/CXsziGTINQO6ezCV2M8tVl+SbRygUJy4RcT+jdJy6fuRMRo1te8Hd9ahf6NDWCITeWKRtYbkRy2fvZ66XHfUrH+Jzgc4rPWX3kS+AZk3zJ1kD8F2cU8v8tbMbMQqjA6XxCx4MznpUWIuYKCBBvZQXB5ZLhYKGkUDwIpY6eKodzbYVX8ATZgXiRnmp9KWKXAtMj+ueU+3rfWiiBXq1qSD+RejmhZ2FFKHnJZxClPHoK9aqDdRl6b5Pr3O9uTVZBzUFpVcPs+lUbnLTpa/icbFCZ4hI8mU84vUJoI9MZEhZaLVGm1XpJNzhtzpI8wUnBH90n/XgzupswpSIGpV33UCmuEQ6awYT+e35+Xhk5vnJBppW2OYt4l7q+cCV1n75oxSJ8X2kFL5iF1q7QsQRCfMuXEFoLUgMzpCvA6bEvm/i98668dwY/+vT6rS+n/W8Trd3+ty1y/CZy59sWOa59D7XBSKjEtJGK1Av9Qm1Q2uV28bfPVVrPmrR2pawERNMOlA7CQkreJKdctG4Nim6d84pAbQ7Sssh+owQzlEKt5IumjuoKMTvlFbLoaWH8rPLo2oDkIjwYn5z3aXw68v+ODy+3o2KgxsKiIFr3Gu+P8xW1Wm5CNMbiuBL0lgcQiRpgOrKX5VsVwKjqOjuOHI+axVKvTIqFj6h35/KtePjQoTFHUrOn3cj52NJ/MN60cRhsW6rFZGg6QiocD0pV+w1Lw/Ktxr6s28PpaPRid7wzFyrPxmgi+86LBzZjUgZbUek8GlvKHbs6rvdUq/AOE3yCUl/x2O/p3tLSfN092kBvXnEpRW6FvXyL3F8ZrlyMDjbd9uL8Il8dbtPbz8P/DY/8DQC9JBSIS54iRxYlhckNFuWQCA1nT9ZfdPwwR1VhgmOPhDyHNsXlKy6ne4t4RJXs7IAwtpvVHQcMol959rPSDHsOmanqpWYHZiUPKK+6PpQNusFsUt1U2NnLNl5eFvYkrZZ/c3mopRSj92sjvkNnTPonhfsXQhC7l8PqtB1Ne3fEXXV23uwbHh31EKQ7fw/i1qInGFs5iPsObp8+Q5BCz3sKdxqzbu6/6nrt9rIHJKbk2qhjq7Kro9+en12ztm0Xo//V9ZXial43noY06jZ8Eq5exVeh/IyvEDALv3e/ebVY4i/1b6GP96KPu9DH9c1sh78UffKLW4dK0dlsttpoGVfhNefvF6PmVvcPAAAA//8BAAD//+ki3WZ7DQAA")
gr, _ = gzip.NewReader(bytes.NewBuffer(bs))
bs, _ = ioutil.ReadAll(gr)
assets["assets/css/overrides.css"] = bs

View File

@ -34,7 +34,7 @@ import (
var l = logger.DefaultLogger
const CurrentVersion = 6
const CurrentVersion = 7
type Configuration struct {
Version int `xml:"version,attr"`
@ -160,7 +160,7 @@ type FolderDeviceConfiguration struct {
type OptionsConfiguration struct {
ListenAddress []string `xml:"listenAddress" default:"0.0.0.0:22000"`
GlobalAnnServers []string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22026"`
GlobalAnnServers []string `xml:"globalAnnounceServer" default:"udp4://announce.syncthing.net:22026"`
GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"`
LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"`
LocalAnnPort int `xml:"localAnnouncePort" default:"21025"`
@ -308,6 +308,11 @@ func (cfg *Configuration) prepare(myID protocol.DeviceID) {
convertV5V6(cfg)
}
// Upgrade to v7 configuration if appropriate
if cfg.Version == 6 {
convertV6V7(cfg)
}
// Hash old cleartext passwords
if len(cfg.GUI.Password) > 0 && cfg.GUI.Password[0] != '$' {
hash, err := bcrypt.GenerateFromPassword([]byte(cfg.GUI.Password), 0)
@ -397,6 +402,15 @@ func ChangeRequiresRestart(from, to Configuration) bool {
return false
}
func convertV6V7(cfg *Configuration) {
// Migrate announce server addresses to the new URL based format
for i := range cfg.Options.GlobalAnnServers {
cfg.Options.GlobalAnnServers[i] = "udp4://" + cfg.Options.GlobalAnnServers[i]
}
cfg.Version = 7
}
func convertV5V6(cfg *Configuration) {
// Added ".stfolder" file at folder roots to identify mount issues
// Doesn't affect the config itself, but uses config migrations to identify

View File

@ -36,7 +36,7 @@ func init() {
func TestDefaultValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{"0.0.0.0:22000"},
GlobalAnnServers: []string{"announce.syncthing.net:22026"},
GlobalAnnServers: []string{"udp4://announce.syncthing.net:22026"},
GlobalAnnEnabled: true,
LocalAnnEnabled: true,
LocalAnnPort: 21025,
@ -139,7 +139,7 @@ func TestNoListenAddress(t *testing.T) {
func TestOverriddenValues(t *testing.T) {
expected := OptionsConfiguration{
ListenAddress: []string{":23000"},
GlobalAnnServers: []string{"syncthing.nym.se:22026"},
GlobalAnnServers: []string{"udp4://syncthing.nym.se:22026"},
GlobalAnnEnabled: false,
LocalAnnEnabled: false,
LocalAnnPort: 42123,

12
internal/config/testdata/v7.xml vendored Normal file
View File

@ -0,0 +1,12 @@
<configuration version="7">
<folder id="test" path="testdata/" ro="true" ignorePerms="false" rescanIntervalS="600">
<device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR"></device>
<device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2"></device>
</folder>
<device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR" name="node one" compression="true">
<address>a</address>
</device>
<device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2" name="node two" compression="true">
<address>b</address>
</device>
</configuration>

View File

@ -0,0 +1,59 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package discover
import (
"fmt"
"net/url"
"time"
"github.com/syncthing/syncthing/internal/protocol"
)
type Factory func(*url.URL, *Announce) (Client, error)
var (
factories = make(map[string]Factory)
DefaultErrorRetryInternval = 60 * time.Second
DefaultGlobalBroadcastInterval = 1800 * time.Second
)
func Register(proto string, factory Factory) {
factories[proto] = factory
}
func New(addr string, pkt *Announce) (Client, error) {
uri, err := url.Parse(addr)
if err != nil {
return nil, err
}
factory, ok := factories[uri.Scheme]
if !ok {
return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
}
client, err := factory(uri, pkt)
if err != nil {
return nil, err
}
return client, nil
}
type Client interface {
Lookup(device protocol.DeviceID) []string
StatusOK() bool
Address() string
Stop()
}

View File

@ -0,0 +1,227 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package discover
import (
"fmt"
"net"
"sync"
"time"
"testing"
"github.com/syncthing/syncthing/internal/protocol"
)
var device protocol.DeviceID
func init() {
device, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
}
func TestUDP4Success(t *testing.T) {
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
t.Fatal(err)
}
port := conn.LocalAddr().(*net.UDPAddr).Port
address := fmt.Sprintf("udp4://127.0.0.1:%d", port)
pkt := &Announce{
Magic: AnnouncementMagic,
This: Device{
device[:],
[]Address{{
IP: net.IPv4(123, 123, 123, 123),
Port: 1234,
}},
},
}
client, err := New(address, pkt)
if err != nil {
t.Fatal(err)
}
udpclient := client.(*UDPClient)
if udpclient.errorRetryInterval != DefaultErrorRetryInternval {
t.Fatal("Incorrect retry interval")
}
if udpclient.listenAddress.IP != nil || udpclient.listenAddress.Port != 0 {
t.Fatal("Wrong listen IP or port", udpclient.listenAddress)
}
if client.Address() != address {
t.Fatal("Incorrect address")
}
buf := make([]byte, 2048)
// First announcement
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err != nil {
t.Fatal(err)
}
// Announcement verification
conn.SetDeadline(time.Now().Add(time.Millisecond * 1100))
_, addr, err := conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Reply to it.
_, err = conn.WriteToUDP(pkt.MustMarshalXDR(), addr)
if err != nil {
t.Fatal(err)
}
// We should get nothing else
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err == nil {
t.Fatal("Expected error")
}
// Status should be ok
if !client.StatusOK() {
t.Fatal("Wrong status")
}
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
addrs = client.Lookup(device)
wg.Done()
}()
// Receive the lookup and reply
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, addr, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
conn.WriteToUDP(pkt.MustMarshalXDR(), addr)
// Wait for the lookup to arrive, verify that the number of answers is correct
wg.Wait()
if len(addrs) != 1 || addrs[0] != "123.123.123.123:1234" {
t.Fatal("Wrong number of answers")
}
client.Stop()
}
func TestUDP4Failure(t *testing.T) {
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
t.Fatal(err)
}
port := conn.LocalAddr().(*net.UDPAddr).Port
address := fmt.Sprintf("udp4://127.0.0.1:%d/?listenaddress=127.0.0.1&retry=5", port)
pkt := &Announce{
Magic: AnnouncementMagic,
This: Device{
device[:],
[]Address{{
IP: net.IPv4(123, 123, 123, 123),
Port: 1234,
}},
},
}
client, err := New(address, pkt)
if err != nil {
t.Fatal(err)
}
udpclient := client.(*UDPClient)
if udpclient.errorRetryInterval != time.Second*5 {
t.Fatal("Incorrect retry interval")
}
if !udpclient.listenAddress.IP.Equal(net.IPv4(127, 0, 0, 1)) || udpclient.listenAddress.Port != 0 {
t.Fatal("Wrong listen IP or port", udpclient.listenAddress)
}
if client.Address() != address {
t.Fatal("Incorrect address")
}
buf := make([]byte, 2048)
// First announcement
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err != nil {
t.Fatal(err)
}
// Announcement verification
conn.SetDeadline(time.Now().Add(time.Millisecond * 1100))
_, _, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Don't reply
// We should get nothing else
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err == nil {
t.Fatal("Expected error")
}
// Status should be failure
if client.StatusOK() {
t.Fatal("Wrong status")
}
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
addrs = client.Lookup(device)
wg.Done()
}()
// Receive the lookup and don't reply
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, _, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Wait for the lookup to timeout, verify that the number of answers is none
wg.Wait()
if len(addrs) != 0 {
t.Fatal("Wrong number of answers")
}
client.Stop()
}

View File

@ -0,0 +1,250 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package discover
import (
"encoding/hex"
"io"
"net"
"net/url"
"strconv"
"sync"
"time"
"github.com/syncthing/syncthing/internal/protocol"
)
func init() {
for _, proto := range []string{"udp", "udp4", "udp6"} {
Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) {
c := &UDPClient{}
err := c.Start(uri, pkt)
if err != nil {
return nil, err
}
return c, nil
})
}
}
type UDPClient struct {
url *url.URL
id protocol.DeviceID
stop chan struct{}
wg sync.WaitGroup
listenAddress *net.UDPAddr
globalBroadcastInterval time.Duration
errorRetryInterval time.Duration
status bool
mut sync.RWMutex
}
func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error {
d.url = uri
d.id = protocol.DeviceIDFromBytes(pkt.This.ID)
d.stop = make(chan struct{})
params := uri.Query()
// The address must not have a port, as otherwise both announce and lookup
// sockets would try to bind to the same port.
addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0")
if err != nil {
return err
}
d.listenAddress = addr
broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0)
if err != nil {
d.globalBroadcastInterval = DefaultGlobalBroadcastInterval
} else {
d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second
}
retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0)
if err != nil {
d.errorRetryInterval = DefaultErrorRetryInternval
} else {
d.errorRetryInterval = time.Duration(retrySeconds) * time.Second
}
d.wg.Add(1)
go d.broadcast(pkt.MustMarshalXDR())
return nil
}
func (d *UDPClient) broadcast(pkt []byte) {
defer d.wg.Done()
timer := time.NewTimer(0)
conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
for err != nil {
timer.Reset(d.errorRetryInterval)
l.Warnf("Global UDP discovery (%s): %v; trying again in %v", d.url, err, d.errorRetryInterval)
select {
case <-d.stop:
return
case <-timer.C:
}
conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
}
defer conn.Close()
remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
for err != nil {
timer.Reset(d.errorRetryInterval)
l.Warnf("Global UDP discovery (%s): %v; trying again in %v", d.url, err, d.errorRetryInterval)
select {
case <-d.stop:
return
case <-timer.C:
}
remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
}
timer.Reset(0)
for {
select {
case <-d.stop:
return
case <-timer.C:
var ok bool
if debug {
l.Debugf("Global UDP discovery (%s): send announcement -> %v\n%s", d.url, remote, hex.Dump(pkt))
}
_, err := conn.WriteTo(pkt, remote)
if err != nil {
if debug {
l.Debugf("discover %s: warning: %s", d.url, err)
}
ok = false
} else {
// Verify that the announce server responds positively for our device ID
time.Sleep(1 * time.Second)
res := d.Lookup(d.id)
if debug {
l.Debugf("discover %s: external lookup check: %v", d.url, res)
}
ok = len(res) > 0
}
d.mut.Lock()
d.status = ok
d.mut.Unlock()
if ok {
timer.Reset(d.globalBroadcastInterval)
} else {
timer.Reset(d.errorRetryInterval)
}
}
}
}
func (d *UDPClient) Lookup(device protocol.DeviceID) []string {
extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
if err != nil {
if debug {
l.Debugf("discover %s: %v; no external lookup", d.url, err)
}
return nil
}
conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP)
if err != nil {
if debug {
l.Debugf("discover %s: %v; no external lookup", d.url, err)
}
return nil
}
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(5 * time.Second))
if err != nil {
if debug {
l.Debugf("discover %s: %v; no external lookup", d.url, err)
}
return nil
}
buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
_, err = conn.Write(buf)
if err != nil {
if debug {
l.Debugf("discover %s: %v; no external lookup", d.url, err)
}
return nil
}
buf = make([]byte, 2048)
n, err := conn.Read(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
// Expected if the server doesn't know about requested device ID
return nil
}
if debug {
l.Debugf("discover %s: %v; no external lookup", d.url, err)
}
return nil
}
if debug {
l.Debugf("discover %s: read external:\n%s", d.url, hex.Dump(buf[:n]))
}
var pkt Announce
err = pkt.UnmarshalXDR(buf[:n])
if err != nil && err != io.EOF {
if debug {
l.Debugln("discover %s:", d.url, err)
}
return nil
}
var addrs []string
for _, a := range pkt.This.Addresses {
deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
addrs = append(addrs, deviceAddr)
}
return addrs
}
func (d *UDPClient) Stop() {
if d.stop != nil {
close(d.stop)
d.wg.Wait()
}
}
func (d *UDPClient) StatusOK() bool {
d.mut.RLock()
defer d.mut.RUnlock()
return d.status
}
func (d *UDPClient) Address() string {
return d.url.String()
}

View File

@ -31,25 +31,21 @@ import (
)
type Discoverer struct {
myID protocol.DeviceID
listenAddrs []string
localBcastIntv time.Duration
localBcastStart time.Time
globalBcastIntv time.Duration
errorRetryIntv time.Duration
cacheLifetime time.Duration
broadcastBeacon beacon.Interface
multicastBeacon beacon.Interface
registry map[protocol.DeviceID][]CacheEntry
registryLock sync.RWMutex
extServers []string
extPort uint16
localBcastTick <-chan time.Time
stopGlobal chan struct{}
globalWG sync.WaitGroup
forcedBcastTick chan time.Time
extAnnounceOK map[string]bool
extAnnounceOKmut sync.Mutex
myID protocol.DeviceID
listenAddrs []string
localBcastIntv time.Duration
localBcastStart time.Time
cacheLifetime time.Duration
broadcastBeacon beacon.Interface
multicastBeacon beacon.Interface
registry map[protocol.DeviceID][]CacheEntry
registryLock sync.RWMutex
extPort uint16
localBcastTick <-chan time.Time
forcedBcastTick chan time.Time
clients []Client
mut sync.RWMutex
}
type CacheEntry struct {
@ -63,14 +59,11 @@ var (
func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
return &Discoverer{
myID: id,
listenAddrs: addresses,
localBcastIntv: 30 * time.Second,
globalBcastIntv: 1800 * time.Second,
errorRetryIntv: 60 * time.Second,
cacheLifetime: 5 * time.Minute,
registry: make(map[protocol.DeviceID][]CacheEntry),
extAnnounceOK: make(map[string]bool),
myID: id,
listenAddrs: addresses,
localBcastIntv: 30 * time.Second,
cacheLifetime: 5 * time.Minute,
registry: make(map[protocol.DeviceID][]CacheEntry),
}
}
@ -112,38 +105,60 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
}
func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
// Wait for any previous announcer to stop before starting a new one.
d.globalWG.Wait()
d.extServers = servers
d.mut.Lock()
defer d.mut.Unlock()
if len(d.clients) > 0 {
d.stopGlobal()
}
d.extPort = extPort
d.stopGlobal = make(chan struct{})
d.globalWG.Add(1)
go func() {
defer d.globalWG.Done()
pkt := d.announcementPkt()
wg := sync.WaitGroup{}
clients := make(chan Client, len(servers))
for _, address := range servers {
wg.Add(1)
go func(addr string) {
defer wg.Done()
client, err := New(addr, pkt)
if err != nil {
l.Infoln("Error creating discovery client", addr, err)
return
}
clients <- client
}(address)
}
buf := d.announcementPkt()
wg.Wait()
close(clients)
for _, extServer := range d.extServers {
d.globalWG.Add(1)
go func(server string) {
d.sendExternalAnnouncements(server, buf)
d.globalWG.Done()
}(extServer)
}
}()
}
func (d *Discoverer) StopGlobal() {
if d.stopGlobal != nil {
close(d.stopGlobal)
d.globalWG.Wait()
for client := range clients {
d.clients = append(d.clients, client)
}
}
func (d *Discoverer) StopGlobal() {
d.mut.Lock()
defer d.mut.Unlock()
d.stopGlobal()
}
func (d *Discoverer) stopGlobal() {
for _, client := range d.clients {
client.Stop()
}
d.clients = []Client{}
}
func (d *Discoverer) ExtAnnounceOK() map[string]bool {
d.extAnnounceOKmut.Lock()
defer d.extAnnounceOKmut.Unlock()
return d.extAnnounceOK
d.mut.RLock()
defer d.mut.RUnlock()
ret := make(map[string]bool)
for _, client := range d.clients {
ret[client.Address()] = client.StatusOK()
}
return ret
}
func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
@ -151,22 +166,47 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
cached := d.filterCached(d.registry[device])
d.registryLock.RUnlock()
d.mut.RLock()
defer d.mut.RUnlock()
var addrs []string
if len(cached) > 0 {
addrs := make([]string, len(cached))
addrs = make([]string, len(cached))
for i := range cached {
addrs[i] = cached[i].Address
}
return addrs
} else if len(d.extServers) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
} else if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
// Only perform external lookups if we have at least one external
// server and one local announcement interval has passed. This is to
// avoid finding local peers on their remote address at startup.
addrs := d.externalLookup(device)
cached = make([]CacheEntry, len(addrs))
for i := range addrs {
cached[i] = CacheEntry{
Address: addrs[i],
Seen: time.Now(),
// server client and one local announcement interval has passed. This is
// to avoid finding local peers on their remote address at startup.
results := make(chan []string, len(d.clients))
wg := sync.WaitGroup{}
for _, client := range d.clients {
wg.Add(1)
go func(c Client) {
defer wg.Done()
results <- c.Lookup(device)
}(client)
}
wg.Wait()
close(results)
cached := []CacheEntry{}
seen := make(map[string]struct{})
now := time.Now()
for result := range results {
for _, addr := range result {
_, ok := seen[addr]
if !ok {
cached = append(cached, CacheEntry{
Address: addr,
Seen: now,
})
seen[addr] = struct{}{}
addrs = append(addrs, addr)
}
}
}
@ -174,7 +214,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
d.registry[device] = cached
d.registryLock.Unlock()
}
return nil
return addrs
}
func (d *Discoverer) Hint(device string, addrs []string) {
@ -199,7 +239,7 @@ func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
return devices
}
func (d *Discoverer) announcementPkt() []byte {
func (d *Discoverer) announcementPkt() *Announce {
var addrs []Address
if d.extPort != 0 {
addrs = []Address{{Port: d.extPort}}
@ -221,11 +261,10 @@ func (d *Discoverer) announcementPkt() []byte {
}
}
}
var pkt = Announce{
return &Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs},
}
return pkt.MustMarshalXDR()
}
func (d *Discoverer) sendLocalAnnouncements() {
@ -252,80 +291,6 @@ func (d *Discoverer) sendLocalAnnouncements() {
}
}
func (d *Discoverer) sendExternalAnnouncements(extServer string, buf []byte) {
timer := time.NewTimer(0)
conn, err := net.ListenUDP("udp", nil)
for err != nil {
timer.Reset(d.errorRetryIntv)
l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv)
select {
case <-d.stopGlobal:
return
case <-timer.C:
}
conn, err = net.ListenUDP("udp", nil)
}
remote, err := net.ResolveUDPAddr("udp", extServer)
for err != nil {
timer.Reset(d.errorRetryIntv)
l.Warnf("Global discovery: %s: %v; trying again in %v", extServer, err, d.errorRetryIntv)
select {
case <-d.stopGlobal:
return
case <-timer.C:
}
remote, err = net.ResolveUDPAddr("udp", extServer)
}
// Delay the first announcement until after a full local announcement
// cycle, to increase the chance of other peers finding us locally first.
timer.Reset(d.localBcastIntv)
for {
select {
case <-d.stopGlobal:
return
case <-timer.C:
var ok bool
if debug {
l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf))
}
_, err := conn.WriteTo(buf, remote)
if err != nil {
if debug {
l.Debugln("discover: %s: warning:", extServer, err)
}
ok = false
} else {
// Verify that the announce server responds positively for our device ID
time.Sleep(1 * time.Second)
res := d.externalLookupOnServer(extServer, d.myID)
if debug {
l.Debugln("discover:", extServer, "external lookup check:", res)
}
ok = len(res) > 0
}
d.extAnnounceOKmut.Lock()
d.extAnnounceOK[extServer] = ok
d.extAnnounceOKmut.Unlock()
if ok {
timer.Reset(d.globalBcastIntv)
} else {
timer.Reset(d.errorRetryIntv)
}
}
}
}
func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
for {
buf, addr := b.Recv()
@ -406,104 +371,6 @@ func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
return len(current) > len(orig)
}
func (d *Discoverer) externalLookup(device protocol.DeviceID) []string {
// Buffer up to as many answers as we have servers to query.
results := make(chan []string, len(d.extServers))
// Query all servers.
wg := sync.WaitGroup{}
for _, extServer := range d.extServers {
wg.Add(1)
go func(server string) {
result := d.externalLookupOnServer(server, device)
if debug {
l.Debugln("discover:", result, "from", server, "for", device)
}
results <- result
wg.Done()
}(extServer)
}
wg.Wait()
close(results)
addrs := []string{}
for result := range results {
addrs = append(addrs, result...)
}
return addrs
}
func (d *Discoverer) externalLookupOnServer(extServer string, device protocol.DeviceID) []string {
extIP, err := net.ResolveUDPAddr("udp", extServer)
if err != nil {
if debug {
l.Debugf("discover: %s: %v; no external lookup", extServer, err)
}
return nil
}
conn, err := net.DialUDP("udp", nil, extIP)
if err != nil {
if debug {
l.Debugf("discover: %s: %v; no external lookup", extServer, err)
}
return nil
}
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(5 * time.Second))
if err != nil {
if debug {
l.Debugf("discover: %s: %v; no external lookup", extServer, err)
}
return nil
}
buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
_, err = conn.Write(buf)
if err != nil {
if debug {
l.Debugf("discover: %s: %v; no external lookup", extServer, err)
}
return nil
}
buf = make([]byte, 2048)
n, err := conn.Read(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
// Expected if the server doesn't know about requested device ID
return nil
}
if debug {
l.Debugf("discover: %s: %v; no external lookup", extServer, err)
}
return nil
}
if debug {
l.Debugf("discover: %s: read external:\n%s", extServer, hex.Dump(buf[:n]))
}
var pkt Announce
err = pkt.UnmarshalXDR(buf[:n])
if err != nil && err != io.EOF {
if debug {
l.Debugln("discover:", extServer, err)
}
return nil
}
var addrs []string
for _, a := range pkt.This.Addresses {
deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
addrs = append(addrs, deviceAddr)
}
return addrs
}
func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
for i := 0; i < len(c); {
if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {

View File

@ -13,6 +13,136 @@
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package discover_test
package discover
// Empty test file to generate 0% coverage rather than no coverage
import (
"net/url"
"time"
"testing"
"github.com/syncthing/syncthing/internal/protocol"
)
type DummyClient struct {
url *url.URL
lookups []protocol.DeviceID
lookupRet []string
stops int
statusRet bool
statusChecks int
}
func (c *DummyClient) Lookup(device protocol.DeviceID) []string {
c.lookups = append(c.lookups, device)
return c.lookupRet
}
func (c *DummyClient) StatusOK() bool {
c.statusChecks++
return c.statusRet
}
func (c *DummyClient) Stop() {
c.stops++
}
func (c *DummyClient) Address() string {
return c.url.String()
}
func TestGlobalDiscovery(t *testing.T) {
c1 := &DummyClient{
statusRet: false,
lookupRet: []string{"test.com:1234"},
}
c2 := &DummyClient{
statusRet: true,
lookupRet: []string{},
}
c3 := &DummyClient{
statusRet: true,
lookupRet: []string{"best.com:2345"},
}
clients := []*DummyClient{c1, c2}
Register("test1", func(uri *url.URL, pkt *Announce) (Client, error) {
c := clients[0]
clients = clients[1:]
c.url = uri
return c, nil
})
Register("test2", func(uri *url.URL, pkt *Announce) (Client, error) {
c3.url = uri
return c3, nil
})
d := NewDiscoverer(device, []string{})
d.localBcastStart = time.Time{}
servers := []string{
"test1://123.123.123.123:1234",
"test1://23.23.23.23:234",
"test2://234.234.234.234.2345",
}
d.StartGlobal(servers, 1234)
if len(d.clients) != 3 {
t.Fatal("Wrong number of clients")
}
status := d.ExtAnnounceOK()
for _, c := range []*DummyClient{c1, c2, c3} {
if status[c.url.String()] != c.statusRet || c.statusChecks != 1 {
t.Fatal("Wrong status")
}
}
addrs := d.Lookup(device)
if len(addrs) != 2 {
t.Fatal("Wrong numer of addresses", addrs)
}
for _, addr := range []string{"test.com:1234", "best.com:2345"} {
found := false
for _, laddr := range addrs {
if laddr == addr {
found = true
break
}
}
if !found {
t.Fatal("Couldn't find", addr)
}
}
for _, c := range []*DummyClient{c1, c2, c3} {
if len(c.lookups) != 1 || c.lookups[0] != device {
t.Fatal("Wrong lookups")
}
}
addrs = d.Lookup(device)
if len(addrs) != 2 {
t.Fatal("Wrong numer of addresses", addrs)
}
// Answer should be cached, so number of lookups should have not incresed
for _, c := range []*DummyClient{c1, c2, c3} {
if len(c.lookups) != 1 || c.lookups[0] != device {
t.Fatal("Wrong lookups")
}
}
d.StopGlobal()
for _, c := range []*DummyClient{c1, c2, c3} {
if c.stops != 1 {
t.Fatal("Wrong number of stops")
}
}
}