Benchmarking

This commit is contained in:
Jakob Borg 2014-01-09 13:58:35 +01:00
parent 71f78f0d62
commit 2935aebe53
5 changed files with 191 additions and 67 deletions

View File

@ -282,7 +282,8 @@ listen:
for nodeID := range nodeAddrs {
if nodeID == remoteID {
m.AddConnection(conn, remoteID)
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
m.AddConnection(conn, protoConn)
continue listen
}
}
@ -351,7 +352,8 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
continue
}
m.AddConnection(conn, remoteID)
protoConn := protocol.NewConnection(remoteID, conn, conn, m)
m.AddConnection(conn, protoConn)
continue nextNode
}
}

View File

@ -31,12 +31,12 @@ type Model struct {
sync.RWMutex
dir string
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
nodes map[string]*protocol.Connection
rawConn map[string]io.ReadWriteCloser
global map[string]File // the latest version of each file as it exists in the cluster
local map[string]File // the files we currently have locally on disk
remote map[string]map[string]File
need map[string]bool // the files we need to update
protoConn map[string]Connection
rawConn map[string]io.Closer
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
@ -55,6 +55,13 @@ type Model struct {
fileWasSuppressed map[string]int
}
type Connection interface {
ID() string
Index([]protocol.FileInfo)
Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error)
Statistics() protocol.Statistics
}
const (
idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
@ -78,8 +85,8 @@ func NewModel(dir string) *Model {
local: make(map[string]File),
remote: make(map[string]map[string]File),
need: make(map[string]bool),
nodes: make(map[string]*protocol.Connection),
rawConn: make(map[string]io.ReadWriteCloser),
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
@ -141,7 +148,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
defer m.RUnlock()
var res = make(map[string]ConnectionInfo)
for node, conn := range m.nodes {
for node, conn := range m.protoConn {
ci := ConnectionInfo{
Statistics: conn.Statistics(),
}
@ -288,7 +295,7 @@ func (m *Model) Close(node string, err error) {
}
delete(m.remote, node)
delete(m.nodes, node)
delete(m.protoConn, node)
delete(m.rawConn, node)
m.recomputeGlobal()
@ -383,7 +390,7 @@ func (m *Model) SeedLocal(fs []protocol.FileInfo) {
func (m *Model) ConnectedTo(nodeID string) bool {
m.RLock()
defer m.RUnlock()
_, ok := m.nodes[nodeID]
_, ok := m.protoConn[nodeID]
return ok
}
@ -402,12 +409,11 @@ func (m *Model) RepoID() string {
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
node := protocol.NewConnection(nodeID, conn, conn, m)
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID()
m.Lock()
m.nodes[nodeID] = node
m.rawConn[nodeID] = conn
m.protoConn[nodeID] = protoConn
m.rawConn[nodeID] = rawConn
m.Unlock()
m.RLock()
@ -415,7 +421,7 @@ func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
m.RUnlock()
go func() {
node.Index(idx)
protoConn.Index(idx)
}()
}
@ -461,7 +467,7 @@ func (m *Model) protocolIndex() []protocol.FileInfo {
func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
m.RLock()
nc, ok := m.nodes[nodeID]
nc, ok := m.protoConn[nodeID]
m.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
@ -485,10 +491,10 @@ func (m *Model) broadcastIndexLoop() {
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
m.Lock()
var indexWg sync.WaitGroup
indexWg.Add(len(m.nodes))
indexWg.Add(len(m.protoConn))
idx := m.protocolIndex()
m.lastIdxBcast = time.Now()
for _, node := range m.nodes {
for _, node := range m.protoConn {
node := node
if m.trace["net"] {
log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
@ -547,10 +553,14 @@ func (m *Model) recomputeGlobal() {
newGlobal[n] = f
}
var highestMod int64
for _, fs := range m.remote {
for n, nf := range fs {
if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
newGlobal[n] = nf
if nf.Modified > highestMod {
highestMod = nf.Modified
}
}
}
}
@ -558,7 +568,7 @@ func (m *Model) recomputeGlobal() {
// Figure out if anything actually changed
var updated bool
if len(newGlobal) != len(m.global) {
if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
updated = true
} else {
for n, f0 := range newGlobal {
@ -616,14 +626,14 @@ func (m *Model) whoHas(name string) []string {
}
func fileFromFileInfo(f protocol.FileInfo) File {
var blocks []Block
var blocks = make([]Block, len(f.Blocks))
var offset uint64
for _, b := range f.Blocks {
blocks = append(blocks, Block{
for i, b := range f.Blocks {
blocks[i] = Block{
Offset: offset,
Length: b.Length,
Hash: b.Hash,
})
}
offset += uint64(b.Length)
}
return File{
@ -636,12 +646,12 @@ func fileFromFileInfo(f protocol.FileInfo) File {
}
func fileInfoFromFile(f File) protocol.FileInfo {
var blocks []protocol.BlockInfo
for _, b := range f.Blocks {
blocks = append(blocks, protocol.BlockInfo{
var blocks = make([]protocol.BlockInfo, len(f.Blocks))
for i, b := range f.Blocks {
blocks[i] = protocol.BlockInfo{
Length: b.Length,
Hash: b.Hash,
})
}
}
return protocol.FileInfo{
Name: f.Name,

View File

@ -2,6 +2,7 @@ package model
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
@ -406,3 +407,113 @@ func TestIgnoreWithUnknownFlags(t *testing.T) {
t.Error("Model not should include", invalid)
}
}
func prepareModel(n int, m *Model) []protocol.FileInfo {
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
files[i] = protocol.FileInfo{
Name: fmt.Sprintf("file%d", i),
Modified: t,
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
}
m.Index("42", files)
return files
}
func BenchmarkRecomputeGlobal10k(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeGlobal()
}
}
func BenchmarkRecomputeNeed10K(b *testing.B) {
m := NewModel("testdata")
prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.recomputeNeed()
}
}
func BenchmarkIndexUpdate10000(b *testing.B) {
m := NewModel("testdata")
files := prepareModel(10000, m)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.IndexUpdate("42", files)
}
}
type FakeConnection struct {
id string
requestData []byte
}
func (FakeConnection) Close() error {
return nil
}
func (f FakeConnection) ID() string {
return string(f.id)
}
func (FakeConnection) Index([]protocol.FileInfo) {}
func (f FakeConnection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
return f.requestData, nil
}
func (FakeConnection) Ping() bool {
return true
}
func (FakeConnection) Statistics() protocol.Statistics {
return protocol.Statistics{}
}
func BenchmarkRequest(b *testing.B) {
m := NewModel("testdata")
fs, _ := m.Walk(false)
m.ReplaceLocal(fs)
const n = 1000
files := make([]protocol.FileInfo, n)
t := time.Now().Unix()
for i := 0; i < n; i++ {
files[i] = protocol.FileInfo{
Name: fmt.Sprintf("file%d", i),
Modified: t,
Blocks: []protocol.BlockInfo{{100, []byte("some hash bytes")}},
}
}
fc := FakeConnection{
id: "42",
requestData: []byte("some data to return"),
}
m.AddConnection(fc, fc)
m.Index("42", files)
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, err := m.requestGlobal("42", files[i%n].Name, 0, 32, nil)
if err != nil {
b.Error(err)
}
if data == nil {
b.Error("nil data")
}
}
}

View File

@ -52,7 +52,7 @@ type Model interface {
type Connection struct {
sync.RWMutex
ID string
id string
receiver Model
reader io.Reader
mreader *marshalReader
@ -89,13 +89,13 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
}
c := Connection{
id: nodeID,
receiver: receiver,
reader: flrd,
mreader: &marshalReader{r: flrd},
writer: flwr,
mwriter: &marshalWriter{w: flwr},
awaiting: make(map[int]chan asyncResult),
ID: nodeID,
}
go c.readerLoop()
@ -104,6 +104,10 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
return &c
}
func (c *Connection) ID() string {
return c.id
}
// Index writes the list of file information to the connected peer node
func (c *Connection) Index(idx []FileInfo) {
c.Lock()
@ -137,10 +141,10 @@ func (c *Connection) Index(idx []FileInfo) {
c.Unlock()
if err != nil {
c.Close(err)
c.close(err)
return
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return
}
}
@ -158,13 +162,13 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
c.mwriter.writeRequest(request{name, offset, size, hash})
if c.mwriter.err != nil {
c.Unlock()
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return nil, c.mwriter.err
}
err := c.flush()
if err != nil {
c.Unlock()
c.Close(err)
c.close(err)
return nil, err
}
c.nextId = (c.nextId + 1) & 0xfff
@ -177,7 +181,7 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt
return res.val, res.err
}
func (c *Connection) Ping() bool {
func (c *Connection) ping() bool {
c.Lock()
if c.closed {
c.Unlock()
@ -189,11 +193,11 @@ func (c *Connection) Ping() bool {
err := c.flush()
if err != nil {
c.Unlock()
c.Close(err)
c.close(err)
return false
} else if c.mwriter.err != nil {
c.Unlock()
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
return false
}
c.nextId = (c.nextId + 1) & 0xfff
@ -203,9 +207,6 @@ func (c *Connection) Ping() bool {
return ok && res.err == nil
}
func (c *Connection) Stop() {
}
type flusher interface {
Flush() error
}
@ -217,7 +218,7 @@ func (c *Connection) flush() error {
return nil
}
func (c *Connection) Close(err error) {
func (c *Connection) close(err error) {
c.Lock()
if c.closed {
c.Unlock()
@ -230,7 +231,7 @@ func (c *Connection) Close(err error) {
c.awaiting = nil
c.Unlock()
c.receiver.Close(c.ID, err)
c.receiver.Close(c.id, err)
}
func (c *Connection) isClosed() bool {
@ -244,11 +245,11 @@ loop:
for {
hdr := c.mreader.readHeader()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
}
if hdr.version != 0 {
c.Close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
c.close(fmt.Errorf("Protocol error: %s: unknown message version %#x", c.ID, hdr.version))
break loop
}
@ -256,10 +257,10 @@ loop:
case messageTypeIndex:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.receiver.Index(c.ID, files)
c.receiver.Index(c.id, files)
}
c.Lock()
c.hasRecvdIndex = true
@ -268,16 +269,16 @@ loop:
case messageTypeIndexUpdate:
files := c.mreader.readIndex()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.receiver.IndexUpdate(c.ID, files)
c.receiver.IndexUpdate(c.id, files)
}
case messageTypeRequest:
req := c.mreader.readRequest()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
}
go c.processRequest(hdr.msgID, req)
@ -286,7 +287,7 @@ loop:
data := c.mreader.readResponse()
if c.mreader.err != nil {
c.Close(c.mreader.err)
c.close(c.mreader.err)
break loop
} else {
c.Lock()
@ -306,10 +307,10 @@ loop:
err := c.flush()
c.Unlock()
if err != nil {
c.Close(err)
c.close(err)
break loop
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
break loop
}
@ -328,14 +329,14 @@ loop:
}
default:
c.Close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop
}
}
}
func (c *Connection) processRequest(msgID int, req request) {
data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash)
data, _ := c.receiver.Request(c.id, req.name, req.offset, req.size, req.hash)
c.Lock()
c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse}))
@ -345,9 +346,9 @@ func (c *Connection) processRequest(msgID int, req request) {
buffers.Put(data)
if err != nil {
c.Close(err)
c.close(err)
} else if c.mwriter.err != nil {
c.Close(c.mwriter.err)
c.close(c.mwriter.err)
}
}
@ -362,15 +363,15 @@ func (c *Connection) pingerLoop() {
if ready {
go func() {
rc <- c.Ping()
rc <- c.ping()
}()
select {
case ok := <-rc:
if !ok {
c.Close(fmt.Errorf("Ping failure"))
c.close(fmt.Errorf("Ping failure"))
}
case <-time.After(pingTimeout):
c.Close(fmt.Errorf("Ping timeout"))
c.close(fmt.Errorf("Ping timeout"))
}
}
}

View File

@ -46,10 +46,10 @@ func TestPing(t *testing.T) {
c0 := NewConnection("c0", ar, bw, nil)
c1 := NewConnection("c1", br, aw, nil)
if ok := c0.Ping(); !ok {
if ok := c0.ping(); !ok {
t.Error("c0 ping failed")
}
if ok := c1.Ping(); !ok {
if ok := c1.ping(); !ok {
t.Error("c1 ping failed")
}
}
@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) {
c0 := NewConnection("c0", ar, ebw, m0)
NewConnection("c1", br, eaw, m1)
res := c0.Ping()
res := c0.ping()
if (i < 4 || j < 4) && res {
t.Errorf("Unexpected ping success; i=%d, j=%d", i, j)
} else if (i >= 8 && j >= 8) && !res {
@ -190,7 +190,7 @@ func TestClose(t *testing.T) {
c0 := NewConnection("c0", ar, bw, m0)
NewConnection("c1", br, aw, m1)
c0.Close(nil)
c0.close(nil)
ok := c0.isClosed()
if !ok {
@ -199,7 +199,7 @@ func TestClose(t *testing.T) {
// None of these should panic, some should return an error
ok = c0.Ping()
ok = c0.ping()
if ok {
t.Error("Ping should not return true")
}