diff --git a/lib/relay/client/client.go b/lib/relay/client/client.go index aeef8f995..2dee8c6ae 100644 --- a/lib/relay/client/client.go +++ b/lib/relay/client/client.go @@ -5,279 +5,37 @@ package client import ( "crypto/tls" "fmt" - "net" "net/url" "time" - "github.com/syncthing/syncthing/lib/dialer" - syncthingprotocol "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/relay/protocol" - "github.com/syncthing/syncthing/lib/sync" ) -type ProtocolClient struct { - URI *url.URL - Invitations chan protocol.SessionInvitation +type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient - closeInvitationsOnFinish bool +var ( + supportedSchemes = map[string]relayClientFactory{ + "relay": newStaticClient, + "dynamic+http": newDynamicClient, + "dynamic+https": newDynamicClient, + } +) - config *tls.Config - - timeout time.Duration - - stop chan struct{} - stopped chan struct{} - - conn *tls.Conn - - mut sync.RWMutex - connected bool - latency time.Duration +type RelayClient interface { + Serve() + Stop() + StatusOK() bool + Latency() time.Duration + String() string + Invitations() chan protocol.SessionInvitation + URI() *url.URL } -func NewProtocolClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) *ProtocolClient { - closeInvitationsOnFinish := false - if invitations == nil { - closeInvitationsOnFinish = true - invitations = make(chan protocol.SessionInvitation) +func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) (RelayClient, error) { + factory, ok := supportedSchemes[uri.Scheme] + if !ok { + return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme) } - return &ProtocolClient{ - URI: uri, - Invitations: invitations, - - closeInvitationsOnFinish: closeInvitationsOnFinish, - - config: configForCerts(certs), - - timeout: time.Minute * 2, - - stop: make(chan struct{}), - stopped: make(chan struct{}), - - mut: sync.NewRWMutex(), - connected: false, - } -} - -func (c *ProtocolClient) Serve() { - c.stop = make(chan struct{}) - c.stopped = make(chan struct{}) - defer close(c.stopped) - - if err := c.connect(); err != nil { - l.Debugln("Relay connect:", err) - return - } - - l.Debugln(c, "connected", c.conn.RemoteAddr()) - - if err := c.join(); err != nil { - c.conn.Close() - l.Infoln("Relay join:", err) - return - } - - if err := c.conn.SetDeadline(time.Time{}); err != nil { - c.conn.Close() - l.Infoln("Relay set deadline:", err) - return - } - - l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr()) - - defer c.cleanup() - c.mut.Lock() - c.connected = true - c.mut.Unlock() - - messages := make(chan interface{}) - errors := make(chan error, 1) - - go messageReader(c.conn, messages, errors) - - timeout := time.NewTimer(c.timeout) - - for { - select { - case message := <-messages: - timeout.Reset(c.timeout) - l.Debugf("%s received message %T", c, message) - - switch msg := message.(type) { - case protocol.Ping: - if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil { - l.Infoln("Relay write:", err) - return - - } - l.Debugln(c, "sent pong") - - case protocol.SessionInvitation: - ip := net.IP(msg.Address) - if len(ip) == 0 || ip.IsUnspecified() { - msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:] - } - c.Invitations <- msg - - default: - l.Infoln("Relay: protocol error: unexpected message %v", msg) - return - } - - case <-c.stop: - l.Debugln(c, "stopping") - return - - case err := <-errors: - l.Infoln("Relay received:", err) - return - - case <-timeout.C: - l.Debugln(c, "timed out") - return - } - } -} - -func (c *ProtocolClient) Stop() { - if c.stop == nil { - return - } - - close(c.stop) - <-c.stopped -} - -func (c *ProtocolClient) StatusOK() bool { - c.mut.RLock() - con := c.connected - c.mut.RUnlock() - return con -} - -func (c *ProtocolClient) Latency() time.Duration { - c.mut.RLock() - lat := c.latency - c.mut.RUnlock() - return lat -} - -func (c *ProtocolClient) String() string { - return fmt.Sprintf("ProtocolClient@%p", c) -} - -func (c *ProtocolClient) connect() error { - if c.URI.Scheme != "relay" { - return fmt.Errorf("Unsupported relay schema: %v", c.URI.Scheme) - } - - t0 := time.Now() - tcpConn, err := dialer.Dial("tcp", c.URI.Host) - if err != nil { - return err - } - - c.mut.Lock() - c.latency = time.Since(t0) - c.mut.Unlock() - - conn := tls.Client(tcpConn, c.config) - if err = conn.Handshake(); err != nil { - return err - } - - if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { - conn.Close() - return err - } - - if err := performHandshakeAndValidation(conn, c.URI); err != nil { - conn.Close() - return err - } - - c.conn = conn - return nil -} - -func (c *ProtocolClient) cleanup() { - if c.closeInvitationsOnFinish { - close(c.Invitations) - c.Invitations = make(chan protocol.SessionInvitation) - } - - l.Debugln(c, "cleaning up") - - c.mut.Lock() - c.connected = false - c.mut.Unlock() - - c.conn.Close() -} - -func (c *ProtocolClient) join() error { - if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil { - return err - } - - message, err := protocol.ReadMessage(c.conn) - if err != nil { - return err - } - - switch msg := message.(type) { - case protocol.Response: - if msg.Code != 0 { - return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) - } - - default: - return fmt.Errorf("protocol error: expecting response got %v", msg) - } - - return nil -} - -func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error { - if err := conn.Handshake(); err != nil { - return err - } - - cs := conn.ConnectionState() - if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName { - return fmt.Errorf("protocol negotiation error") - } - - q := uri.Query() - relayIDs := q.Get("id") - if relayIDs != "" { - relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs) - if err != nil { - return fmt.Errorf("relay address contains invalid verification id: %s", err) - } - - certs := cs.PeerCertificates - if cl := len(certs); cl != 1 { - return fmt.Errorf("unexpected certificate count: %d", cl) - } - - remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw) - if remoteID != relayID { - return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID) - } - } - - return nil -} - -func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { - for { - msg, err := protocol.ReadMessage(conn) - if err != nil { - errors <- err - return - } - messages <- msg - } + return factory(uri, certs, invitations), nil } diff --git a/lib/relay/client/dynamic.go b/lib/relay/client/dynamic.go new file mode 100644 index 000000000..34de84751 --- /dev/null +++ b/lib/relay/client/dynamic.go @@ -0,0 +1,217 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package client + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" + "sort" + "time" + + "github.com/syncthing/syncthing/lib/osutil" + "github.com/syncthing/syncthing/lib/relay/protocol" + "github.com/syncthing/syncthing/lib/sync" +) + +type dynamicClient struct { + pooladdr *url.URL + certs []tls.Certificate + invitations chan protocol.SessionInvitation + closeInvitationsOnFinish bool + + mut sync.RWMutex + client RelayClient + stop chan struct{} +} + +func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient { + closeInvitationsOnFinish := false + if invitations == nil { + closeInvitationsOnFinish = true + invitations = make(chan protocol.SessionInvitation) + } + return &dynamicClient{ + pooladdr: uri, + certs: certs, + invitations: invitations, + closeInvitationsOnFinish: closeInvitationsOnFinish, + + mut: sync.NewRWMutex(), + } +} + +func (c *dynamicClient) Serve() { + c.mut.Lock() + c.stop = make(chan struct{}) + c.mut.Unlock() + + uri := *c.pooladdr + + // Trim off the `dynamic+` prefix + uri.Scheme = uri.Scheme[8:] + + l.Debugln(c, "looking up dynamic relays") + + data, err := http.Get(uri.String()) + if err != nil { + l.Debugln(c, "failed to lookup dynamic relays", err) + return + } + + var ann dynamicAnnouncement + err = json.NewDecoder(data.Body).Decode(&ann) + data.Body.Close() + if err != nil { + l.Debugln(c, "failed to lookup dynamic relays", err) + return + } + + defer c.cleanup() + + var addrs []string + for _, relayAnn := range ann.Relays { + ruri, err := url.Parse(relayAnn.URL) + if err != nil { + l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err) + continue + } + l.Debugln(c, "found", ruri) + addrs = append(addrs, ruri.String()) + } + + for _, addr := range relayAddressesSortedByLatency(addrs) { + select { + case <-c.stop: + l.Debugln(c, "stopping") + return + default: + ruri, err := url.Parse(addr) + if err != nil { + l.Debugln(c, "skipping relay", addr, err) + continue + } + client, err := NewClient(ruri, c.certs, c.invitations) + if err != nil { + continue + } + c.mut.Lock() + c.client = client + c.mut.Unlock() + + c.client.Serve() + + c.mut.Lock() + c.client = nil + c.mut.Unlock() + } + } + l.Debugln(c, "could not find a connectable relay") +} + +func (c *dynamicClient) Stop() { + c.mut.RLock() + defer c.mut.RUnlock() + close(c.stop) + if c.client == nil { + return + } + c.client.Stop() +} + +func (c *dynamicClient) StatusOK() bool { + c.mut.RLock() + defer c.mut.RUnlock() + if c.client == nil { + return false + } + return c.client.StatusOK() +} + +func (c *dynamicClient) Latency() time.Duration { + c.mut.RLock() + defer c.mut.RUnlock() + if c.client == nil { + return time.Hour + } + return c.client.Latency() +} + +func (c *dynamicClient) String() string { + return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr) +} + +func (c *dynamicClient) URI() *url.URL { + c.mut.RLock() + defer c.mut.RUnlock() + if c.client == nil { + return c.pooladdr + } + return c.client.URI() +} + +func (c *dynamicClient) Invitations() chan protocol.SessionInvitation { + c.mut.RLock() + inv := c.invitations + c.mut.RUnlock() + return inv +} + +func (c *dynamicClient) cleanup() { + c.mut.Lock() + if c.closeInvitationsOnFinish { + close(c.invitations) + c.invitations = make(chan protocol.SessionInvitation) + } + c.mut.Unlock() +} + +// This is the announcement recieved from the relay server; +// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]} +type dynamicAnnouncement struct { + Relays []struct { + URL string + } +} + +// relayAddressesSortedByLatency adds local latency to the relay, and sorts them +// by sum latency, and returns the addresses. +func relayAddressesSortedByLatency(input []string) []string { + relays := make(relayList, len(input)) + for i, relay := range input { + if latency, err := osutil.GetLatencyForURL(relay); err == nil { + relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)} + } else { + relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)} + } + } + + sort.Sort(relays) + + addresses := make([]string, len(relays)) + for i, relay := range relays { + addresses[i] = relay.relay + } + return addresses +} + +type relayWithLatency struct { + relay string + latency int +} + +type relayList []relayWithLatency + +func (l relayList) Len() int { + return len(l) +} + +func (l relayList) Less(a, b int) bool { + return l[a].latency < l[b].latency +} + +func (l relayList) Swap(a, b int) { + l[a], l[b] = l[b], l[a] +} diff --git a/lib/relay/client/methods.go b/lib/relay/client/methods.go index 92a7e9b07..546e8279b 100644 --- a/lib/relay/client/methods.go +++ b/lib/relay/client/methods.go @@ -102,7 +102,11 @@ func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) { func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool { id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0]) invs := make(chan protocol.SessionInvitation, 1) - c := NewProtocolClient(uri, certs, invs) + c, err := NewClient(uri, certs, invs) + if err != nil { + close(invs) + return false + } go c.Serve() defer func() { close(invs) diff --git a/lib/relay/client/static.go b/lib/relay/client/static.go new file mode 100644 index 000000000..5cce7b25c --- /dev/null +++ b/lib/relay/client/static.go @@ -0,0 +1,291 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package client + +import ( + "crypto/tls" + "fmt" + "net" + "net/url" + "time" + + syncthingprotocol "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/relay/protocol" + "github.com/syncthing/syncthing/lib/sync" +) + +type staticClient struct { + uri *url.URL + invitations chan protocol.SessionInvitation + + closeInvitationsOnFinish bool + + config *tls.Config + + timeout time.Duration + + stop chan struct{} + stopped chan struct{} + + conn *tls.Conn + + mut sync.RWMutex + connected bool + latency time.Duration +} + +func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient { + closeInvitationsOnFinish := false + if invitations == nil { + closeInvitationsOnFinish = true + invitations = make(chan protocol.SessionInvitation) + } + + return &staticClient{ + uri: uri, + invitations: invitations, + + closeInvitationsOnFinish: closeInvitationsOnFinish, + + config: configForCerts(certs), + + timeout: time.Minute * 2, + + stop: make(chan struct{}), + stopped: make(chan struct{}), + + mut: sync.NewRWMutex(), + connected: false, + } +} + +func (c *staticClient) Serve() { + c.stop = make(chan struct{}) + c.stopped = make(chan struct{}) + defer close(c.stopped) + + if err := c.connect(); err != nil { + l.Debugln("Relay connect:", err) + return + } + + l.Debugln(c, "connected", c.conn.RemoteAddr()) + + if err := c.join(); err != nil { + c.conn.Close() + l.Infoln("Relay join:", err) + return + } + + if err := c.conn.SetDeadline(time.Time{}); err != nil { + c.conn.Close() + l.Infoln("Relay set deadline:", err) + return + } + + l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr()) + + defer c.cleanup() + c.mut.Lock() + c.connected = true + c.mut.Unlock() + + messages := make(chan interface{}) + errors := make(chan error, 1) + + go messageReader(c.conn, messages, errors) + + timeout := time.NewTimer(c.timeout) + + for { + select { + case message := <-messages: + timeout.Reset(c.timeout) + l.Debugf("%s received message %T", c, message) + + switch msg := message.(type) { + case protocol.Ping: + if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil { + l.Infoln("Relay write:", err) + return + + } + l.Debugln(c, "sent pong") + + case protocol.SessionInvitation: + ip := net.IP(msg.Address) + if len(ip) == 0 || ip.IsUnspecified() { + msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:] + } + c.invitations <- msg + + default: + l.Infoln("Relay: protocol error: unexpected message %v", msg) + return + } + + case <-c.stop: + l.Debugln(c, "stopping") + return + + case err := <-errors: + l.Infoln("Relay received:", err) + return + + case <-timeout.C: + l.Debugln(c, "timed out") + return + } + } +} + +func (c *staticClient) Stop() { + if c.stop == nil { + return + } + + close(c.stop) + <-c.stopped +} + +func (c *staticClient) StatusOK() bool { + c.mut.RLock() + con := c.connected + c.mut.RUnlock() + return con +} + +func (c *staticClient) Latency() time.Duration { + c.mut.RLock() + lat := c.latency + c.mut.RUnlock() + return lat +} + +func (c *staticClient) String() string { + return fmt.Sprintf("StaticClient:%p@%s", c, c.URI()) +} + +func (c *staticClient) URI() *url.URL { + return c.uri +} + +func (c *staticClient) Invitations() chan protocol.SessionInvitation { + c.mut.RLock() + inv := c.invitations + c.mut.RUnlock() + return inv +} + +func (c *staticClient) connect() error { + if c.uri.Scheme != "relay" { + return fmt.Errorf("Unsupported relay schema: %v", c.uri.Scheme) + } + + t0 := time.Now() + tcpConn, err := net.Dial("tcp", c.uri.Host) + if err != nil { + return err + } + + c.mut.Lock() + c.latency = time.Since(t0) + c.mut.Unlock() + + conn := tls.Client(tcpConn, c.config) + if err = conn.Handshake(); err != nil { + return err + } + + if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { + conn.Close() + return err + } + + if err := performHandshakeAndValidation(conn, c.uri); err != nil { + conn.Close() + return err + } + + c.conn = conn + return nil +} + +func (c *staticClient) cleanup() { + l.Debugln(c, "cleaning up") + c.mut.Lock() + if c.closeInvitationsOnFinish { + close(c.invitations) + c.invitations = make(chan protocol.SessionInvitation) + } + c.connected = false + c.mut.Unlock() + + c.conn.Close() +} + +func (c *staticClient) join() error { + if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil { + return err + } + + message, err := protocol.ReadMessage(c.conn) + if err != nil { + return err + } + + switch msg := message.(type) { + case protocol.Response: + if msg.Code != 0 { + return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) + } + + default: + return fmt.Errorf("protocol error: expecting response got %v", msg) + } + + return nil +} + +func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error { + if err := conn.Handshake(); err != nil { + return err + } + + cs := conn.ConnectionState() + if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName { + return fmt.Errorf("protocol negotiation error") + } + + q := uri.Query() + relayIDs := q.Get("id") + if relayIDs != "" { + relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs) + if err != nil { + return fmt.Errorf("relay address contains invalid verification id: %s", err) + } + + certs := cs.PeerCertificates + if cl := len(certs); cl != 1 { + return fmt.Errorf("unexpected certificate count: %d", cl) + } + + remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw) + if remoteID != relayID { + return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID) + } + } + + return nil +} + +func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { + for { + msg, err := protocol.ReadMessage(conn) + if err != nil { + errors <- err + return + } + messages <- msg + } +} diff --git a/lib/relay/relay.go b/lib/relay/relay.go index 135164fef..91cb855a8 100644 --- a/lib/relay/relay.go +++ b/lib/relay/relay.go @@ -8,15 +8,13 @@ package relay import ( "crypto/tls" - "encoding/json" - "net/http" + "net/url" "sort" "time" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" - "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/sync" @@ -34,7 +32,7 @@ type Svc struct { tlsCfg *tls.Config tokens map[string]suture.ServiceToken - clients map[string]*client.ProtocolClient + clients map[string]client.RelayClient mut sync.RWMutex invitations chan protocol.SessionInvitation conns chan *tls.Conn @@ -56,7 +54,7 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config) *Svc { tlsCfg: tlsCfg, tokens: make(map[string]suture.ServiceToken), - clients: make(map[string]*client.ProtocolClient), + clients: make(map[string]client.RelayClient), mut: sync.NewRWMutex(), invitations: make(chan protocol.SessionInvitation), conns: conns, @@ -106,61 +104,17 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool { existing[uri.String()] = uri } - // Query dynamic addresses, and pick the closest relay from the ones they provide. - for key, uri := range existing { - if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" { - continue - } - delete(existing, key) - - // Trim off the `dynamic+` prefix - uri.Scheme = uri.Scheme[8:] - - l.Debugln("Looking up dynamic relays from", uri) - - data, err := http.Get(uri.String()) - if err != nil { - l.Debugln("Failed to lookup dynamic relays", err) - continue - } - - var ann dynamicAnnouncement - err = json.NewDecoder(data.Body).Decode(&ann) - data.Body.Close() - if err != nil { - l.Debugln("Failed to lookup dynamic relays", err) - continue - } - - var dynRelayAddrs []string - for _, relayAnn := range ann.Relays { - ruri, err := url.Parse(relayAnn.URL) - if err != nil { - l.Debugln("Failed to parse dynamic relay address", relayAnn.URL, err) - continue - } - l.Debugln("Found", ruri, "via", uri) - dynRelayAddrs = append(dynRelayAddrs, ruri.String()) - } - - if len(dynRelayAddrs) > 0 { - dynRelayAddrs = relayAddressesSortedByLatency(dynRelayAddrs) - closestRelay := dynRelayAddrs[0] - l.Debugln("Picking", closestRelay, "as closest dynamic relay from", uri) - ruri, _ := url.Parse(closestRelay) - existing[closestRelay] = ruri - } else { - l.Debugln("No dynamic relay found on", uri) - } - } - s.mut.Lock() for key, uri := range existing { _, ok := s.tokens[key] if !ok { l.Debugln("Connecting to relay", uri) - c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations) + c, err := client.NewClient(uri, s.tlsCfg.Certificates, s.invitations) + if err != nil { + l.Debugln("Failed to connect to relay", uri, err) + continue + } s.tokens[key] = s.Add(c) s.clients[key] = c } @@ -197,8 +151,8 @@ func (s *Svc) Relays() []string { s.mut.RLock() relays := make([]string, 0, len(s.clients)) - for uri := range s.clients { - relays = append(relays, uri) + for _, client := range s.clients { + relays = append(relays, client.URI().String()) } s.mut.RUnlock() @@ -216,14 +170,14 @@ func (s *Svc) RelayStatus(uri string) (time.Duration, bool) { } s.mut.RLock() - client, ok := s.clients[uri] + for _, client := range s.clients { + if client.URI().String() == uri { + return client.Latency(), client.StatusOK() + } + } s.mut.RUnlock() - if !ok || !client.StatusOK() { - return time.Hour, false - } - - return client.Latency(), true + return time.Hour, false } // Accept returns a new *tls.Conn. The connection is already handshaken. @@ -277,7 +231,7 @@ func (r *invitationReceiver) Stop() { // The eventBroadcaster sends a RelayStateChanged event when the relay status // changes. We need this somewhat ugly polling mechanism as there's currently // no way to get the event feed directly from the relay lib. This may be -// somethign to revisit later, possibly. +// something to revisit later, possibly. type eventBroadcaster struct { svc *Svc stop chan struct{} @@ -322,51 +276,3 @@ func (e *eventBroadcaster) Serve() { func (e *eventBroadcaster) Stop() { close(e.stop) } - -// This is the announcement recieved from the relay server; -// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]} -type dynamicAnnouncement struct { - Relays []struct { - URL string - } -} - -// relayAddressesSortedByLatency adds local latency to the relay, and sorts them -// by sum latency, and returns the addresses. -func relayAddressesSortedByLatency(input []string) []string { - relays := make(relayList, len(input)) - for i, relay := range input { - if latency, err := osutil.GetLatencyForURL(relay); err == nil { - relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)} - } else { - relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)} - } - } - - sort.Sort(relays) - - addresses := make([]string, len(relays)) - for i, relay := range relays { - addresses[i] = relay.relay - } - return addresses -} - -type relayWithLatency struct { - relay string - latency int -} - -type relayList []relayWithLatency - -func (l relayList) Len() int { - return len(l) -} - -func (l relayList) Less(a, b int) bool { - return l[a].latency < l[b].latency -} - -func (l relayList) Swap(a, b int) { - l[a], l[b] = l[b], l[a] -}