Add incoming connection relay service

This commit is contained in:
Audrius Butkevicius 2015-06-28 20:09:53 +01:00
parent c2ccab4361
commit 27465353c1
4 changed files with 215 additions and 31 deletions

View File

@ -60,36 +60,14 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Mo
connType: make(map[protocol.DeviceID]model.ConnectionType),
}
cfg.Subscribe(svc)
// There are several moving parts here; one routine per listening address
// to handle incoming connections, one routine to periodically attempt
// outgoing connections, and lastly one routine to the the common handling
// regardless of whether the connection was incoming or outgoing. It ends
// up as in the diagram below. We embed a Supervisor to manage the
// routines (i.e. log and restart if they crash or exit, etc).
//
// +-----------------+
// Incoming | +---------------+-+ +-----------------+
// Connections | | | | |
// -------------->| | listener | | | Outgoing connections via dialers
// | | (1 per listen | | svc.connect |----------------------------------->
// | | address) | | |
// +-+ | | |
// +-----------------+ +-----------------+
// v v
// | |
// | |
// +------------+-----------+
// |
// | svc.conns
// v
// +-----------------+
// | |
// | |
// | svc.handle |------> model.AddConnection()
// | |
// | |
// +-----------------+
// outgoing connections, one routine to the the common handling
// regardless of whether the connection was incoming or outgoing.
// Furthermore, a relay service which handles incoming requests to connect
// via the relays.
//
// TODO: Clean shutdown, and/or handling config changes on the fly. We
// partly do this now - new devices and addresses will be picked up, but

View File

@ -668,6 +668,13 @@ func syncthingMain() {
l.Fatalln("Bad listen address:", err)
}
// Start the relevant services
connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg)
relaySvc := newRelaySvc(cfg, tlsCfg, connectionSvc.conns)
connectionSvc.Add(relaySvc)
mainSvc.Add(connectionSvc)
// Start discovery
localPort := addr.Port
@ -681,10 +688,6 @@ func syncthingMain() {
mainSvc.Add(upnpSvc)
}
connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg)
cfg.Subscribe(connectionSvc)
mainSvc.Add(connectionSvc)
if cpuProfile {
f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid()))
if err != nil {

193
cmd/syncthing/relays.go Normal file
View File

@ -0,0 +1,193 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package main
import (
"crypto/tls"
"net"
"net/url"
"time"
"github.com/syncthing/relaysrv/client"
"github.com/syncthing/relaysrv/protocol"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
)
func newRelaySvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- intermediateConnection) *relaySvc {
svc := &relaySvc{
Supervisor: suture.New("relaySvc", suture.Spec{
Log: func(log string) {
if debugNet {
l.Infoln(log)
}
},
FailureBackoff: 5 * time.Minute,
FailureDecay: float64((10 * time.Minute) / time.Second),
FailureThreshold: 5,
}),
cfg: cfg,
tlsCfg: tlsCfg,
tokens: make(map[string]suture.ServiceToken),
clients: make(map[string]*client.ProtocolClient),
mut: sync.NewRWMutex(),
invitations: make(chan protocol.SessionInvitation),
}
rcfg := cfg.Raw()
svc.CommitConfiguration(rcfg, rcfg)
cfg.Subscribe(svc)
receiver := &invitationReceiver{
tlsCfg: tlsCfg,
conns: conns,
invitations: svc.invitations,
}
svc.receiverToken = svc.Add(receiver)
return svc
}
type relaySvc struct {
*suture.Supervisor
cfg *config.Wrapper
tlsCfg *tls.Config
receiverToken suture.ServiceToken
tokens map[string]suture.ServiceToken
clients map[string]*client.ProtocolClient
mut sync.RWMutex
invitations chan protocol.SessionInvitation
}
func (s *relaySvc) VerifyConfiguration(from, to config.Configuration) error {
for _, addr := range to.Options.RelayServers {
_, err := url.Parse(addr)
if err != nil {
return err
}
}
return nil
}
func (s *relaySvc) CommitConfiguration(from, to config.Configuration) bool {
existing := make(map[string]struct{}, len(to.Options.RelayServers))
for _, addr := range to.Options.RelayServers {
uri, err := url.Parse(addr)
if err != nil {
if debugNet {
l.Debugln("Failed to parse relay address", addr, err)
}
continue
}
existing[uri.String()] = struct{}{}
_, ok := s.tokens[uri.String()]
if !ok {
if debugNet {
l.Debugln("Connecting to relay", uri)
}
c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
s.tokens[uri.String()] = s.Add(c)
s.mut.Lock()
s.clients[uri.String()] = c
s.mut.Unlock()
}
}
for uri, token := range s.tokens {
_, ok := existing[uri]
if !ok {
err := s.Remove(token)
delete(s.tokens, uri)
s.mut.Lock()
delete(s.clients, uri)
s.mut.Unlock()
if debugNet {
l.Debugln("Disconnecting from relay", uri, err)
}
}
}
return true
}
func (s *relaySvc) ClientStatus() map[string]bool {
s.mut.RLock()
status := make(map[string]bool, len(s.clients))
for uri, client := range s.clients {
status[uri] = client.StatusOK()
}
s.mut.RUnlock()
return status
}
type invitationReceiver struct {
invitations chan protocol.SessionInvitation
tlsCfg *tls.Config
conns chan<- intermediateConnection
stop chan struct{}
}
func (r *invitationReceiver) Serve() {
if r.stop != nil {
return
}
r.stop = make(chan struct{})
for {
select {
case inv := <-r.invitations:
if debugNet {
l.Debugln("Received relay invitation", inv)
}
conn, err := client.JoinSession(inv)
if err != nil {
if debugNet {
l.Debugf("Failed to join relay session %s: %v", inv, err)
}
continue
}
setTCPOptions(conn.(*net.TCPConn))
var tc *tls.Conn
if inv.ServerSocket {
tc = tls.Server(conn, r.tlsCfg)
} else {
tc = tls.Client(conn, r.tlsCfg)
}
err = tc.Handshake()
if err != nil {
l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
tc.Close()
continue
}
r.conns <- intermediateConnection{
tc, model.ConnectionTypeRelayAccept,
}
case <-r.stop:
return
}
}
}
func (r *invitationReceiver) Stop() {
if r.stop == nil {
return
}
r.stop <- struct{}{}
r.stop = nil
}

View File

@ -21,6 +21,8 @@ type Connection struct {
const (
ConnectionTypeBasicAccept ConnectionType = iota
ConnectionTypeBasicDial
ConnectionTypeRelayAccept
ConnectionTypeRelayDial
)
type ConnectionType int
@ -31,6 +33,14 @@ func (t ConnectionType) String() string {
return "basic-accept"
case ConnectionTypeBasicDial:
return "basic-dial"
case ConnectionTypeRelayAccept:
return "relay-accept"
case ConnectionTypeRelayDial:
return "relay-dial"
}
return "unknown"
}
func (t ConnectionType) IsDirect() bool {
return t == ConnectionTypeBasicAccept || t == ConnectionTypeBasicDial
}