diff --git a/cmd/syncthing/connections.go b/cmd/syncthing/connections.go index 7b175e842..f02489ee1 100644 --- a/cmd/syncthing/connections.go +++ b/cmd/syncthing/connections.go @@ -22,6 +22,7 @@ import ( "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/model" "github.com/syncthing/syncthing/lib/osutil" + "github.com/syncthing/syncthing/lib/relay" "github.com/thejerf/suture" ) @@ -44,6 +45,7 @@ type connectionSvc struct { tlsCfg *tls.Config discoverer *discover.Discoverer conns chan model.IntermediateConnection + relaySvc *relay.Svc lastRelayCheck map[protocol.DeviceID]time.Time @@ -52,7 +54,7 @@ type connectionSvc struct { relaysEnabled bool } -func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Model, tlsCfg *tls.Config, discoverer *discover.Discoverer) *connectionSvc { +func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Model, tlsCfg *tls.Config, discoverer *discover.Discoverer, relaySvc *relay.Svc) *connectionSvc { svc := &connectionSvc{ Supervisor: suture.NewSimple("connectionSvc"), cfg: cfg, @@ -60,6 +62,7 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Mo model: mdl, tlsCfg: tlsCfg, discoverer: discoverer, + relaySvc: relaySvc, conns: make(chan model.IntermediateConnection), connType: make(map[protocol.DeviceID]model.ConnectionType), @@ -104,6 +107,10 @@ func newConnectionSvc(cfg *config.Wrapper, myID protocol.DeviceID, mdl *model.Mo } svc.Add(serviceFunc(svc.handle)) + if svc.relaySvc != nil { + svc.Add(serviceFunc(svc.acceptRelayConns)) + } + return svc } @@ -385,6 +392,12 @@ func (s *connectionSvc) connect() { } } +func (s *connectionSvc) acceptRelayConns() { + for { + s.conns <- s.relaySvc.Accept() + } +} + func (s *connectionSvc) shouldLimit(addr net.Addr) bool { if s.cfg.Options().LimitBandwidthInLan { return true diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 04fa9eb1f..6210136ed 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -711,16 +711,19 @@ func syncthingMain() { setupGUI(mainSvc, cfg, m, apiSub, discoverer) + // Start relay management + + var relaySvc *relay.Svc + if opts.RelaysEnabled && (opts.GlobalAnnEnabled || opts.RelayWithoutGlobalAnn) { + relaySvc = relay.NewSvc(cfg, tlsCfg) + mainSvc.Add(relaySvc) + } + // Start connection management - connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg, discoverer) + connectionSvc := newConnectionSvc(cfg, myID, m, tlsCfg, discoverer, relaySvc) mainSvc.Add(connectionSvc) - if opts.RelaysEnabled && (opts.GlobalAnnEnabled || opts.RelayWithoutGlobalAnn) { - relaySvc = relay.NewSvc(cfg, tlsCfg, connectionSvc.conns) - connectionSvc.Add(relaySvc) - } - if cpuProfile { f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid())) if err != nil { diff --git a/lib/relay/relay.go b/lib/relay/relay.go index e97bb1107..a651a2156 100644 --- a/lib/relay/relay.go +++ b/lib/relay/relay.go @@ -25,7 +25,9 @@ import ( "github.com/thejerf/suture" ) -func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.IntermediateConnection) *Svc { +func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config) *Svc { + conns := make(chan model.IntermediateConnection) + svc := &Svc{ Supervisor: suture.New("Svc", suture.Spec{ Log: func(log string) { @@ -40,11 +42,11 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.Intermed cfg: cfg, tlsCfg: tlsCfg, - tokens: make(map[string]suture.ServiceToken), - clients: make(map[string]*client.ProtocolClient), - mut: sync.NewRWMutex(), - + tokens: make(map[string]suture.ServiceToken), + clients: make(map[string]*client.ProtocolClient), + mut: sync.NewRWMutex(), invitations: make(chan protocol.SessionInvitation), + conns: conns, } rcfg := cfg.Raw() @@ -72,6 +74,7 @@ type Svc struct { clients map[string]*client.ProtocolClient mut sync.RWMutex invitations chan protocol.SessionInvitation + conns chan model.IntermediateConnection } func (s *Svc) VerifyConfiguration(from, to config.Configuration) error { @@ -207,6 +210,10 @@ func (s *Svc) ClientStatus() map[string]bool { return status } +func (s *Svc) Accept() model.IntermediateConnection { + return <-s.conns +} + type invitationReceiver struct { invitations chan protocol.SessionInvitation tlsCfg *tls.Config