syncthing/lib/connections/quic_listen.go
Jakob Borg 9b660c1959
lib/config, lib/connections: Configurable protocol priority (ref #8626) (#8868)
This makes the various protocol priorities configurable among the other
options. With this, it's possible to prefer QUIC over TCP for WAN
connections, for example. Both sides need to be similarly configured for
this to work properly.

The default priority order remains the same as previously (TCP, QUIC,
Relay, with LAN better than WAN).

To make this happen I made each dialer & listener more priority aware,
and moved the check for whether a connection is LAN or not into the
dialer / listener -- this is the new "lanChecker" type that's passed
around.
2023-04-16 14:54:28 +02:00

245 lines
5.6 KiB
Go

// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
//go:build go1.15 && !noquic
// +build go1.15,!noquic
package connections
import (
"context"
"crypto/tls"
"net"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/quic-go/quic-go"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/stun"
"github.com/syncthing/syncthing/lib/svcutil"
)
// init registers one shared quicListenerFactory under each of the QUIC
// URI schemes in the package-level listeners map.
func init() {
	shared := new(quicListenerFactory)
	listeners["quic"] = shared
	listeners["quic4"] = shared
	listeners["quic6"] = shared
}
// quicListener listens for incoming QUIC connections on a single URI and
// delivers each accepted connection on the conns channel. It also receives
// the OnNATTypeChanged and OnExternalAddressChanged callbacks and exposes
// the results through NATType and WANAddresses.
type quicListener struct {
svcutil.ServiceWithError
nat atomic.Uint64 // Holds a stun.NATType.
onAddressesChangedNotifier
// uri is the listen URI as produced by fixupPort in the factory's New.
uri *url.URL
cfg config.Wrapper
tlsCfg *tls.Config
// conns receives every connection accepted by serve.
conns chan internalConn
factory listenerFactory
registry *registry.Registry
// lanChecker decides whether a remote address counts as LAN, which
// selects the connection priority in serve.
lanChecker *lanChecker
// address is the most recent external address passed to
// OnExternalAddressChanged, or nil if none. Accessed under mut.
address *url.URL
// laddr is the local address of the UDP socket while serve is running
// and nil otherwise. Accessed under mut.
laddr net.Addr
mut sync.Mutex
}
// OnNATTypeChanged stores the given NAT type on the listener. Any value
// other than stun.NATUnknown is also logged at info level before being
// stored.
func (t *quicListener) OnNATTypeChanged(natType stun.NATType) {
	detected := natType != stun.NATUnknown
	if detected {
		l.Infof("%s detected NAT type: %s", t.uri, natType)
	}

	raw := uint64(natType)
	t.nat.Store(raw)
}
// OnExternalAddressChanged replaces the stored external address with one
// derived from address (or nil when address is nil) and calls
// notifyAddressesChanged if the stored value actually changed. A newly
// resolved address is also logged, together with via.
func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) {
	var external *url.URL
	if address != nil {
		// Clone the listen URI and substitute the reported transport address.
		clone := *t.uri
		clone.Host = address.TransportAddr()
		external = &clone
	}

	// Swap in the new value under the lock; compare and notify outside it.
	t.mut.Lock()
	previous := t.address
	t.address = external
	t.mut.Unlock()

	switch {
	case external == nil:
		// The address went away: only announce it if there was one before.
		if previous != nil {
			t.notifyAddressesChanged(t)
		}
	case previous == nil || previous.String() != external.String():
		l.Infof("%s resolved external address %s (via %s)", t.uri, external.String(), via)
		t.notifyAddressesChanged(t)
	}
}
// serve opens a UDP socket for the listener's URI, starts a QUIC listener
// on top of it and forwards every accepted connection on t.conns until ctx
// is cancelled or accepting fails persistently.
//
// It returns the error from resolving, binding or quic.Listen if setup
// fails, nil if Accept reports context.Canceled, ctx.Err() if cancellation
// is observed anywhere else in the loop, and the last Accept error once
// more than maxAcceptFailures consecutive failures have occurred.
//
// Every blocking step in the accept loop (accept, backoff, hand-off on
// t.conns) observes ctx, so cancellation is never held up by a backoff
// delay or by a consumer that has stopped receiving.
func (t *quicListener) serve(ctx context.Context) error {
	network := quicNetwork(t.uri)

	udpAddr, err := net.ResolveUDPAddr(network, t.uri.Host)
	if err != nil {
		l.Infoln("Listen (BEP/quic):", err)
		return err
	}

	udpConn, err := net.ListenUDP(network, udpAddr)
	if err != nil {
		l.Infoln("Listen (BEP/quic):", err)
		return err
	}
	defer func() { _ = udpConn.Close() }()

	// stun.New is handed the UDP socket and this listener (for the NAT type
	// and external address callbacks). The conn it returns is the one that
	// is registered and used for QUIC below.
	svc, conn := stun.New(t.cfg, t, udpConn)
	defer conn.Close()

	go svc.Serve(ctx)

	t.registry.Register(t.uri.Scheme, conn)
	defer t.registry.Unregister(t.uri.Scheme, conn)

	listener, err := quic.Listen(conn, t.tlsCfg, quicConfig)
	if err != nil {
		l.Infoln("Listen (BEP/quic):", err)
		return err
	}
	defer listener.Close()

	t.notifyAddressesChanged(t)
	defer t.clearAddresses(t)

	l.Infof("QUIC listener (%v) starting", udpConn.LocalAddr())
	defer l.Infof("QUIC listener (%v) shutting down", udpConn.LocalAddr())

	// Publish the bound local address for WANAddresses/LANAddresses while
	// we are running, and withdraw it again on the way out.
	t.mut.Lock()
	t.laddr = udpConn.LocalAddr()
	t.mut.Unlock()
	defer func() {
		t.mut.Lock()
		t.laddr = nil
		t.mut.Unlock()
	}()

	acceptFailures := 0
	const maxAcceptFailures = 10

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		session, err := listener.Accept(ctx)
		if err == context.Canceled {
			return nil
		} else if err != nil {
			l.Infoln("Listen (BEP/quic): Accepting connection:", err)

			acceptFailures++
			if acceptFailures > maxAcceptFailures {
				// Return to restart the listener, because something
				// seems permanently damaged.
				return err
			}

			// Slightly increased delay for each failure. Wait on a timer
			// rather than sleeping so that cancellation interrupts the
			// backoff immediately.
			backoff := time.NewTimer(time.Duration(acceptFailures) * time.Second)
			select {
			case <-backoff.C:
			case <-ctx.Done():
				backoff.Stop()
				return ctx.Err()
			}

			continue
		}

		acceptFailures = 0

		l.Debugln("connect from", session.RemoteAddr())

		// Bound the wait for the peer to open its stream.
		streamCtx, cancel := context.WithTimeout(ctx, quicOperationTimeout)
		stream, err := session.AcceptStream(streamCtx)
		cancel()
		if err != nil {
			l.Debugf("failed to accept stream from %s: %v", session.RemoteAddr(), err)
			_ = session.CloseWithError(1, err.Error())
			continue
		}

		// Read the options once so both priorities come from the same
		// configuration snapshot.
		opts := t.cfg.Options()
		priority := opts.ConnectionPriorityQUICWAN
		isLocal := t.lanChecker.isLAN(session.RemoteAddr())
		if isLocal {
			priority = opts.ConnectionPriorityQUICLAN
		}

		accepted := newInternalConn(&quicTlsConn{session, stream, nil}, connTypeQUICServer, isLocal, priority)

		// Hand the connection over, but do not block past cancellation if
		// nobody is receiving any more; close the session instead.
		select {
		case t.conns <- accepted:
		case <-ctx.Done():
			_ = session.CloseWithError(1, ctx.Err().Error())
			return ctx.Err()
		}
	}
}
// URI returns the listener's uri field, i.e. the listen URI it was
// created with.
func (t *quicListener) URI() *url.URL {
return t.uri
}
// WANAddresses returns the listen URI passed through maybeReplacePort with
// the current local address, followed by the stored external address when
// one is set.
func (t *quicListener) WANAddresses() []*url.URL {
	t.mut.Lock()
	defer t.mut.Unlock()

	local := maybeReplacePort(t.uri, t.laddr)
	if t.address == nil {
		return []*url.URL{local}
	}
	return []*url.URL{local, t.address}
}
// LANAddresses returns the listen URI passed through maybeReplacePort with
// the current local address, followed by whatever
// getURLsForAllAdaptersIfUnspecified yields for that URI.
func (t *quicListener) LANAddresses() []*url.URL {
	// Only the read of laddr needs the lock; the adapter lookup below runs
	// without it.
	t.mut.Lock()
	base := maybeReplacePort(t.uri, t.laddr)
	t.mut.Unlock()

	perAdapter := getURLsForAllAdaptersIfUnspecified(quicNetwork(base), base)

	result := make([]*url.URL, 0, 1+len(perAdapter))
	result = append(result, base)
	return append(result, perAdapter...)
}
// String returns the listen URI in string form.
func (t *quicListener) String() string {
return t.uri.String()
}
// Factory returns the listenerFactory stored on this listener.
func (t *quicListener) Factory() listenerFactory {
return t.factory
}
// NATType returns the string form of the stored NAT type, or "unknown"
// when the stored value is stun.NATUnknown or stun.NATError.
func (t *quicListener) NATType() string {
	switch current := stun.NATType(t.nat.Load()); current {
	case stun.NATUnknown, stun.NATError:
		return "unknown"
	default:
		return current.String()
	}
}
// quicListenerFactory builds quicListener values. A single instance is
// registered for the "quic", "quic4" and "quic6" schemes in init.
type quicListenerFactory struct{}
// Valid always returns nil; the configuration argument is not inspected.
func (*quicListenerFactory) Valid(config.Configuration) error {
return nil
}
// New builds a quicListener for uri (after fixupPort has been applied with
// config.DefaultQUICPort). The listener's service wraps its serve method and
// is named after the resulting URI; its NAT type starts out as
// stun.NATUnknown. The *nat.Service argument is ignored.
func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, _ *nat.Service, registry *registry.Registry, lanChecker *lanChecker) genericListener {
	ql := new(quicListener)
	ql.uri = fixupPort(uri, config.DefaultQUICPort)
	ql.cfg = cfg
	ql.tlsCfg = tlsCfg
	ql.conns = conns
	ql.factory = f
	ql.registry = registry
	ql.lanChecker = lanChecker

	// The service name comes from String(), so uri must be set before this.
	ql.ServiceWithError = svcutil.AsService(ql.serve, ql.String())
	ql.nat.Store(uint64(stun.NATUnknown))

	return ql
}
// Enabled always reports true; the configuration argument is not consulted.
//
// It is declared on the pointer receiver, matching Valid and New, so that
// every quicListenerFactory method uses the same receiver kind.
func (*quicListenerFactory) Enabled(_ config.Configuration) bool {
	return true
}