Merge branch 'v0.12'

* v0.12:
  Add relay support, add ql support
  Stats files
  Rewrite for a PostgreSQL backend
This commit is contained in:
Jakob Borg 2015-08-20 12:17:54 +02:00
commit e611828249
10 changed files with 866 additions and 515 deletions

View File

@ -5,7 +5,36 @@ discosrv
This is the global discovery server for the `syncthing` project.
`go get github.com/syncthing/discosrv`
To get it, run `go get github.com/syncthing/discosrv` or download the
[latest build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/)
from the build server.
Or download the latest [Linux build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/).
Usage
-----
The discovery server supports `ql` and `postgres` backends.
Specify the backend via `-db-backend` and the database DSN via `-db-dsn`.
By default it will use in-memory `ql` backend. If you wish to persist the
information on disk between restarts in `ql`, specify a file DSN:
```bash
$ discosrv -db-dsn="file://var/run/discosrv.db"
```
For `postgres`, you will need to create a database and a user with permissions
to create tables in it, then start the discosrv as follows:
```bash
$ export DISCOSRV_DB_DSN="postgres://user:password@localhost/databasename"
$ discosrv -db-backend="postgres"
```
You can pass the DSN as command line option, but the value what you pass in will
be visible in most process managers, potentially exposing the database password
to other users.
In all cases, the appropriate tables and indexes will be created at first
startup. If it doesn't exit with an error, you're fine.
See `discosrv -help` for other options.

87
cmd/discosrv/clean.go Normal file
View File

@ -0,0 +1,87 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
"log"
"time"
)
type cleansrv struct {
intv time.Duration
db *sql.DB
prep map[string]*sql.Stmt
}
func (s *cleansrv) Serve() {
for {
time.Sleep(next(s.intv))
err := s.cleanOldEntries()
if err != nil {
log.Println("Clean:", err)
}
}
}
func (s *cleansrv) Stop() {
panic("stop unimplemented")
}
func (s *cleansrv) cleanOldEntries() (err error) {
var tx *sql.Tx
tx, err = s.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
tx.Rollback()
}
}()
res, err := tx.Stmt(s.prep["cleanAddress"]).Exec()
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows > 0 {
log.Printf("Clean: %d old addresses", rows)
}
res, err = tx.Stmt(s.prep["cleanRelay"]).Exec()
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows > 0 {
log.Printf("Clean: %d old relays", rows)
}
res, err = tx.Stmt(s.prep["cleanDevice"]).Exec()
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows > 0 {
log.Printf("Clean: %d old devices", rows)
}
var devs, addrs, relays int
row := tx.Stmt(s.prep["countDevice"]).QueryRow()
if err = row.Scan(&devs); err != nil {
return err
}
row = tx.Stmt(s.prep["countAddress"]).QueryRow()
if err = row.Scan(&addrs); err != nil {
return err
}
row = tx.Stmt(s.prep["countRelay"]).QueryRow()
if err = row.Scan(&relays); err != nil {
return err
}
log.Printf("Database: %d devices, %d addresses, %d relays", devs, addrs, relays)
return nil
}

32
cmd/discosrv/db.go Normal file
View File

@ -0,0 +1,32 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
"fmt"
)
type setupFunc func(db *sql.DB) error
type compileFunc func(db *sql.DB) (map[string]*sql.Stmt, error)
var (
setupFuncs = make(map[string]setupFunc)
compileFuncs = make(map[string]compileFunc)
)
func register(name string, setup setupFunc, compile compileFunc) {
setupFuncs[name] = setup
compileFuncs[name] = compile
}
func setup(backend string, db *sql.DB) (map[string]*sql.Stmt, error) {
setup, ok := setupFuncs[backend]
if !ok {
return nil, fmt.Errorf("Unsupported backend")
}
if err := setup(db); err != nil {
return nil, err
}
return compileFuncs[backend](db)
}

View File

