syncthing/cmd/strelaypoolsrv/main.go
Jakob Borg 4d979a1ce9
all: Truncate some timestamps (fixes #7457) (#7459)
This truncates times meant for API consumption to second precision,
where fractions won't typically matter or add any value. Exception to
this is timestamps on logs and events, and of course I'm not touching
things like file metadata.

I'm not 100% certain this is an exhaustive change, but it's the things I
found by grepping and following the breadcrumbs from lib/api...

I also considered general-but-ugly solutions, like having the API
serializer itself do reflection magic or even regexps on returned
objects, but decided against it because aurgh...
2021-03-12 10:35:10 +01:00

660 lines
17 KiB
Go

// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/golang/groupcache/lru"
"github.com/oschwald/geoip2-golang"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/syncthing/syncthing/cmd/strelaypoolsrv/auto"
"github.com/syncthing/syncthing/lib/assets"
"github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/tlsutil"
"golang.org/x/time/rate"
)
type location struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
City string `json:"city"`
Country string `json:"country"`
Continent string `json:"continent"`
}
type relay struct {
URL string `json:"url"`
Location location `json:"location"`
uri *url.URL
Stats *stats `json:"stats"`
StatsRetrieved time.Time `json:"statsRetrieved"`
}
type stats struct {
StartTime time.Time `json:"startTime"`
UptimeSeconds int `json:"uptimeSeconds"`
PendingSessionKeys int `json:"numPendingSessionKeys"`
ActiveSessions int `json:"numActiveSessions"`
Connections int `json:"numConnections"`
Proxies int `json:"numProxies"`
BytesProxied int `json:"bytesProxied"`
GoVersion string `json:"goVersion"`
GoOS string `json:"goOS"`
GoArch string `json:"goArch"`
GoMaxProcs int `json:"goMaxProcs"`
GoRoutines int `json:"goNumRoutine"`
Rates []int64 `json:"kbps10s1m5m15m30m60m"`
Options struct {
NetworkTimeout int `json:"network-timeout"`
PintInterval int `json:"ping-interval"`
MessageTimeout int `json:"message-timeout"`
SessionRate int `json:"per-session-rate"`
GlobalRate int `json:"global-rate"`
Pools []string `json:"pools"`
ProvidedBy string `json:"provided-by"`
} `json:"options"`
}
func (r relay) String() string {
return r.URL
}
type request struct {
relay *relay
result chan result
queueTimer *prometheus.Timer
}
type result struct {
err error
eviction time.Duration
}
var (
testCert tls.Certificate
knownRelaysFile = filepath.Join(os.TempDir(), "strelaypoolsrv_known_relays")
listen = ":80"
dir string
evictionTime = time.Hour
debug bool
getLRUSize = 10 << 10
getLimitBurst = 10
getLimitAvg = 2
postLRUSize = 1 << 10
postLimitBurst = 2
postLimitAvg = 2
getLimit time.Duration
postLimit time.Duration
permRelaysFile string
ipHeader string
geoipPath string
proto string
statsRefresh = time.Minute / 2
requestQueueLen = 10
requestProcessors = 1
getMut = sync.NewMutex()
getLRUCache *lru.Cache
postMut = sync.NewMutex()
postLRUCache *lru.Cache
requests chan request
mut = sync.NewRWMutex()
knownRelays = make([]*relay, 0)
permanentRelays = make([]*relay, 0)
evictionTimers = make(map[string]*time.Timer)
)
const (
httpStatusEnhanceYourCalm = 429
)
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Lshortfile)
flag.StringVar(&listen, "listen", listen, "Listen address")
flag.StringVar(&dir, "keys", dir, "Directory where http-cert.pem and http-key.pem is stored for TLS listening")
flag.BoolVar(&debug, "debug", debug, "Enable debug output")
flag.DurationVar(&evictionTime, "eviction", evictionTime, "After how long the relay is evicted")
flag.IntVar(&getLRUSize, "get-limit-cache", getLRUSize, "Get request limiter cache size")
flag.IntVar(&getLimitAvg, "get-limit-avg", getLimitAvg, "Allowed average get request rate, per 10 s")
flag.IntVar(&getLimitBurst, "get-limit-burst", getLimitBurst, "Allowed burst get requests")
flag.IntVar(&postLRUSize, "post-limit-cache", postLRUSize, "Post request limiter cache size")
flag.IntVar(&postLimitAvg, "post-limit-avg", postLimitAvg, "Allowed average post request rate, per minute")
flag.IntVar(&postLimitBurst, "post-limit-burst", postLimitBurst, "Allowed burst post requests")
flag.StringVar(&permRelaysFile, "perm-relays", "", "Path to list of permanent relays")
flag.StringVar(&ipHeader, "ip-header", "", "Name of header which holds clients ip:port. Only meaningful when running behind a reverse proxy.")
flag.StringVar(&geoipPath, "geoip", "GeoLite2-City.mmdb", "Path to GeoLite2-City database")
flag.StringVar(&proto, "protocol", "tcp", "Protocol used for listening. 'tcp' for IPv4 and IPv6, 'tcp4' for IPv4, 'tcp6' for IPv6")
flag.DurationVar(&statsRefresh, "stats-refresh", statsRefresh, "Interval at which to refresh relay stats")
flag.IntVar(&requestQueueLen, "request-queue", requestQueueLen, "Queue length for incoming test requests")
flag.IntVar(&requestProcessors, "request-processors", requestProcessors, "Number of request processor routines")
flag.Parse()
requests = make(chan request, requestQueueLen)
getLimit = 10 * time.Second / time.Duration(getLimitAvg)
postLimit = time.Minute / time.Duration(postLimitAvg)
getLRUCache = lru.New(getLRUSize)
postLRUCache = lru.New(postLRUSize)
var listener net.Listener
var err error
if permRelaysFile != "" {
permanentRelays = loadRelays(permRelaysFile)
}
testCert = createTestCertificate()
for i := 0; i < requestProcessors; i++ {
go requestProcessor()
}
// Load relays from cache in the background.
// Load them in a serial fashion to make sure any genuine requests
// are not dropped.
go func() {
for _, relay := range loadRelays(knownRelaysFile) {
resultChan := make(chan result)
requests <- request{relay, resultChan, nil}
result := <-resultChan
if result.err != nil {
relayTestsTotal.WithLabelValues("failed").Inc()
} else {
relayTestsTotal.WithLabelValues("success").Inc()
}
}
// Run the the stats refresher once the relays are loaded.
statsRefresher(statsRefresh)
}()
if dir != "" {
if debug {
log.Println("Starting TLS listener on", listen)
}
certFile, keyFile := filepath.Join(dir, "http-cert.pem"), filepath.Join(dir, "http-key.pem")
var cert tls.Certificate
cert, err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Fatalln("Failed to load HTTP X509 key pair:", err)
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS10, // No SSLv3
ClientAuth: tls.RequestClientCert,
CipherSuites: []uint16{
// No RC4
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
tls.TLS_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
},
}
listener, err = tls.Listen(proto, listen, tlsCfg)
} else {
if debug {
log.Println("Starting plain listener on", listen)
}
listener, err = net.Listen(proto, listen)
}
if err != nil {
log.Fatalln("listen:", err)
}
handler := http.NewServeMux()
handler.HandleFunc("/", handleAssets)
handler.HandleFunc("/endpoint", handleRequest)
handler.HandleFunc("/metrics", handleMetrics)
srv := http.Server{
Handler: handler,
ReadTimeout: 10 * time.Second,
}
err = srv.Serve(listener)
if err != nil {
log.Fatalln("serve:", err)
}
}
func handleMetrics(w http.ResponseWriter, r *http.Request) {
timer := prometheus.NewTimer(metricsRequestsSeconds)
// Acquire the mutex just to make sure we're not caught mid-way stats collection
mut.RLock()
promhttp.Handler().ServeHTTP(w, r)
mut.RUnlock()
timer.ObserveDuration()
}
func handleAssets(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache, must-revalidate")
path := r.URL.Path[1:]
if path == "" {
path = "index.html"
}
as, ok := auto.Assets()[path]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
assets.Serve(w, r, as)
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
timer := prometheus.NewTimer(apiRequestsSeconds.WithLabelValues(r.Method))
w = NewLoggingResponseWriter(w)
defer func() {
timer.ObserveDuration()
lw := w.(*loggingResponseWriter)
apiRequestsTotal.WithLabelValues(r.Method, strconv.Itoa(lw.statusCode)).Inc()
}()
if ipHeader != "" {
r.RemoteAddr = r.Header.Get(ipHeader)
}
w.Header().Set("Access-Control-Allow-Origin", "*")
switch r.Method {
case "GET":
if limit(r.RemoteAddr, getLRUCache, getMut, getLimit, getLimitBurst) {
w.WriteHeader(httpStatusEnhanceYourCalm)
return
}
handleGetRequest(w, r)
case "POST":
if limit(r.RemoteAddr, postLRUCache, postMut, postLimit, postLimitBurst) {
w.WriteHeader(httpStatusEnhanceYourCalm)
return
}
handlePostRequest(w, r)
default:
if debug {
log.Println("Unhandled HTTP method", r.Method)
}
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func handleGetRequest(rw http.ResponseWriter, r *http.Request) {
rw.Header().Set("Content-Type", "application/json; charset=utf-8")
mut.RLock()
relays := make([]*relay, len(permanentRelays)+len(knownRelays))
n := copy(relays, permanentRelays)
copy(relays[n:], knownRelays)
mut.RUnlock()
// Shuffle
rand.Shuffle(relays)
w := io.Writer(rw)
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
rw.Header().Set("Content-Encoding", "gzip")
gw := gzip.NewWriter(rw)
defer gw.Close()
w = gw
}
_ = json.NewEncoder(w).Encode(map[string][]*relay{
"relays": relays,
})
}
func handlePostRequest(w http.ResponseWriter, r *http.Request) {
var relayCert *x509.Certificate
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
relayCert = r.TLS.PeerCertificates[0]
log.Printf("Got TLS cert from relay server")
}
var newRelay relay
err := json.NewDecoder(r.Body).Decode(&newRelay)
r.Body.Close()
if err != nil {
if debug {
log.Println("Failed to parse payload")
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
uri, err := url.Parse(newRelay.URL)
if err != nil {
if debug {
log.Println("Failed to parse URI", newRelay.URL)
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if relayCert != nil {
advertisedId := uri.Query().Get("id")
idFromCert := protocol.NewDeviceID(relayCert.Raw).String()
if advertisedId != idFromCert {
log.Println("Warning: Relay server requested to join with an ID different from the join request, rejecting")
http.Error(w, "mismatched advertised id and join request cert", http.StatusBadRequest)
return
}
}
host, port, err := net.SplitHostPort(uri.Host)
if err != nil {
if debug {
log.Println("Failed to split URI", newRelay.URL)
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Get the IP address of the client
rhost := r.RemoteAddr
if host, _, err := net.SplitHostPort(rhost); err == nil {
rhost = host
}
ip := net.ParseIP(host)
// The client did not provide an IP address, use the IP address of the client.
if ip == nil || ip.IsUnspecified() {
uri.Host = net.JoinHostPort(rhost, port)
newRelay.URL = uri.String()
} else if host != rhost && relayCert == nil {
if debug {
log.Println("IP address advertised does not match client IP address", r.RemoteAddr, uri)
}
http.Error(w, fmt.Sprintf("IP advertised %s does not match client IP %s", host, rhost), http.StatusUnauthorized)
return
}
newRelay.uri = uri
for _, current := range permanentRelays {
if current.uri.Host == newRelay.uri.Host {
if debug {
log.Println("Asked to add a relay", newRelay, "which exists in permanent list")
}
http.Error(w, "Invalid request", http.StatusBadRequest)
return
}
}
reschan := make(chan result)
select {
case requests <- request{&newRelay, reschan, prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("queue"))}:
result := <-reschan
if result.err != nil {
relayTestsTotal.WithLabelValues("failed").Inc()
http.Error(w, result.err.Error(), http.StatusBadRequest)
return
}
relayTestsTotal.WithLabelValues("success").Inc()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(map[string]time.Duration{
"evictionIn": result.eviction,
})
default:
relayTestsTotal.WithLabelValues("dropped").Inc()
if debug {
log.Println("Dropping request")
}
w.WriteHeader(httpStatusEnhanceYourCalm)
}
}
func requestProcessor() {
for request := range requests {
if request.queueTimer != nil {
request.queueTimer.ObserveDuration()
}
timer := prometheus.NewTimer(relayTestActionsSeconds.WithLabelValues("test"))
handleRelayTest(request)
timer.ObserveDuration()
}
}
func handleRelayTest(request request) {
if debug {
log.Println("Request for", request.relay)
}
if err := client.TestRelay(context.TODO(), request.relay.uri, []tls.Certificate{testCert}, time.Second, 2*time.Second, 3); err != nil {
if debug {
log.Println("Test for relay", request.relay, "failed:", err)
}
request.result <- result{err, 0}
return
}
stats := fetchStats(request.relay)
location := getLocation(request.relay.uri.Host)
mut.Lock()
if stats != nil {
updateMetrics(request.relay.uri.Host, *stats, location)
}
request.relay.Stats = stats
request.relay.StatsRetrieved = time.Now().Truncate(time.Second)
request.relay.Location = location
timer, ok := evictionTimers[request.relay.uri.Host]
if ok {
if debug {
log.Println("Stopping existing timer for", request.relay)
}
timer.Stop()
}
for i, current := range knownRelays {
if current.uri.Host == request.relay.uri.Host {
if debug {
log.Println("Relay", request.relay, "already exists")
}
// Evict the old entry anyway, as configuration might have changed.
last := len(knownRelays) - 1
knownRelays[i] = knownRelays[last]
knownRelays = knownRelays[:last]
goto found
}
}
if debug {
log.Println("Adding new relay", request.relay)
}
found:
knownRelays = append(knownRelays, request.relay)
evictionTimers[request.relay.uri.Host] = time.AfterFunc(evictionTime, evict(request.relay))
mut.Unlock()
if err := saveRelays(knownRelaysFile, knownRelays); err != nil {
log.Println("Failed to write known relays: " + err.Error())
}
request.result <- result{nil, evictionTime}
}
func evict(relay *relay) func() {
return func() {
mut.Lock()
defer mut.Unlock()
if debug {
log.Println("Evicting", relay)
}
for i, current := range knownRelays {
if current.uri.Host == relay.uri.Host {
if debug {
log.Println("Evicted", relay)
}
last := len(knownRelays) - 1
knownRelays[i] = knownRelays[last]
knownRelays = knownRelays[:last]
deleteMetrics(current.uri.Host)
}
}
delete(evictionTimers, relay.uri.Host)
}
}
func limit(addr string, cache *lru.Cache, lock sync.Mutex, intv time.Duration, burst int) bool {
if host, _, err := net.SplitHostPort(addr); err == nil {
addr = host
}
lock.Lock()
v, _ := cache.Get(addr)
bkt, ok := v.(*rate.Limiter)
if !ok {
bkt = rate.NewLimiter(rate.Every(intv), burst)
cache.Add(addr, bkt)
}
lock.Unlock()
return !bkt.Allow()
}
func loadRelays(file string) []*relay {
content, err := ioutil.ReadFile(file)
if err != nil {
log.Println("Failed to load relays: " + err.Error())
return nil
}
var relays []*relay
for _, line := range strings.Split(string(content), "\n") {
if len(line) == 0 {
continue
}
uri, err := url.Parse(line)
if err != nil {
if debug {
log.Println("Skipping relay", line, "due to parse error", err)
}
continue
}
relays = append(relays, &relay{
URL: line,
Location: getLocation(uri.Host),
uri: uri,
})
if debug {
log.Println("Adding relay", line)
}
}
return relays
}
func saveRelays(file string, relays []*relay) error {
var content string
for _, relay := range relays {
content += relay.uri.String() + "\n"
}
return ioutil.WriteFile(file, []byte(content), 0777)
}
func createTestCertificate() tls.Certificate {
tmpDir, err := ioutil.TempDir("", "relaypoolsrv")
if err != nil {
log.Fatal(err)
}
certFile, keyFile := filepath.Join(tmpDir, "cert.pem"), filepath.Join(tmpDir, "key.pem")
cert, err := tlsutil.NewCertificate(certFile, keyFile, "relaypoolsrv", 20*365)
if err != nil {
log.Fatalln("Failed to create test X509 key pair:", err)
}
return cert
}
func getLocation(host string) location {
timer := prometheus.NewTimer(locationLookupSeconds)
defer timer.ObserveDuration()
db, err := geoip2.Open(geoipPath)
if err != nil {
return location{}
}
defer db.Close()
addr, err := net.ResolveTCPAddr("tcp", host)
if err != nil {
return location{}
}
city, err := db.City(addr.IP)
if err != nil {
return location{}
}
return location{
Longitude: city.Location.Longitude,
Latitude: city.Location.Latitude,
City: city.City.Names["en"],
Country: city.Country.IsoCode,
Continent: city.Continent.Code,
}
}
type loggingResponseWriter struct {
http.ResponseWriter
statusCode int
}
func NewLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
return &loggingResponseWriter{w, http.StatusOK}
}
func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}