From 8db1bf9732a1853ab3c8415417bfe96ee8eb6a4f Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 18 Mar 2014 17:51:55 +0100 Subject: [PATCH] Fix local announce (IPv6 multicast, include all listen addresses) --- cmd/syncthing/main.go | 17 ++-- discover/discover.go | 202 ++++++++++++++++++++------------------ integration/h1/config.xml | 8 +- integration/h2/config.xml | 10 +- integration/h3/config.xml | 8 +- 5 files changed, 125 insertions(+), 120 deletions(-) diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 30fdc473d..6a85ddc69 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -14,7 +14,6 @@ import ( "path" "runtime" "runtime/debug" - "strconv" "strings" "time" @@ -263,7 +262,7 @@ func main() { if verbose { infoln("Attempting to connect to other nodes") } - disc := discovery(cfg.Options.ListenAddress[0]) + disc := discovery() go connect(myID, disc, m, tlsCfg, connOpts) // Routine to pull blocks from other nodes to synchronize the local @@ -452,24 +451,20 @@ listen: } } -func discovery(addr string) *discover.Discoverer { - _, portstr, err := net.SplitHostPort(addr) - fatalErr(err) - port, _ := strconv.Atoi(portstr) - +func discovery() *discover.Discoverer { if !cfg.Options.LocalAnnEnabled { - port = -1 - } else if verbose { - infoln("Sending local discovery announcements") + return nil } + infoln("Sending local discovery announcements") + if !cfg.Options.GlobalAnnEnabled { cfg.Options.GlobalAnnServer = "" } else if verbose { infoln("Sending external discovery announcements") } - disc, err := discover.NewDiscoverer(myID, port, cfg.Options.GlobalAnnServer) + disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.GlobalAnnServer) if err != nil { warnf("No discovery possible (%v)", err) diff --git a/discover/discover.go b/discover/discover.go index f8f60eb99..c17e02f0d 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -9,6 +9,7 @@ import ( "strings" "sync" "time" + "code.google.com/p/go.net/ipv6" "github.com/calmh/syncthing/buffers" ) @@ -19,14 +20,16 @@ const ( type Discoverer struct { MyID string - ListenPort int + ListenAddresses []string BroadcastIntv time.Duration ExtBroadcastIntv time.Duration - conn *net.UDPConn + conn *ipv6.PacketConn + intfs []*net.Interface registry map[string][]string registryLock sync.RWMutex extServer string + group *net.UDPAddr localBroadcastTick <-chan time.Time forcedBroadcastTick chan time.Time @@ -41,113 +44,114 @@ var ( // When we hit this many errors in succession, we stop. const maxErrors = 30 -func NewDiscoverer(id string, port int, extServer string) (*Discoverer, error) { - local := &net.UDPAddr{IP: nil, Port: AnnouncementPort} - conn, err := net.ListenUDP("udp", local) +func NewDiscoverer(id string, addresses []string, extServer string) (*Discoverer, error) { + disc := &Discoverer{ + MyID: id, + ListenAddresses: addresses, + BroadcastIntv: 30 * time.Second, + ExtBroadcastIntv: 1800 * time.Second, + registry: make(map[string][]string), + extServer: extServer, + group: &net.UDPAddr{IP: net.ParseIP("ff02::2012:1025"), Port: AnnouncementPort}, + } + + // Listen on a multicast socket. This enables sharing the socket, i.e. + // other instances of syncting on the same box can listen on the same + // group/port. + + conn, err := net.ListenPacket("udp6", fmt.Sprintf("[ff02::]:%d", AnnouncementPort)) if err != nil { return nil, err } + disc.conn = ipv6.NewPacketConn(conn) - disc := &Discoverer{ - MyID: id, - ListenPort: port, - BroadcastIntv: 30 * time.Second, - ExtBroadcastIntv: 1800 * time.Second, + // Join the multicast group on as many interfaces as possible. Remember + // which those were. - conn: conn, - registry: make(map[string][]string), - extServer: extServer, + intfs, err := net.Interfaces() + if err != nil { + log.Printf("discover/interfaces: %v; no local announcements", err) + conn.Close() + return nil, err } + for _, intf := range intfs { + intf := intf + addrs, err := intf.Addrs() + if err == nil && len(addrs) > 0 && intf.Flags&net.FlagMulticast != 0 && intf.Flags&net.FlagUp != 0 { + if err := disc.conn.JoinGroup(&intf, disc.group); err != nil { + if debug { + dlog.Printf("%v; not joining on %s", err, intf.Name) + } + } else { + disc.intfs = append(disc.intfs, &intf) + } + } + } + + // Receive announcements sent to the local multicast group. + go disc.recvAnnouncements() - if disc.ListenPort > 0 { + // If we got a list of addresses that we listen on, announce those + // locally. + + if len(disc.ListenAddresses) > 0 { disc.localBroadcastTick = time.Tick(disc.BroadcastIntv) disc.forcedBroadcastTick = make(chan time.Time) - go disc.sendAnnouncements() - } - if len(disc.extServer) > 0 { - go disc.sendExtAnnouncements() + go disc.sendLocalAnnouncements() + + // If we have an external server address, also announce to that + // server. + + if len(disc.extServer) > 0 { + go disc.sendExternalAnnouncements() + } } return disc, nil } -func (d *Discoverer) sendAnnouncements() { - var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} - var buf = pkt.MarshalXDR() +func (d *Discoverer) announcementPkt() []byte { + var addrs []Address + for _, astr := range d.ListenAddresses { + addr, err := net.ResolveTCPAddr("tcp", astr) + if err != nil { + log.Printf("discover/announcement: %v: not announcing %s", err, astr) + continue + } else if debug { + dlog.Printf("announcing %s: %#v", astr, addr) + } + if len(addr.IP) == 0 || addr.IP.IsUnspecified() { + addrs = append(addrs, Address{Port: uint16(addr.Port)}) + } else if bs := addr.IP.To4(); bs != nil { + addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)}) + } else if bs := addr.IP.To16(); bs != nil { + addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)}) + } + } + var pkt = AnnounceV2{ + Magic: AnnouncementMagicV2, + NodeID: d.MyID, + Addresses: addrs, + } + return pkt.MarshalXDR() +} + +func (d *Discoverer) sendLocalAnnouncements() { + var buf = d.announcementPkt() var errCounter = 0 var err error - remote := &net.UDPAddr{ - IP: net.IP{255, 255, 255, 255}, - Port: AnnouncementPort, - } - + wcm := ipv6.ControlMessage{HopLimit: 1} for errCounter < maxErrors { - intfs, err := net.Interfaces() - if err != nil { - log.Printf("discover/listInterfaces: %v; no local announcements", err) - return - } - - for _, intf := range intfs { - if intf.Flags&(net.FlagBroadcast|net.FlagLoopback) == net.FlagBroadcast { - addrs, err := intf.Addrs() - if err != nil { - log.Println("discover/listAddrs: warning:", err) - errCounter++ - continue - } - - var srcAddr string - for _, addr := range addrs { - if strings.Contains(addr.String(), ".") { - // Found an IPv4 adress - parts := strings.Split(addr.String(), "/") - srcAddr = parts[0] - break - } - } - if len(srcAddr) == 0 { - if debug { - dlog.Println("no source address found on interface", intf.Name) - } - continue - } - - iaddr, err := net.ResolveUDPAddr("udp4", srcAddr+":0") - if err != nil { - log.Println("discover/resolve: warning:", err) - errCounter++ - continue - } - - conn, err := net.ListenUDP("udp4", iaddr) - if err != nil { - log.Println("discover/listen: warning:", err) - errCounter++ - continue - } - - if debug { - dlog.Println("send announcement from", conn.LocalAddr(), "to", remote, "on", intf.Name) - } - - _, err = conn.WriteTo(buf, remote) - if err != nil { - // Some interfaces don't seem to support broadcast even though the flags claims they do, i.e. vmnet - conn.Close() - - if debug { - log.Println(err) - } - - errCounter++ - continue - } - - conn.Close() + for _, intf := range d.intfs { + wcm.IfIndex = intf.Index + if _, err = d.conn.WriteTo(buf, &wcm, d.group); err != nil { + log.Printf("discover/sendLocalAnnouncements: on %s: %v; no local announcement", intf.Name, err) + errCounter++ + continue + } else { errCounter = 0 } } @@ -157,25 +161,29 @@ func (d *Discoverer) sendAnnouncements() { case <-d.forcedBroadcastTick: } } - log.Println("discover/write: local: stopping due to too many errors:", err) } -func (d *Discoverer) sendExtAnnouncements() { +func (d *Discoverer) sendExternalAnnouncements() { remote, err := net.ResolveUDPAddr("udp", d.extServer) if err != nil { log.Printf("discover/external: %v; no external announcements", err) return } - var pkt = AnnounceV2{AnnouncementMagicV2, d.MyID, []Address{{nil, 22000}}} - var buf = pkt.MarshalXDR() + conn, err := net.ListenUDP("udp", nil) + if err != nil { + log.Printf("discover/external: %v; no external announcements", err) + return + } + + var buf = d.announcementPkt() var errCounter = 0 for errCounter < maxErrors { if debug { dlog.Println("send announcement -> ", remote) } - _, err = d.conn.WriteTo(buf, remote) + _, err = conn.WriteTo(buf, remote) if err != nil { log.Println("discover/write: warning:", err) errCounter++ @@ -192,7 +200,7 @@ func (d *Discoverer) recvAnnouncements() { var errCounter = 0 var err error for errCounter < maxErrors { - n, addr, err := d.conn.ReadFromUDP(buf) + n, _, addr, err := d.conn.ReadFrom(buf) if err != nil { errCounter++ time.Sleep(time.Second) @@ -224,7 +232,9 @@ func (d *Discoverer) recvAnnouncements() { if len(a.IP) > 0 { nodeAddr = fmt.Sprintf("%s:%d", ipStr(a.IP), a.Port) } else { - nodeAddr = fmt.Sprintf("%s:%d", addr.IP.String(), a.Port) + ua := addr.(*net.UDPAddr) + ua.Port = int(a.Port) + nodeAddr = ua.String() } addrs = append(addrs, nodeAddr) } diff --git a/integration/h1/config.xml b/integration/h1/config.xml index c62208277..f5840342f 100644 --- a/integration/h1/config.xml +++ b/integration/h1/config.xml @@ -1,13 +1,13 @@ -
127.0.0.1:22001
+
dynamic
-
127.0.0.1:22002
+
dynamic
-
127.0.0.1:22003
+
dynamic
@@ -19,7 +19,7 @@ 127.0.0.1:8081 announce.syncthing.net:22025 false - false + true 16 0 60 diff --git a/integration/h2/config.xml b/integration/h2/config.xml index 6896e58c2..c18c147ec 100644 --- a/integration/h2/config.xml +++ b/integration/h2/config.xml @@ -1,17 +1,17 @@ -
127.0.0.1:22001
+
dynamic
-
127.0.0.1:22002
+
dynamic
-
127.0.0.1:22003
+
dynamic
- 127.0.0.2:22002 + 127.0.0.1:22002 false true true @@ -19,7 +19,7 @@ 127.0.0.1:8082 announce.syncthing.net:22025 false - false + true 16 0 60 diff --git a/integration/h3/config.xml b/integration/h3/config.xml index 6c48cb150..f990deeba 100644 --- a/integration/h3/config.xml +++ b/integration/h3/config.xml @@ -1,13 +1,13 @@ -
127.0.0.1:22001
+
dynamic
-
127.0.0.1:22002
+
dynamic
-
127.0.0.1:22003
+
dynamic
@@ -19,7 +19,7 @@ 127.0.0.1:8083 announce.syncthing.net:22025 false - false + true 16 0 60