Add relay support, add ql support

This commit is contained in:
Audrius Butkevicius 2015-07-21 23:56:27 +01:00
parent 860fbe48dd
commit 4d9ca822a7
7 changed files with 366 additions and 139 deletions

View File

@ -12,17 +12,29 @@ from the build server.
Usage
-----
The discovery server requires a postgresql backend server. You will need
to create a database and a user with permissions to create tables in it.
Set the database URL in the environment variable `DISCOSRV_DB` before
starting discosrv.
The discovery server supports `ql` and `postgres` backends.
Specify the backend via `-db-backend` and the database DSN via `-db-dsn`.
By default it will use in-memory `ql` backend. If you wish to persist the
information on disk between restarts in `ql`, specify a file DSN:
```bash
$ export DISCOSRV_DB="postgres://user:password@localhost/databasename"
$ discosrv
$ discosrv -db-dsn="file://var/run/discosrv.db"
```
The appropriate tables and indexes will be created at first startup. If
it doesn't exit with an error, you're fine.
For `postgres`, you will need to create a database and a user with permissions
to create tables in it, then start the discosrv as follows:
```bash
$ export DISCOSRV_DB_DSN="postgres://user:password@localhost/databasename"
$ discosrv -db-backend="postgres"
```
You can pass the DSN as command line option, but the value what you pass in will
be visible in most process managers, potentially exposing the database password
to other users.
In all cases, the appropriate tables and indexes will be created at first
startup. If it doesn't exit with an error, you're fine.
See `discosrv -help` for other options.

View File

