diff --git a/cmd/discosrv/README.md b/cmd/discosrv/README.md index ec0191d3e..f60f2d628 100644 --- a/cmd/discosrv/README.md +++ b/cmd/discosrv/README.md @@ -5,7 +5,24 @@ discosrv This is the global discovery server for the `syncthing` project. -`go get github.com/syncthing/discosrv` +To get it, run `go get github.com/syncthing/discosrv` or download the +[latest build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/) +from the build server. -Or download the latest [Linux build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/). +Usage +----- +The discovery server requires a postgresql backend server. You will need +to create a database and a user with permissions to create tables in it. +Set the database URL in the environment variable `DISCOSRV_DB` before +starting discosrv. + +```bash +$ export DISCOSRV_DB="postgres://user:password@localhost/databasename" +$ discosrv +``` + +The appropriate tables and indexes will be created at first startup. If +it doesn't exit with an error, you're fine. + +See `discosrv -help` for other options. diff --git a/cmd/discosrv/clean.go b/cmd/discosrv/clean.go new file mode 100644 index 000000000..a2115190c --- /dev/null +++ b/cmd/discosrv/clean.go @@ -0,0 +1,75 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "log" + "time" +) + +type cleansrv struct { + intv time.Duration + db *sql.DB + prep map[string]*sql.Stmt +} + +func (s *cleansrv) Serve() { + for { + time.Sleep(next(s.intv)) + + err := s.cleanOldAddresses() + if err != nil { + log.Println("Clean:", err) + } + } +} + +func (s *cleansrv) Stop() { + panic("stop unimplemented") +} + +func (s *cleansrv) cleanOldAddresses() (err error) { + var tx *sql.Tx + tx, err = s.db.Begin() + if err != nil { + return err + } + + defer func() { + if err == nil { + err = tx.Commit() + } else { + tx.Rollback() + } + }() + + res, err := tx.Stmt(s.prep["cleanAddress"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old addresses", rows) + } + + res, err = tx.Stmt(s.prep["cleanDevice"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old devices", rows) + } + + var devs, addrs int + row := tx.Stmt(s.prep["countDevice"]).QueryRow() + if err = row.Scan(&devs); err != nil { + return err + } + row = tx.Stmt(s.prep["countAddress"]).QueryRow() + if err = row.Scan(&addrs); err != nil { + return err + } + + log.Printf("Database: %d devices, %d addresses", devs, addrs) + return nil +} diff --git a/cmd/discosrv/db.go b/cmd/discosrv/db.go new file mode 100644 index 000000000..84f558133 --- /dev/null +++ b/cmd/discosrv/db.go @@ -0,0 +1,85 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import "database/sql" + +func setupDB(db *sql.DB) error { + var err error + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices ( + DeviceID CHAR(63) NOT NULL PRIMARY KEY, + Seen TIMESTAMP NOT NULL + )`) + if err != nil { + return err + } + + row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`) + } + if err != nil { + return err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( + DeviceID CHAR(63) NOT NULL, + Seen TIMESTAMP NOT NULL, + Address VARCHAR(42) NOT NULL, + Port INTEGER NOT NULL + )`) + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressPortIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressPortIndex ON Addresses (DeviceID, Address, Port)`) + } + if err != nil { + return err + } + + return nil +} + +func compileStatements(db *sql.DB) (map[string]*sql.Stmt, error) { + stmts := map[string]string{ + "cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL", + "cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL", + "countAddress": "SELECT count(*) FROM Addresses", + "countDevice": "SELECT count(*) FROM Devices", + "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address, Port) VALUES ($1, now(), $2, $3)", + "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", + "selectAddress": "SELECT Address, Port from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", + "updateAddress": "UPDATE Addresses SET Seen=now() WHERE DeviceID=$1 AND Address=$2 AND Port=$3", + "updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1", + } + + res := make(map[string]*sql.Stmt, len(stmts)) + for key, stmt := range stmts { + prep, err := db.Prepare(stmt) + if err != nil { + return nil, err + } + res[key] = prep + } + return res, nil +} diff --git a/cmd/discosrv/main.go b/cmd/discosrv/main.go index 7c95866bd..c01312477 100644 --- a/cmd/discosrv/main.go +++ b/cmd/discosrv/main.go @@ -1,392 +1,94 @@ -// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). package main import ( - "bytes" - "encoding/binary" + "database/sql" "flag" - "fmt" - "io" "log" "net" "os" - "path/filepath" - "sync" + "strings" "time" - "github.com/golang/groupcache/lru" - "github.com/juju/ratelimit" - "github.com/syncthing/protocol" - "github.com/syncthing/syncthing/internal/discover" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" + _ "github.com/lib/pq" + "github.com/thejerf/suture" ) -const cacheLimitSeconds = 3600 - var ( - lock sync.Mutex - queries = 0 - announces = 0 - answered = 0 - limited = 0 - unknowns = 0 - debug = false - lruSize = 1024 - limitAvg = 1 - limitBurst = 10 - limiter *lru.Cache + lruSize = 10240 + limitAvg = 5 + limitBurst = 20 + dbConn = getEnvDefault("DISCOSRV_DB", "postgres://user:password@localhost/discosrv") + globalStats stats ) func main() { + const ( + cleanIntv = 1 * time.Hour + statsIntv = 5 * time.Minute + ) + var listen string - var timestamp bool - var statsIntv int - var statsFile string - var unknownFile string - var dbDir string + + log.SetOutput(os.Stdout) + log.SetFlags(0) flag.StringVar(&listen, "listen", ":22026", "Listen address") - flag.BoolVar(&debug, "debug", false, "Enable debug output") - flag.BoolVar(×tamp, "timestamp", true, "Timestamp the log output") - flag.IntVar(&statsIntv, "stats-intv", 0, "Statistics output interval (s)") - flag.StringVar(&statsFile, "stats-file", "/var/discosrv/stats", "Statistics file name") - flag.StringVar(&unknownFile, "unknown-file", "", "Unknown packet log file name") flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries") flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s") flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets") - flag.StringVar(&dbDir, "db-dir", "/var/discosrv/db", "Database directory") flag.Parse() - limiter = lru.New(lruSize) - - log.SetOutput(os.Stdout) - if !timestamp { - log.SetFlags(0) - } - addr, _ := net.ResolveUDPAddr("udp", listen) - conn, err := net.ListenUDP("udp", addr) + + if !strings.Contains(dbConn, "sslmode=") { + dbConn += "?sslmode=disable" + } + + var err error + db, err := sql.Open("postgres", dbConn) if err != nil { - log.Fatal(err) + log.Fatalln("Setup:", err) } - - parentDir := filepath.Dir(dbDir) - if _, err := os.Stat(parentDir); err != nil && os.IsNotExist(err) { - err = os.MkdirAll(parentDir, 0755) - if err != nil { - log.Fatal(err) - } - } - - db, err := leveldb.OpenFile(dbDir, &opt.Options{OpenFilesCacheCapacity: 32}) + err = setupDB(db) if err != nil { - log.Fatal(err) + log.Fatalln("Setup:", err) } - statsLog, err := os.OpenFile(statsFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal(err) - } + prep, err := compileStatements(db) - var unknownLog io.Writer - if unknownFile != "" { - unknownLog, err = os.OpenFile(unknownFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal(err) - } - } + main := suture.NewSimple("main") - if statsIntv > 0 { - go logStats(statsLog, statsIntv) - } + main.Add(&querysrv{ + addr: addr, + db: db, + prep: prep, + }) - go clean(statsLog, db) + main.Add(&cleansrv{ + intv: cleanIntv, + db: db, + prep: prep, + }) - var buf = make([]byte, 1024) - for { - buf = buf[:cap(buf)] - n, addr, err := conn.ReadFromUDP(buf) + main.Add(&statssrv{ + intv: statsIntv, + }) - if limit(addr) { - // Rate limit in effect for source - continue - } - - if err != nil { - log.Fatal(err) - } - - if n < 4 { - log.Printf("Received short packet (%d bytes)", n) - continue - } - - buf = buf[:n] - magic := binary.BigEndian.Uint32(buf) - - switch magic { - case discover.AnnouncementMagic: - err := handleAnnounceV2(db, addr, buf) - if err != nil && unknownLog != nil { - fmt.Fprintf(unknownLog, "AE %d %v %x\n", time.Now().Unix(), addr, buf) - } - - case discover.QueryMagic: - err := handleQueryV2(db, conn, addr, buf) - if err != nil && unknownLog != nil { - fmt.Fprintf(unknownLog, "QE %d %v %x\n", time.Now().Unix(), addr, buf) - } - - default: - lock.Lock() - unknowns++ - lock.Unlock() - if unknownLog != nil { - fmt.Fprintf(unknownLog, "UN %d %v %x\n", time.Now().Unix(), addr, buf) - } - } - } + globalStats.Reset() + main.Serve() } -func limit(addr *net.UDPAddr) bool { - key := addr.IP.String() - - lock.Lock() - defer lock.Unlock() - - bkt, ok := limiter.Get(key) - if ok { - bkt := bkt.(*ratelimit.Bucket) - if bkt.TakeAvailable(1) != 1 { - // Rate limit exceeded; ignore packet - if debug { - log.Println("Rate limit exceeded for", key) - } - limited++ - return true - } - } else { - if debug { - log.Println("New limiter for", key) - } - // One packet per ten seconds average rate, burst ten packets - limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst))) +func getEnvDefault(key, def string) string { + if val := os.Getenv(key); val != "" { + return val } - - return false + return def } -func handleAnnounceV2(db *leveldb.DB, addr *net.UDPAddr, buf []byte) error { - var pkt discover.Announce - err := pkt.UnmarshalXDR(buf) - if err != nil && err != io.EOF { - return err - } - if debug { - log.Printf("<- %v %#v", addr, pkt) - } - - lock.Lock() - announces++ - lock.Unlock() - - ip := addr.IP.To4() - if ip == nil { - ip = addr.IP.To16() - } - - var addrs []address - now := time.Now().Unix() - for _, addr := range pkt.This.Addresses { - tip := addr.IP - if len(tip) == 0 { - tip = ip - } - addrs = append(addrs, address{ - ip: tip, - port: addr.Port, - seen: now, - }) - } - - var id protocol.DeviceID - if len(pkt.This.ID) == 32 { - // Raw node ID - copy(id[:], pkt.This.ID) - } else { - err = id.UnmarshalText(pkt.This.ID) - if err != nil { - return err - } - } - - update(db, id, addrs) - return nil -} - -func handleQueryV2(db *leveldb.DB, conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error { - var pkt discover.Query - err := pkt.UnmarshalXDR(buf) - if err != nil { - return err - } - if debug { - log.Printf("<- %v %#v", addr, pkt) - } - - var id protocol.DeviceID - if len(pkt.DeviceID) == 32 { - // Raw node ID - copy(id[:], pkt.DeviceID) - } else { - err = id.UnmarshalText(pkt.DeviceID) - if err != nil { - return err - } - } - - lock.Lock() - queries++ - lock.Unlock() - - addrs := get(db, id) - - now := time.Now().Unix() - if len(addrs) > 0 { - ann := discover.Announce{ - Magic: discover.AnnouncementMagic, - This: discover.Device{ - ID: pkt.DeviceID, - }, - } - for _, addr := range addrs { - if now-addr.seen > cacheLimitSeconds { - continue - } - ann.This.Addresses = append(ann.This.Addresses, discover.Address{IP: addr.ip, Port: addr.port}) - } - if debug { - log.Printf("-> %v %#v", addr, pkt) - } - - if len(ann.This.Addresses) == 0 { - return nil - } - - tb, err := ann.MarshalXDR() - if err != nil { - log.Println("QueryV2 response marshal:", err) - return nil - } - _, err = conn.WriteToUDP(tb, addr) - if err != nil { - log.Println("QueryV2 response write:", err) - return nil - } - - lock.Lock() - answered++ - lock.Unlock() - } - return nil -} - -func next(intv int) time.Time { - d := time.Duration(intv) * time.Second +func next(intv time.Duration) time.Duration { t0 := time.Now() - t1 := t0.Add(d).Truncate(d) - time.Sleep(t1.Sub(t0)) - return t1 -} - -func logStats(statsLog io.Writer, intv int) { - for { - t := next(intv) - - lock.Lock() - - fmt.Fprintf(statsLog, "%d Queries:%d Answered:%d Announces:%d Unknown:%d Limited:%d\n", - t.Unix(), queries, answered, announces, unknowns, limited) - - queries = 0 - announces = 0 - answered = 0 - limited = 0 - unknowns = 0 - - lock.Unlock() - } -} - -func get(db *leveldb.DB, id protocol.DeviceID) []address { - var addrs addressList - val, err := db.Get(id[:], nil) - if err == nil { - addrs.UnmarshalXDR(val) - } - return addrs.addresses -} - -func update(db *leveldb.DB, id protocol.DeviceID, addrs []address) { - var newAddrs addressList - - val, err := db.Get(id[:], nil) - if err == nil { - newAddrs.UnmarshalXDR(val) - } - -nextAddr: - for _, newAddr := range addrs { - for i, exAddr := range newAddrs.addresses { - if bytes.Compare(newAddr.ip, exAddr.ip) == 0 { - newAddrs.addresses[i] = newAddr - continue nextAddr - } - } - newAddrs.addresses = append(newAddrs.addresses, newAddr) - } - - db.Put(id[:], newAddrs.MarshalXDR(), nil) -} - -func clean(statsLog io.Writer, db *leveldb.DB) { - for { - now := next(cacheLimitSeconds) - nowSecs := now.Unix() - - var kept, deleted int64 - iter := db.NewIterator(nil, nil) - for iter.Next() { - var addrs addressList - addrs.UnmarshalXDR(iter.Value()) - - // Remove expired addresses - newAddrs := addrs.addresses - for i := 0; i < len(newAddrs); i++ { - if nowSecs-newAddrs[i].seen > cacheLimitSeconds { - newAddrs[i] = newAddrs[len(newAddrs)-1] - newAddrs = newAddrs[:len(newAddrs)-1] - } - } - - // Delete empty records - if len(newAddrs) == 0 { - db.Delete(iter.Key(), nil) - deleted++ - continue - } - - // Update changed records - if len(newAddrs) != len(addrs.addresses) { - addrs.addresses = newAddrs - db.Put(iter.Key(), addrs.MarshalXDR(), nil) - } - kept++ - } - iter.Release() - - fmt.Fprintf(statsLog, "%d Kept:%d Deleted:%d Took:%0.04fs\n", nowSecs, kept, deleted, time.Since(now).Seconds()) - } + t1 := t0.Add(intv).Truncate(intv) + return t1.Sub(t0) } diff --git a/cmd/discosrv/querysrv.go b/cmd/discosrv/querysrv.go new file mode 100644 index 000000000..410c90386 --- /dev/null +++ b/cmd/discosrv/querysrv.go @@ -0,0 +1,265 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "time" + + "github.com/golang/groupcache/lru" + "github.com/juju/ratelimit" + "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/internal/discover" +) + +type querysrv struct { + addr *net.UDPAddr + db *sql.DB + prep map[string]*sql.Stmt + limiter *lru.Cache +} + +func (s *querysrv) Serve() { + s.limiter = lru.New(lruSize) + + conn, err := net.ListenUDP("udp", s.addr) + if err != nil { + log.Println("Listen:", err) + return + } + + // Attempt to set the read and write buffers to 2^24 bytes (16 MB) or as high as + // possible. + for i := 24; i >= 16; i-- { + if conn.SetReadBuffer(1<= 16; i-- { + if conn.SetWriteBuffer(1< 0 { + ann := discover.Announce{ + Magic: discover.AnnouncementMagic, + This: discover.Device{ + ID: pkt.DeviceID, + Addresses: addrs, + }, + } + + tb, err := ann.MarshalXDR() + if err != nil { + return fmt.Errorf("QueryV2 response marshal: %v", err) + } + _, err = conn.WriteToUDP(tb, addr) + if err != nil { + return fmt.Errorf("QueryV2 response write: %v", err) + } + + globalStats.Answer() + } + + return nil +} + +func (s *querysrv) limit(addr *net.UDPAddr) bool { + key := addr.IP.String() + + bkt, ok := s.limiter.Get(key) + if ok { + bkt := bkt.(*ratelimit.Bucket) + if bkt.TakeAvailable(1) != 1 { + // Rate limit exceeded; ignore packet + return true + } + } else { + // One packet per ten seconds average rate, burst ten packets + s.limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst))) + } + + return false +} + +func (s *querysrv) updateDevice(tx *sql.Tx, device protocol.DeviceID) error { + res, err := tx.Stmt(s.prep["updateDevice"]).Exec(device.String()) + if err != nil { + return err + } + + if rows, _ := res.RowsAffected(); rows == 0 { + _, err := tx.Stmt(s.prep["insertDevice"]).Exec(device.String()) + if err != nil { + return err + } + } + + return nil +} + +func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, ip net.IP, port uint16) error { + res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), ip.String(), port) + if err != nil { + return err + } + + if rows, _ := res.RowsAffected(); rows == 0 { + _, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), ip.String(), port) + if err != nil { + return err + } + } + + return nil +} + +func (s *querysrv) getAddresses(device protocol.DeviceID) ([]discover.Address, error) { + rows, err := s.prep["selectAddress"].Query(device.String()) + if err != nil { + return nil, err + } + + var res []discover.Address + for rows.Next() { + var addr string + var port int + err := rows.Scan(&addr, &port) + if err != nil { + log.Println("Scan:", err) + continue + } + ip := net.ParseIP(addr) + bs := ip.To4() + if bs == nil { + bs = ip.To16() + } + res = append(res, discover.Address{IP: []byte(bs), Port: uint16(port)}) + } + + return res, nil +} diff --git a/cmd/discosrv/stats.go b/cmd/discosrv/stats.go new file mode 100644 index 000000000..c0e1029b5 --- /dev/null +++ b/cmd/discosrv/stats.go @@ -0,0 +1,70 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "log" + "sync" + "time" +) + +type stats struct { + mut sync.Mutex + reset time.Time + announces int64 + queries int64 + answers int64 + errors int64 +} + +func (s *stats) Announce() { + s.mut.Lock() + s.announces++ + s.mut.Unlock() +} + +func (s *stats) Query() { + s.mut.Lock() + s.queries++ + s.mut.Unlock() +} + +func (s *stats) Answer() { + s.mut.Lock() + s.answers++ + s.mut.Unlock() +} + +func (s *stats) Error() { + s.mut.Lock() + s.errors++ + s.mut.Unlock() +} + +func (s *stats) Reset() stats { + s.mut.Lock() + ns := *s + s.announces, s.queries, s.answers = 0, 0, 0 + s.reset = time.Now() + s.mut.Unlock() + return ns +} + +type statssrv struct { + intv time.Duration +} + +func (s *statssrv) Serve() { + for { + time.Sleep(next(s.intv)) + + stats := globalStats.Reset() + s := time.Since(stats.reset).Seconds() + log.Printf("Stats: %.02f announces/s, %.02f queries/s, %.02f answers/s, %.02f errors/s", + float64(stats.announces)/s, float64(stats.queries)/s, float64(stats.answers)/s, float64(stats.errors)/s) + } +} + +func (s *statssrv) Stop() { + panic("stop unimplemented") +} diff --git a/cmd/discosrv/types.go b/cmd/discosrv/types.go deleted file mode 100644 index fda44caed..000000000 --- a/cmd/discosrv/types.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). - -package main - -type address struct { - ip []byte - port uint16 - seen int64 // epoch seconds -} - -type addressList struct { - addresses []address -} diff --git a/cmd/discosrv/types_xdr.go b/cmd/discosrv/types_xdr.go deleted file mode 100644 index a4da8e684..000000000 --- a/cmd/discosrv/types_xdr.go +++ /dev/null @@ -1,147 +0,0 @@ -// ************************************************************ -// This file is automatically generated by genxdr. Do not edit. -// ************************************************************ - -package main - -import ( - "bytes" - "io" - - "github.com/calmh/xdr" -) - -/* - -address Structure: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| Length of ip | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -/ / -\ ip (variable length) \ -/ / -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| 0x0000 | port | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| | -+ seen (64 bits) + -| | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - -struct address { - opaque ip<>; - unsigned int port; - hyper seen; -} - -*/ - -func (o address) EncodeXDR(w io.Writer) (int, error) { - var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) -} - -func (o address) MarshalXDR() []byte { - return o.AppendXDR(make([]byte, 0, 128)) -} - -func (o address) AppendXDR(bs []byte) []byte { - var aw = xdr.AppendWriter(bs) - var xw = xdr.NewWriter(&aw) - o.encodeXDR(xw) - return []byte(aw) -} - -func (o address) encodeXDR(xw *xdr.Writer) (int, error) { - xw.WriteBytes(o.ip) - xw.WriteUint16(o.port) - xw.WriteUint64(uint64(o.seen)) - return xw.Tot(), xw.Error() -} - -func (o *address) DecodeXDR(r io.Reader) error { - xr := xdr.NewReader(r) - return o.decodeXDR(xr) -} - -func (o *address) UnmarshalXDR(bs []byte) error { - var br = bytes.NewReader(bs) - var xr = xdr.NewReader(br) - return o.decodeXDR(xr) -} - -func (o *address) decodeXDR(xr *xdr.Reader) error { - o.ip = xr.ReadBytes() - o.port = xr.ReadUint16() - o.seen = int64(xr.ReadUint64()) - return xr.Error() -} - -/* - -addressList Structure: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| Number of addresses | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -/ / -\ Zero or more address Structures \ -/ / -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - -struct addressList { - address addresses<>; -} - -*/ - -func (o addressList) EncodeXDR(w io.Writer) (int, error) { - var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) -} - -func (o addressList) MarshalXDR() []byte { - return o.AppendXDR(make([]byte, 0, 128)) -} - -func (o addressList) AppendXDR(bs []byte) []byte { - var aw = xdr.AppendWriter(bs) - var xw = xdr.NewWriter(&aw) - o.encodeXDR(xw) - return []byte(aw) -} - -func (o addressList) encodeXDR(xw *xdr.Writer) (int, error) { - xw.WriteUint32(uint32(len(o.addresses))) - for i := range o.addresses { - o.addresses[i].encodeXDR(xw) - } - return xw.Tot(), xw.Error() -} - -func (o *addressList) DecodeXDR(r io.Reader) error { - xr := xdr.NewReader(r) - return o.decodeXDR(xr) -} - -func (o *addressList) UnmarshalXDR(bs []byte) error { - var br = bytes.NewReader(bs) - var xr = xdr.NewReader(br) - return o.decodeXDR(xr) -} - -func (o *addressList) decodeXDR(xr *xdr.Reader) error { - _addressesSize := int(xr.ReadUint32()) - o.addresses = make([]address, _addressesSize) - for i := range o.addresses { - (&o.addresses[i]).decodeXDR(xr) - } - return xr.Error() -}