@ -1,392 +1,93 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"bytes"
"encoding/binary"
"database/sql"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/golang/groupcache/lru"
"github.com/juju/ratelimit"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/thejerf/suture"
)
const cacheLimitSeconds = 3600
var (
lock sync.Mutex
queries = 0
announces = 0
answered = 0
limited = 0
unknowns = 0
debug = false
lruSize = 1024
limitAvg = 1
limitBurst = 10
limiter *lru.Cache
lruSize = 10240
limitAvg = 5
limitBurst = 20
globalStats stats
statsFile string
backend = "ql"
dsn = getEnvDefault("DISCOSRV_DB_DSN", "memory://discosrv")
)
func main() {
var listen string
var timestamp bool
var statsIntv int
var statsFile string
var unknownFile string
var dbDir string
const (
cleanIntv = 1 * time.Hour
statsIntv = 5 * time.Minute
)
flag.StringVar(&listen, "listen", ":22026", "Listen address")
flag.BoolVar(&debug, "debug", false, "Enable debug output")
flag.BoolVar(&timestamp, "timestamp", true, "Timestamp the log output")
flag.IntVar(&statsIntv, "stats-intv", 0, "Statistics output interval (s)")
flag.StringVar(&statsFile, "stats-file", "/var/discosrv/stats", "Statistics file name")
flag.StringVar(&unknownFile, "unknown-file", "", "Unknown packet log file name")
var listen string
log.SetOutput(os.Stdout)
log.SetFlags(0)
flag.StringVar(&listen, "listen", ":22027", "Listen address")
flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries")
flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s")
flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets")
flag.StringVar(&dbDir, "db-dir", "/var/discosrv/db", "Database directory")
flag.StringVar(&statsFile, "stats-file", statsFile, "File to write periodic operation stats to")
flag.StringVar(&backend, "db-backend", backend, "Database backend to use")
flag.StringVar(&dsn, "db-dsn", dsn, "Database DSN")
flag.Parse()
limiter = lru.New(lruSize)
log.SetOutput(os.Stdout)
if !timestamp {
log.SetFlags(0)
}
addr, _ := net.ResolveUDPAddr("udp", listen)
conn, err := net.ListenUDP("udp", addr)
var err error
db, err := sql.Open(backend, dsn)
if err != nil {
log.Fatal(err)
log.Fatalln("sql.Open:", err)
}
parentDir := filepath.Dir(dbDir)
if _, err := os.Stat(parentDir); err != nil && os.IsNotExist(err) {
err = os.MkdirAll(parentDir, 0755)
if err != nil {
log.Fatal(err)
}
}
db, err := leveldb.OpenFile(dbDir, &opt.Options{OpenFilesCacheCapacity: 32})
prep, err := setup(backend, db)
if err != nil {
log.Fatal(err)
log.Fatalln("Setup:", err)
}
statsLog, err := os.OpenFile(statsFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Fatal(err)
}
main := suture.NewSimple("main")
var unknownLog io.Writer
if unknownFile != "" {
unknownLog, err = os.OpenFile(unknownFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Fatal(err)
}
}
main.Add(&querysrv{
addr: addr,
db: db,
prep: prep,
})
if statsIntv > 0 {
go logStats(statsLog, statsIntv)
}
main.Add(&cleansrv{
intv: cleanIntv,
db: db,
prep: prep,
})
go clean(statsLog, db)
main.Add(&statssrv{
intv: statsIntv,
file: statsFile,
db: db,
})
var buf = make([]byte, 1024)
for {
buf = buf[:cap(buf)]
n, addr, err := conn.ReadFromUDP(buf)
if limit(addr) {
// Rate limit in effect for source
continue
}
if err != nil {
log.Fatal(err)
}
if n < 4 {
log.Printf("Received short packet (%d bytes)", n)
continue
}
buf = buf[:n]
magic := binary.BigEndian.Uint32(buf)
switch magic {
case discover.AnnouncementMagic:
err := handleAnnounceV2(db, addr, buf)
if err != nil && unknownLog != nil {
fmt.Fprintf(unknownLog, "AE %d %v %x\n", time.Now().Unix(), addr, buf)
}
case discover.QueryMagic:
err := handleQueryV2(db, conn, addr, buf)
if err != nil && unknownLog != nil {
fmt.Fprintf(unknownLog, "QE %d %v %x\n", time.Now().Unix(), addr, buf)
}
default:
lock.Lock()
unknowns++
lock.Unlock()
if unknownLog != nil {
fmt.Fprintf(unknownLog, "UN %d %v %x\n", time.Now().Unix(), addr, buf)
}
}
}
globalStats.Reset()
main.Serve()
}
func limit(addr *net.UDPAddr) bool {
key := addr.IP.String()
lock.Lock()
defer lock.Unlock()
bkt, ok := limiter.Get(key)
if ok {
bkt := bkt.(*ratelimit.Bucket)
if bkt.TakeAvailable(1) != 1 {
// Rate limit exceeded; ignore packet
if debug {
log.Println("Rate limit exceeded for", key)
}
limited++
return true
}
} else {
if debug {
log.Println("New limiter for", key)
}
// One packet per ten seconds average rate, burst ten packets
limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst)))
func getEnvDefault(key, def string) string {
if val := os.Getenv(key); val != "" {
return val
}
return false
return def
}
func handleAnnounceV2(db *leveldb.DB, addr *net.UDPAddr, buf []byte) error {
var pkt discover.Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
return err
}
if debug {
log.Printf("<- %v %#v", addr, pkt)
}
lock.Lock()
announces++
lock.Unlock()
ip := addr.IP.To4()
if ip == nil {
ip = addr.IP.To16()
}
var addrs []address
now := time.Now().Unix()
for _, addr := range pkt.This.Addresses {
tip := addr.IP
if len(tip) == 0 {
tip = ip
}
addrs = append(addrs, address{
ip: tip,
port: addr.Port,
seen: now,
})
}
var id protocol.DeviceID
if len(pkt.This.ID) == 32 {
// Raw node ID
copy(id[:], pkt.This.ID)
} else {
err = id.UnmarshalText(pkt.This.ID)
if err != nil {
return err
}
}
update(db, id, addrs)
return nil
}
func handleQueryV2(db *leveldb.DB, conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error {
var pkt discover.Query
err := pkt.UnmarshalXDR(buf)
if err != nil {
return err
}
if debug {
log.Printf("<- %v %#v", addr, pkt)
}
var id protocol.DeviceID
if len(pkt.DeviceID) == 32 {
// Raw node ID
copy(id[:], pkt.DeviceID)
} else {
err = id.UnmarshalText(pkt.DeviceID)
if err != nil {
return err
}
}
lock.Lock()
queries++
lock.Unlock()
addrs := get(db, id)
now := time.Now().Unix()
if len(addrs) > 0 {
ann := discover.Announce{
Magic: discover.AnnouncementMagic,
This: discover.Device{
ID: pkt.DeviceID,
},
}
for _, addr := range addrs {
if now-addr.seen > cacheLimitSeconds {
continue
}
ann.This.Addresses = append(ann.This.Addresses, discover.Address{IP: addr.ip, Port: addr.port})
}
if debug {
log.Printf("-> %v %#v", addr, pkt)
}
if len(ann.This.Addresses) == 0 {
return nil
}
tb, err := ann.MarshalXDR()
if err != nil {
log.Println("QueryV2 response marshal:", err)
return nil
}
_, err = conn.WriteToUDP(tb, addr)
if err != nil {
log.Println("QueryV2 response write:", err)
return nil
}
lock.Lock()
answered++
lock.Unlock()
}
return nil
}
func next(intv int) time.Time {
d := time.Duration(intv) * time.Second
func next(intv time.Duration) time.Duration {
t0 := time.Now()
t1 := t0.Add(d).Truncate(d)
time.Sleep(t1.Sub(t0))
return t1
}
func logStats(statsLog io.Writer, intv int) {
for {
t := next(intv)
lock.Lock()
fmt.Fprintf(statsLog, "%d Queries:%d Answered:%d Announces:%d Unknown:%d Limited:%d\n",
t.Unix(), queries, answered, announces, unknowns, limited)
queries = 0
announces = 0
answered = 0
limited = 0
unknowns = 0
lock.Unlock()
}
}
func get(db *leveldb.DB, id protocol.DeviceID) []address {
var addrs addressList
val, err := db.Get(id[:], nil)
if err == nil {
addrs.UnmarshalXDR(val)
}
return addrs.addresses
}
func update(db *leveldb.DB, id protocol.DeviceID, addrs []address) {
var newAddrs addressList
val, err := db.Get(id[:], nil)
if err == nil {
newAddrs.UnmarshalXDR(val)
}
nextAddr:
for _, newAddr := range addrs {
for i, exAddr := range newAddrs.addresses {
if bytes.Compare(newAddr.ip, exAddr.ip) == 0 {
newAddrs.addresses[i] = newAddr
continue nextAddr
}
}
newAddrs.addresses = append(newAddrs.addresses, newAddr)
}
db.Put(id[:], newAddrs.MarshalXDR(), nil)
}
func clean(statsLog io.Writer, db *leveldb.DB) {
for {
now := next(cacheLimitSeconds)
nowSecs := now.Unix()
var kept, deleted int64
iter := db.NewIterator(nil, nil)
for iter.Next() {
var addrs addressList
addrs.UnmarshalXDR(iter.Value())
// Remove expired addresses
newAddrs := addrs.addresses
for i := 0; i < len(newAddrs); i++ {
if nowSecs-newAddrs[i].seen > cacheLimitSeconds {
newAddrs[i] = newAddrs[len(newAddrs)-1]
newAddrs = newAddrs[:len(newAddrs)-1]
}
}
// Delete empty records
if len(newAddrs) == 0 {
db.Delete(iter.Key(), nil)
deleted++
continue
}
// Update changed records
if len(newAddrs) != len(addrs.addresses) {
addrs.addresses = newAddrs
db.Put(iter.Key(), addrs.MarshalXDR(), nil)
}
kept++
}
iter.Release()
fmt.Fprintf(statsLog, "%d Kept:%d Deleted:%d Took:%0.04fs\n", nowSecs, kept, deleted, time.Since(now).Seconds())
}
t1 := t0.Add(intv).Truncate(intv)
return t1.Sub(t0)
}

123
cmd/discosrv/psql.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
_ "github.com/lib/pq"
)
func init() {
register("postgres", postgresSetup, postgresCompile)
}
func postgresSetup(db *sql.DB) error {
var err error
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices (
DeviceID CHAR(63) NOT NULL PRIMARY KEY,
Seen TIMESTAMP NOT NULL
)`)
if err != nil {
return err
}
row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`)
}
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses (
DeviceID CHAR(63) NOT NULL,
Seen TIMESTAMP NOT NULL,
Address VARCHAR(256) NOT NULL
)`)
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`)
}
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Relays (
DeviceID CHAR(63) NOT NULL,
Seen TIMESTAMP NOT NULL,
Address VARCHAR(256) NOT NULL,
Latency INTEGER NOT NULL
)`)
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'RelaysDeviceIDSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDSeenIndex ON Relays (DeviceID, Seen)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'RelaysDeviceIDAddressIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
}
if err != nil {
return err
}
return nil
}
func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
stmts := map[string]string{
"cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL",
"cleanRelay": "DELETE FROM Relays WHERE Seen < now() - '2 hour'::INTERVAL",
"cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL",
"countAddress": "SELECT count(*) FROM Addresses",
"countDevice": "SELECT count(*) FROM Devices",
"countRelay": "SELECT count(*) FROM Relays",
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
"selectAddress": "SELECT Address from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
"selectRelay": "SELECT Address, Latency from Relays WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
"updateRelay": "UPDATE Relays SET Seen=now(), Latency=$3 WHERE DeviceID=$1 AND Address=$2",
"updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1",
"deleteRelay": "DELETE FROM Relays WHERE DeviceID=$1",
}
res := make(map[string]*sql.Stmt, len(stmts))
for key, stmt := range stmts {
prep, err := db.Prepare(stmt)
if err != nil {
return nil, err
}
res[key] = prep
}
return res, nil
}

98
cmd/discosrv/ql.go Normal file
View File

@ -0,0 +1,98 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
"log"
"github.com/cznic/ql"
)
func init() {
ql.RegisterDriver()
register("ql", qlSetup, qlCompile)
}
func qlSetup(db *sql.DB) (err error) {
tx, err := db.Begin()
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
tx.Rollback()
}
}()
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Devices (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL
)`)
if err != nil {
return
}
if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS DevicesDeviceIDIndex ON Devices (DeviceID)`); err != nil {
return
}
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Addresses (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL,
Address STRING NOT NULL,
)`)
if err != nil {
return
}
if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`); err != nil {
return
}
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Relays (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL,
Address STRING NOT NULL,
Latency INT NOT NULL,
)`)
if err != nil {
return
}
_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
return
}
func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
stmts := map[string]string{
"cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`,
"cleanRelay": `DELETE FROM Relays WHERE Seen < now() - duration("2h")`,
"cleanDevice": `DELETE FROM Devices WHERE Seen < now() - duration("24h")`,
"countAddress": "SELECT count(*) FROM Addresses",
"countDevice": "SELECT count(*) FROM Devices",
"countRelay": "SELECT count(*) FROM Relays",
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
"selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
"selectRelay": `SELECT Address, Latency from Relays WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
"updateAddress": "UPDATE Addresses Seen=now()WHERE DeviceID==$1 AND Address==$2",
"updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1",
"deleteRelay": "DELETE FROM Relays WHERE DeviceID==$1",
}
res := make(map[string]*sql.Stmt, len(stmts))
for key, stmt := range stmts {
prep, err := db.Prepare(stmt)
if err != nil {
log.Println("Failed to compile", stmt)
return nil, err
}
res[key] = prep
}
return res, nil
}

305
cmd/discosrv/querysrv.go Normal file
View File

@ -0,0 +1,305 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"net/url"
"time"
"github.com/golang/groupcache/lru"
"github.com/juju/ratelimit"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/discover"
)
type querysrv struct {
addr *net.UDPAddr
db *sql.DB
prep map[string]*sql.Stmt
limiter *lru.Cache
}
func (s *querysrv) Serve() {
s.limiter = lru.New(lruSize)
conn, err := net.ListenUDP("udp", s.addr)
if err != nil {
log.Println("Listen:", err)
return
}
// Attempt to set the read and write buffers to 2^24 bytes (16 MB) or as high as
// possible.
for i := 24; i >= 16; i-- {
if conn.SetReadBuffer(1<<uint(i)) == nil {
break
}
}
for i := 24; i >= 16; i-- {
if conn.SetWriteBuffer(1<<uint(i)) == nil {
break
}
}
var buf = make([]byte, 1024)
for {
buf = buf[:cap(buf)]
n, addr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Println("Read:", err)
return
}
if s.limit(addr) {
// Rate limit in effect for source
continue
}
if n < 4 {
log.Printf("Received short packet (%d bytes)", n)
continue
}
buf = buf[:n]
magic := binary.BigEndian.Uint32(buf)
switch magic {
case discover.AnnouncementMagic:
err := s.handleAnnounce(addr, buf)
globalStats.Announce()
if err != nil {
log.Println("Announce:", err)
globalStats.Error()
}
case discover.QueryMagic:
err := s.handleQuery(conn, addr, buf)
globalStats.Query()
if err != nil {
log.Println("Query:", err)
globalStats.Error()
}
default:
globalStats.Error()
}
}
}
func (s *querysrv) Stop() {
panic("stop unimplemented")
}
func (s *querysrv) handleAnnounce(addr *net.UDPAddr, buf []byte) error {
var pkt discover.Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
return err
}
var id protocol.DeviceID
copy(id[:], pkt.This.ID)
if id == protocol.LocalDeviceID {
return fmt.Errorf("Rejecting announce for local device ID from %v", addr)
}
tx, err := s.db.Begin()
if err != nil {
return err
}
for _, annAddr := range pkt.This.Addresses {
uri, err := url.Parse(annAddr)
if err != nil {
continue
}
host, port, err := net.SplitHostPort(uri.Host)
if err != nil {
continue
}
if len(host) == 0 {
uri.Host = net.JoinHostPort(addr.IP.String(), port)
}
if err := s.updateAddress(tx, id, uri.String()); err != nil {
tx.Rollback()
return err
}
}
_, err = tx.Stmt(s.prep["deleteRelay"]).Exec(id.String())
if err != nil {
tx.Rollback()
return err
}
for _, relay := range pkt.This.Relays {
uri, err := url.Parse(relay.Address)
if err != nil {
continue
}
_, err = tx.Stmt(s.prep["insertRelay"]).Exec(id.String(), uri, relay.Latency)
if err != nil {
tx.Rollback()
return err
}
}
if err := s.updateDevice(tx, id); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
func (s *querysrv) handleQuery(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error {
var pkt discover.Query
err := pkt.UnmarshalXDR(buf)
if err != nil {
return err
}
var id protocol.DeviceID
copy(id[:], pkt.DeviceID)
addrs, err := s.getAddresses(id)
if err != nil {
return err
}
relays, err := s.getRelays(id)
if err != nil {
return err
}
if len(addrs) > 0 {
ann := discover.Announce{
Magic: discover.AnnouncementMagic,
This: discover.Device{
ID: pkt.DeviceID,
Addresses: addrs,
Relays: relays,
},
}
tb, err := ann.MarshalXDR()
if err != nil {
return fmt.Errorf("QueryV2 response marshal: %v", err)
}
_, err = conn.WriteToUDP(tb, addr)
if err != nil {
return fmt.Errorf("QueryV2 response write: %v", err)
}
globalStats.Answer()
}
return nil
}
func (s *querysrv) limit(addr *net.UDPAddr) bool {
key := addr.IP.String()
bkt, ok := s.limiter.Get(key)
if ok {
bkt := bkt.(*ratelimit.Bucket)
if bkt.TakeAvailable(1) != 1 {
// Rate limit exceeded; ignore packet
return true
}
} else {
// One packet per ten seconds average rate, burst ten packets
s.limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst)))
}
return false
}
func (s *querysrv) updateDevice(tx *sql.Tx, device protocol.DeviceID) error {
res, err := tx.Stmt(s.prep["updateDevice"]).Exec(device.String())
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows == 0 {
_, err := tx.Stmt(s.prep["insertDevice"]).Exec(device.String())
if err != nil {
return err
}
}
return nil
}
func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, uri string) error {
res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), uri)
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows == 0 {
_, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), uri)
if err != nil {
return err
}
}
return nil
}
func (s *querysrv) getAddresses(device protocol.DeviceID) ([]string, error) {
rows, err := s.prep["selectAddress"].Query(device.String())
if err != nil {
return nil, err
}
var res []string
for rows.Next() {
var addr string
err := rows.Scan(&addr)
if err != nil {
log.Println("Scan:", err)
continue
}
res = append(res, addr)
}
return res, nil
}
func (s *querysrv) getRelays(device protocol.DeviceID) ([]discover.Relay, error) {
rows, err := s.prep["selectRelay"].Query(device.String())
if err != nil {
return nil, err
}
var res []discover.Relay
for rows.Next() {
var addr string
var latency int32
err := rows.Scan(&addr, &latency)
if err != nil {
log.Println("Scan:", err)
continue
}
res = append(res, discover.Relay{
Address: addr,
Latency: latency,
})
}
return res, nil
}

136
cmd/discosrv/stats.go Normal file
View File

@ -0,0 +1,136 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"bytes"
"database/sql"
"fmt"
"io/ioutil"
"log"
"os"
"sync"
"time"
)
type stats struct {
mut sync.Mutex
reset time.Time
announces int64
queries int64
answers int64
errors int64
}
func (s *stats) Announce() {
s.mut.Lock()
s.announces++
s.mut.Unlock()
}
func (s *stats) Query() {
s.mut.Lock()
s.queries++
s.mut.Unlock()
}
func (s *stats) Answer() {
s.mut.Lock()
s.answers++
s.mut.Unlock()
}
func (s *stats) Error() {
s.mut.Lock()
s.errors++
s.mut.Unlock()
}
func (s *stats) Reset() stats {
s.mut.Lock()
ns := *s
s.announces, s.queries, s.answers = 0, 0, 0
s.reset = time.Now()
s.mut.Unlock()
return ns
}
type statssrv struct {
intv time.Duration
file string
db *sql.DB
}
func (s *statssrv) Serve() {
for {
time.Sleep(next(s.intv))
stats := globalStats.Reset()
d := time.Since(stats.reset).Seconds()
log.Printf("Stats: %.02f announces/s, %.02f queries/s, %.02f answers/s, %.02f errors/s",
float64(stats.announces)/d, float64(stats.queries)/d, float64(stats.answers)/d, float64(stats.errors)/d)
if s.file != "" {
s.writeToFile(stats, d)
}
}
}
func (s *statssrv) Stop() {
panic("stop unimplemented")
}
func (s *statssrv) writeToFile(stats stats, secs float64) {
newLine := []byte("\n")
var addrs int
row := s.db.QueryRow("SELECT COUNT(*) FROM Addresses")
if err := row.Scan(&addrs); err != nil {
log.Println("stats query:", err)
return
}
fd, err := os.OpenFile(s.file, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
log.Println("stats file:", err)
return
}
bs, err := ioutil.ReadAll(fd)
if err != nil {
log.Println("stats file:", err)
return
}
lines := bytes.Split(bytes.TrimSpace(bs), newLine)
if len(lines) > 12 {
lines = lines[len(lines)-12:]
}
latest := fmt.Sprintf("%v: %6d addresses, %8.02f announces/s, %8.02f queries/s, %8.02f answers/s, %8.02f errors/s\n",
time.Now().UTC().Format(time.RFC3339), addrs,
float64(stats.announces)/secs, float64(stats.queries)/secs, float64(stats.answers)/secs, float64(stats.errors)/secs)
lines = append(lines, []byte(latest))
_, err = fd.Seek(0, 0)
if err != nil {
log.Println("stats file:", err)
return
}
err = fd.Truncate(0)
if err != nil {
log.Println("stats file:", err)
return
}
_, err = fd.Write(bytes.Join(lines, newLine))
if err != nil {
log.Println("stats file:", err)
return
}
err = fd.Close()
if err != nil {
log.Println("stats file:", err)
return
}
}

View File

@ -1,13 +0,0 @@
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
type address struct {
ip []byte
port uint16
seen int64 // epoch seconds
}
type addressList struct {
addresses []address
}

View File

@ -1,147 +0,0 @@
// ************************************************************
// This file is automatically generated by genxdr. Do not edit.
// ************************************************************
package main
import (
"bytes"
"io"
"github.com/calmh/xdr"
)
/*
address Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ip |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ip (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x0000 | port |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+ seen (64 bits) +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct address {
opaque ip<>;
unsigned int port;
hyper seen;
}
*/
func (o address) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o address) MarshalXDR() []byte {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o address) AppendXDR(bs []byte) []byte {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
o.encodeXDR(xw)
return []byte(aw)
}
func (o address) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteBytes(o.ip)
xw.WriteUint16(o.port)
xw.WriteUint64(uint64(o.seen))
return xw.Tot(), xw.Error()
}
func (o *address) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *address) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.decodeXDR(xr)
}
func (o *address) decodeXDR(xr *xdr.Reader) error {
o.ip = xr.ReadBytes()
o.port = xr.ReadUint16()
o.seen = int64(xr.ReadUint64())
return xr.Error()
}
/*
addressList Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Number of addresses |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Zero or more address Structures \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct addressList {
address addresses<>;
}
*/
func (o addressList) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.encodeXDR(xw)
}
func (o addressList) MarshalXDR() []byte {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o addressList) AppendXDR(bs []byte) []byte {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
o.encodeXDR(xw)
return []byte(aw)
}
func (o addressList) encodeXDR(xw *xdr.Writer) (int, error) {
xw.WriteUint32(uint32(len(o.addresses)))
for i := range o.addresses {
o.addresses[i].encodeXDR(xw)
}
return xw.Tot(), xw.Error()
}
func (o *addressList) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.decodeXDR(xr)
}
func (o *addressList) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.decodeXDR(xr)
}
func (o *addressList) decodeXDR(xr *xdr.Reader) error {
_addressesSize := int(xr.ReadUint32())
o.addresses = make([]address, _addressesSize)
for i := range o.addresses {
(&o.addresses[i]).decodeXDR(xr)
}
return xr.Error()
}