@ -18,7 +18,7 @@ func (s *cleansrv) Serve() {
for {
time.Sleep(next(s.intv))
err := s.cleanOldAddresses()
err := s.cleanOldEntries()
if err != nil {
log.Println("Clean:", err)
}
@ -29,7 +29,7 @@ func (s *cleansrv) Stop() {
panic("stop unimplemented")
}
func (s *cleansrv) cleanOldAddresses() (err error) {
func (s *cleansrv) cleanOldEntries() (err error) {
var tx *sql.Tx
tx, err = s.db.Begin()
if err != nil {
@ -52,6 +52,14 @@ func (s *cleansrv) cleanOldAddresses() (err error) {
log.Printf("Clean: %d old addresses", rows)
}
res, err = tx.Stmt(s.prep["cleanRelay"]).Exec()
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows > 0 {
log.Printf("Clean: %d old relays", rows)
}
res, err = tx.Stmt(s.prep["cleanDevice"]).Exec()
if err != nil {
return err
@ -60,7 +68,7 @@ func (s *cleansrv) cleanOldAddresses() (err error) {
log.Printf("Clean: %d old devices", rows)
}
var devs, addrs int
var devs, addrs, relays int
row := tx.Stmt(s.prep["countDevice"]).QueryRow()
if err = row.Scan(&devs); err != nil {
return err
@ -69,7 +77,11 @@ func (s *cleansrv) cleanOldAddresses() (err error) {
if err = row.Scan(&addrs); err != nil {
return err
}
row = tx.Stmt(s.prep["countRelay"]).QueryRow()
if err = row.Scan(&relays); err != nil {
return err
}
log.Printf("Database: %d devices, %d addresses", devs, addrs)
log.Printf("Database: %d devices, %d addresses, %d relays", devs, addrs, relays)
return nil
}

View File

@ -2,84 +2,31 @@
package main
import "database/sql"
import (
"database/sql"
"fmt"
)
func setupDB(db *sql.DB) error {
var err error
type setupFunc func(db *sql.DB) error
type compileFunc func(db *sql.DB) (map[string]*sql.Stmt, error)
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices (
DeviceID CHAR(63) NOT NULL PRIMARY KEY,
Seen TIMESTAMP NOT NULL
)`)
if err != nil {
return err
}
var (
setupFuncs = make(map[string]setupFunc)
compileFuncs = make(map[string]compileFunc)
)
row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`)
}
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses (
DeviceID CHAR(63) NOT NULL,
Seen TIMESTAMP NOT NULL,
Address VARCHAR(42) NOT NULL,
Port INTEGER NOT NULL
)`)
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressPortIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressPortIndex ON Addresses (DeviceID, Address, Port)`)
}
if err != nil {
return err
}
return nil
func register(name string, setup setupFunc, compile compileFunc) {
setupFuncs[name] = setup
compileFuncs[name] = compile
}
func compileStatements(db *sql.DB) (map[string]*sql.Stmt, error) {
stmts := map[string]string{
"cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL",
"cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL",
"countAddress": "SELECT count(*) FROM Addresses",
"countDevice": "SELECT count(*) FROM Devices",
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address, Port) VALUES ($1, now(), $2, $3)",
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
"selectAddress": "SELECT Address, Port from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
"updateAddress": "UPDATE Addresses SET Seen=now() WHERE DeviceID=$1 AND Address=$2 AND Port=$3",
"updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1",
func setup(backend string, db *sql.DB) (map[string]*sql.Stmt, error) {
setup, ok := setupFuncs[backend]
if !ok {
return nil, fmt.Errorf("Unsupported backend")
}
res := make(map[string]*sql.Stmt, len(stmts))
for key, stmt := range stmts {
prep, err := db.Prepare(stmt)
if err != nil {
return nil, err
}
res[key] = prep
if err := setup(db); err != nil {
return nil, err
}
return res, nil
return compileFuncs[backend](db)
}

View File

@ -8,10 +8,8 @@ import (
"log"
"net"
"os"
"strings"
"time"
_ "github.com/lib/pq"
"github.com/thejerf/suture"
)
@ -19,9 +17,10 @@ var (
lruSize = 10240
limitAvg = 5
limitBurst = 20
dbConn = getEnvDefault("DISCOSRV_DB", "postgres://user:password@localhost/discosrv")
globalStats stats
statsFile string
backend = "ql"
dsn = getEnvDefault("DISCOSRV_DB_DSN", "memory://discosrv")
)
func main() {
@ -35,30 +34,26 @@ func main() {
log.SetOutput(os.Stdout)
log.SetFlags(0)
flag.StringVar(&listen, "listen", ":22026", "Listen address")
flag.StringVar(&listen, "listen", ":22027", "Listen address")
flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries")
flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s")
flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets")
flag.StringVar(&statsFile, "stats-file", statsFile, "File to write periodic operation stats to")
flag.StringVar(&backend, "db-backend", backend, "Database backend to use")
flag.StringVar(&dsn, "db-dsn", dsn, "Database DSN")
flag.Parse()
addr, _ := net.ResolveUDPAddr("udp", listen)
if !strings.Contains(dbConn, "sslmode=") {
dbConn += "?sslmode=disable"
}
var err error
db, err := sql.Open("postgres", dbConn)
db, err := sql.Open(backend, dsn)
if err != nil {
log.Fatalln("sql.Open:", err)
}
prep, err := setup(backend, db)
if err != nil {
log.Fatalln("Setup:", err)
}
err = setupDB(db)
if err != nil {
log.Fatalln("Setup:", err)
}
prep, err := compileStatements(db)
main := suture.NewSimple("main")

123
cmd/discosrv/psql.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
_ "github.com/lib/pq"
)
func init() {
register("postgres", postgresSetup, postgresCompile)
}
func postgresSetup(db *sql.DB) error {
var err error
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices (
DeviceID CHAR(63) NOT NULL PRIMARY KEY,
Seen TIMESTAMP NOT NULL
)`)
if err != nil {
return err
}
row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`)
}
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses (
DeviceID CHAR(63) NOT NULL,
Seen TIMESTAMP NOT NULL,
Address VARCHAR(256) NOT NULL
)`)
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`)
}
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Relays (
DeviceID CHAR(63) NOT NULL,
Seen TIMESTAMP NOT NULL,
Address VARCHAR(256) NOT NULL,
Latency INTEGER NOT NULL
)`)
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'RelaysDeviceIDSeenIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDSeenIndex ON Relays (DeviceID, Seen)`)
}
if err != nil {
return err
}
row = db.QueryRow(`SELECT 'RelaysDeviceIDAddressIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
}
if err != nil {
return err
}
return nil
}
func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
stmts := map[string]string{
"cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL",
"cleanRelay": "DELETE FROM Relays WHERE Seen < now() - '2 hour'::INTERVAL",
"cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL",
"countAddress": "SELECT count(*) FROM Addresses",
"countDevice": "SELECT count(*) FROM Devices",
"countRelay": "SELECT count(*) FROM Relays",
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
"selectAddress": "SELECT Address from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
"selectRelay": "SELECT Address, Latency from Relays WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
"updateRelay": "UPDATE Relays SET Seen=now(), Latency=$3 WHERE DeviceID=$1 AND Address=$2",
"updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1",
"deleteRelay": "DELETE FROM Relays WHERE DeviceID=$1",
}
res := make(map[string]*sql.Stmt, len(stmts))
for key, stmt := range stmts {
prep, err := db.Prepare(stmt)
if err != nil {
return nil, err
}
res[key] = prep
}
return res, nil
}

98
cmd/discosrv/ql.go Normal file
View File

@ -0,0 +1,98 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"database/sql"
"log"
"github.com/cznic/ql"
)
func init() {
ql.RegisterDriver()
register("ql", qlSetup, qlCompile)
}
func qlSetup(db *sql.DB) (err error) {
tx, err := db.Begin()
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
} else {
tx.Rollback()
}
}()
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Devices (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL
)`)
if err != nil {
return
}
if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS DevicesDeviceIDIndex ON Devices (DeviceID)`); err != nil {
return
}
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Addresses (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL,
Address STRING NOT NULL,
)`)
if err != nil {
return
}
if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`); err != nil {
return
}
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Relays (
DeviceID STRING NOT NULL,
Seen TIME NOT NULL,
Address STRING NOT NULL,
Latency INT NOT NULL,
)`)
if err != nil {
return
}
_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
return
}
func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
stmts := map[string]string{
"cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`,
"cleanRelay": `DELETE FROM Relays WHERE Seen < now() - duration("2h")`,
"cleanDevice": `DELETE FROM Devices WHERE Seen < now() - duration("24h")`,
"countAddress": "SELECT count(*) FROM Addresses",
"countDevice": "SELECT count(*) FROM Devices",
"countRelay": "SELECT count(*) FROM Relays",
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
"selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
"selectRelay": `SELECT Address, Latency from Relays WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
"updateAddress": "UPDATE Addresses Seen=now()WHERE DeviceID==$1 AND Address==$2",
"updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1",
"deleteRelay": "DELETE FROM Relays WHERE DeviceID==$1",
}
res := make(map[string]*sql.Stmt, len(stmts))
for key, stmt := range stmts {
prep, err := db.Prepare(stmt)
if err != nil {
log.Println("Failed to compile", stmt)
return nil, err
}
res[key] = prep
}
return res, nil
}

View File

@ -9,6 +9,7 @@ import (
"io"
"log"
"net"
"net/url"
"time"
"github.com/golang/groupcache/lru"
@ -70,7 +71,7 @@ func (s *querysrv) Serve() {
switch magic {
case discover.AnnouncementMagic:
err := s.handleAnnounceV2(addr, buf)
err := s.handleAnnounce(addr, buf)
globalStats.Announce()
if err != nil {
log.Println("Announce:", err)
@ -78,7 +79,7 @@ func (s *querysrv) Serve() {
}
case discover.QueryMagic:
err := s.handleQueryV2(conn, addr, buf)
err := s.handleQuery(conn, addr, buf)
globalStats.Query()
if err != nil {
log.Println("Query:", err)
@ -95,7 +96,7 @@ func (s *querysrv) Stop() {
panic("stop unimplemented")
}
func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error {
func (s *querysrv) handleAnnounce(addr *net.UDPAddr, buf []byte) error {
var pkt discover.Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
@ -103,15 +104,7 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error {
}
var id protocol.DeviceID
if len(pkt.This.ID) == 32 {
// Raw node ID
copy(id[:], pkt.This.ID)
} else {
err = id.UnmarshalText(pkt.This.ID)
if err != nil {
return err
}
}
copy(id[:], pkt.This.ID)
if id == protocol.LocalDeviceID {
return fmt.Errorf("Rejecting announce for local device ID from %v", addr)
@ -123,11 +116,40 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error {
}
for _, annAddr := range pkt.This.Addresses {
tip := annAddr.IP
if len(tip) == 0 {
tip = addr.IP
uri, err := url.Parse(annAddr)
if err != nil {
continue
}
if err := s.updateAddress(tx, id, tip, annAddr.Port); err != nil {
host, port, err := net.SplitHostPort(uri.Host)
if err != nil {
continue
}
if len(host) == 0 {
uri.Host = net.JoinHostPort(addr.IP.String(), port)
}
if err := s.updateAddress(tx, id, uri.String()); err != nil {
tx.Rollback()
return err
}
}
_, err = tx.Stmt(s.prep["deleteRelay"]).Exec(id.String())
if err != nil {
tx.Rollback()
return err
}
for _, relay := range pkt.This.Relays {
uri, err := url.Parse(relay.Address)
if err != nil {
continue
}
_, err = tx.Stmt(s.prep["insertRelay"]).Exec(id.String(), uri, relay.Latency)
if err != nil {
tx.Rollback()
return err
}
@ -141,7 +163,7 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error {
return tx.Commit()
}
func (s *querysrv) handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error {
func (s *querysrv) handleQuery(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error {
var pkt discover.Query
err := pkt.UnmarshalXDR(buf)
if err != nil {
@ -149,27 +171,25 @@ func (s *querysrv) handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byt
}
var id protocol.DeviceID
if len(pkt.DeviceID) == 32 {
// Raw node ID
copy(id[:], pkt.DeviceID)
} else {
err = id.UnmarshalText(pkt.DeviceID)
if err != nil {
return err
}
}
copy(id[:], pkt.DeviceID)
addrs, err := s.getAddresses(id)
if err != nil {
return err
}
relays, err := s.getRelays(id)
if err != nil {
return err
}
if len(addrs) > 0 {
ann := discover.Announce{
Magic: discover.AnnouncementMagic,
This: discover.Device{
ID: pkt.DeviceID,
Addresses: addrs,
Relays: relays,
},
}
@ -222,14 +242,14 @@ func (s *querysrv) updateDevice(tx *sql.Tx, device protocol.DeviceID) error {
return nil
}
func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, ip net.IP, port uint16) error {
res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), ip.String(), port)
func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, uri string) error {
res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), uri)
if err != nil {
return err
}
if rows, _ := res.RowsAffected(); rows == 0 {
_, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), ip.String(), port)
_, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), uri)
if err != nil {
return err
}
@ -238,27 +258,47 @@ func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, ip net.IP
return nil
}
func (s *querysrv) getAddresses(device protocol.DeviceID) ([]discover.Address, error) {
func (s *querysrv) getAddresses(device protocol.DeviceID) ([]string, error) {
rows, err := s.prep["selectAddress"].Query(device.String())
if err != nil {
return nil, err
}
var res []discover.Address
var res []string
for rows.Next() {
var addr string
var port int
err := rows.Scan(&addr, &port)
err := rows.Scan(&addr)
if err != nil {
log.Println("Scan:", err)
continue
}
ip := net.ParseIP(addr)
bs := ip.To4()
if bs == nil {
bs = ip.To16()
}
res = append(res, discover.Address{IP: []byte(bs), Port: uint16(port)})
res = append(res, addr)
}
return res, nil
}
func (s *querysrv) getRelays(device protocol.DeviceID) ([]discover.Relay, error) {
rows, err := s.prep["selectRelay"].Query(device.String())
if err != nil {
return nil, err
}
var res []discover.Relay
for rows.Next() {
var addr string
var latency int32
err := rows.Scan(&addr, &latency)
if err != nil {
log.Println("Scan:", err)
continue
}
res = append(res, discover.Relay{
Address: addr,
Latency: latency,
})
}
return res, nil