ceea5ebeb3
Closes #4032
286 lines
6.2 KiB
Go
286 lines
6.2 KiB
Go
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package connections
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/AudriusButkevicius/pfilter"
|
|
"github.com/ccding/go-stun/stun"
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/nat"
|
|
"github.com/xtaci/kcp-go"
|
|
"github.com/xtaci/smux"
|
|
)
|
|
|
|
func init() {
|
|
factory := &kcpListenerFactory{}
|
|
for _, scheme := range []string{"kcp", "kcp4", "kcp6"} {
|
|
listeners[scheme] = factory
|
|
}
|
|
}
|
|
|
|
type kcpListener struct {
|
|
onAddressesChangedNotifier
|
|
|
|
uri *url.URL
|
|
cfg *config.Wrapper
|
|
tlsCfg *tls.Config
|
|
stop chan struct{}
|
|
conns chan internalConn
|
|
factory listenerFactory
|
|
|
|
address *url.URL
|
|
err error
|
|
mut sync.RWMutex
|
|
}
|
|
|
|
func (t *kcpListener) Serve() {
|
|
t.mut.Lock()
|
|
t.err = nil
|
|
t.mut.Unlock()
|
|
|
|
network := strings.Replace(t.uri.Scheme, "kcp", "udp", -1)
|
|
|
|
packetConn, err := net.ListenPacket(network, t.uri.Host)
|
|
if err != nil {
|
|
t.mut.Lock()
|
|
t.err = err
|
|
t.mut.Unlock()
|
|
l.Infoln("listen (BEP/kcp):", err)
|
|
return
|
|
}
|
|
filterConn := pfilter.NewPacketFilter(packetConn)
|
|
kcpConn := filterConn.NewConn(kcpNoFilterPriority, nil)
|
|
stunConn := filterConn.NewConn(kcpStunFilterPriority, &stunFilter{
|
|
ids: make(map[string]time.Time),
|
|
})
|
|
|
|
filterConn.Start()
|
|
registerFilter(filterConn)
|
|
|
|
listener, err := kcp.ServeConn(nil, 0, 0, kcpConn)
|
|
if err != nil {
|
|
t.mut.Lock()
|
|
t.err = err
|
|
t.mut.Unlock()
|
|
l.Infoln("listen (BEP/kcp):", err)
|
|
return
|
|
}
|
|
|
|
defer listener.Close()
|
|
defer stunConn.Close()
|
|
defer kcpConn.Close()
|
|
defer deregisterFilter(filterConn)
|
|
defer packetConn.Close()
|
|
|
|
l.Infof("KCP listener (%v) starting", kcpConn.LocalAddr())
|
|
defer l.Infof("KCP listener (%v) shutting down", kcpConn.LocalAddr())
|
|
|
|
go t.stunRenewal(stunConn)
|
|
|
|
for {
|
|
listener.SetDeadline(time.Now().Add(time.Second))
|
|
conn, err := listener.AcceptKCP()
|
|
|
|
select {
|
|
case <-t.stop:
|
|
if err == nil {
|
|
conn.Close()
|
|
}
|
|
return
|
|
default:
|
|
}
|
|
if err != nil {
|
|
if err, ok := err.(net.Error); !ok || !err.Timeout() {
|
|
l.Warnln("Accepting connection (BEP/kcp):", err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
opts := t.cfg.Options()
|
|
|
|
conn.SetStreamMode(true)
|
|
conn.SetACKNoDelay(false)
|
|
conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize)
|
|
conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl))
|
|
|
|
l.Debugln("connect from", conn.RemoteAddr())
|
|
|
|
ses, err := smux.Server(conn, smuxConfig)
|
|
if err != nil {
|
|
l.Debugln("smux server:", err)
|
|
conn.Close()
|
|
continue
|
|
}
|
|
|
|
stream, err := ses.AcceptStream()
|
|
if err != nil {
|
|
l.Debugln("smux accept:", err)
|
|
ses.Close()
|
|
continue
|
|
}
|
|
|
|
tc := tls.Server(&sessionClosingStream{stream, ses}, t.tlsCfg)
|
|
tc.SetDeadline(time.Now().Add(time.Second * 10))
|
|
err = tc.Handshake()
|
|
if err != nil {
|
|
l.Debugln("TLS handshake (BEP/kcp):", err)
|
|
tc.Close()
|
|
continue
|
|
}
|
|
tc.SetDeadline(time.Time{})
|
|
|
|
t.conns <- internalConn{tc, connTypeKCPServer, kcpPriority}
|
|
}
|
|
}
|
|
|
|
func (t *kcpListener) Stop() {
|
|
close(t.stop)
|
|
}
|
|
|
|
func (t *kcpListener) URI() *url.URL {
|
|
return t.uri
|
|
}
|
|
|
|
func (t *kcpListener) WANAddresses() []*url.URL {
|
|
uris := t.LANAddresses()
|
|
t.mut.RLock()
|
|
if t.address != nil {
|
|
uris = append(uris, t.address)
|
|
}
|
|
t.mut.RUnlock()
|
|
return uris
|
|
}
|
|
|
|
func (t *kcpListener) LANAddresses() []*url.URL {
|
|
return []*url.URL{t.uri}
|
|
}
|
|
|
|
func (t *kcpListener) Error() error {
|
|
t.mut.RLock()
|
|
err := t.err
|
|
t.mut.RUnlock()
|
|
return err
|
|
}
|
|
|
|
func (t *kcpListener) String() string {
|
|
return t.uri.String()
|
|
}
|
|
|
|
func (t *kcpListener) Factory() listenerFactory {
|
|
return t.factory
|
|
}
|
|
|
|
func (t *kcpListener) stunRenewal(listener net.PacketConn) {
|
|
client := stun.NewClientWithConnection(listener)
|
|
client.SetSoftwareName("syncthing")
|
|
|
|
var uri url.URL
|
|
var natType stun.NATType
|
|
var extAddr *stun.Host
|
|
var err error
|
|
|
|
oldType := stun.NATUnknown
|
|
|
|
for {
|
|
|
|
disabled:
|
|
if t.cfg.Options().StunKeepaliveS < 1 {
|
|
time.Sleep(time.Second)
|
|
oldType = stun.NATUnknown
|
|
t.mut.Lock()
|
|
t.address = nil
|
|
t.mut.Unlock()
|
|
continue
|
|
}
|
|
|
|
for _, addr := range t.cfg.StunServers() {
|
|
client.SetServerAddr(addr)
|
|
|
|
natType, extAddr, err = client.Discover()
|
|
if err != nil || extAddr == nil {
|
|
l.Debugf("%s stun discovery on %s: %s", t.uri, addr, err)
|
|
continue
|
|
}
|
|
|
|
// The stun server is most likely borked, try another one.
|
|
if natType == stun.NATError || natType == stun.NATUnknown || natType == stun.NATBlocked {
|
|
l.Debugf("%s stun discovery on %s resolved to %s", t.uri, addr, natType)
|
|
continue
|
|
}
|
|
|
|
if oldType != natType {
|
|
l.Infof("%s detected NAT type: %s", t.uri, natType)
|
|
}
|
|
|
|
for {
|
|
changed := false
|
|
uri = *t.uri
|
|
uri.Host = extAddr.TransportAddr()
|
|
|
|
t.mut.Lock()
|
|
if t.address == nil || t.address.String() != uri.String() {
|
|
l.Infof("%s resolved external address %s (via %s)", t.uri, uri.String(), addr)
|
|
t.address = &uri
|
|
changed = true
|
|
}
|
|
t.mut.Unlock()
|
|
|
|
// This will most likely result in a call to WANAddresses() which tries to
|
|
// get t.mut, so notify while unlocked.
|
|
if changed {
|
|
t.notifyAddressesChanged(t)
|
|
}
|
|
|
|
select {
|
|
case <-time.After(time.Duration(t.cfg.Options().StunKeepaliveS) * time.Second):
|
|
case <-t.stop:
|
|
return
|
|
}
|
|
|
|
if t.cfg.Options().StunKeepaliveS < 1 {
|
|
goto disabled
|
|
}
|
|
|
|
extAddr, err = client.Keepalive()
|
|
if err != nil {
|
|
l.Debugf("%s stun keepalive on %s: %s (%v)", t.uri, addr, err, extAddr)
|
|
break
|
|
}
|
|
}
|
|
|
|
oldType = natType
|
|
}
|
|
|
|
// We failed to contact all provided stun servers, chillout for a while.
|
|
time.Sleep(time.Minute)
|
|
}
|
|
}
|
|
|
|
// kcpListenerFactory creates kcpListener instances. It carries no state.
type kcpListenerFactory struct{}
|
|
|
|
func (f *kcpListenerFactory) New(uri *url.URL, cfg *config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
|
|
return &kcpListener{
|
|
uri: fixupPort(uri, config.DefaultKCPPort),
|
|
cfg: cfg,
|
|
tlsCfg: tlsCfg,
|
|
conns: conns,
|
|
stop: make(chan struct{}),
|
|
factory: f,
|
|
}
|
|
}
|
|
|
|
func (kcpListenerFactory) Enabled(cfg config.Configuration) bool {
|
|
return true
|
|
}
|