Memory usage optimizations

This commit is contained in:
Jakob Borg 2013-12-29 20:33:57 -05:00
parent 469e96126a
commit 976baff44f
5 changed files with 77 additions and 53 deletions

View File

@ -1,13 +1,26 @@
package buffers package buffers
var buffers = make(chan []byte, 32) const (
largeMin = 1024
)
var (
smallBuffers = make(chan []byte, 32)
largeBuffers = make(chan []byte, 32)
)
func Get(size int) []byte { func Get(size int) []byte {
var ch = largeBuffers
if size < largeMin {
ch = smallBuffers
}
var buf []byte var buf []byte
select { select {
case buf = <-buffers: case buf = <-ch:
default: default:
} }
if len(buf) < size { if len(buf) < size {
return make([]byte, size) return make([]byte, size)
} }
@ -15,12 +28,18 @@ func Get(size int) []byte {
} }
func Put(buf []byte) { func Put(buf []byte) {
if cap(buf) == 0 { buf = buf[:cap(buf)]
if len(buf) == 0 {
return return
} }
buf = buf[:cap(buf)]
var ch = largeBuffers
if len(buf) < largeMin {
ch = smallBuffers
}
select { select {
case buffers <- buf: case ch <- buf:
default: default:
} }
} }

View File

@ -111,7 +111,7 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
// Files marked as deleted do not even enter the model // Files marked as deleted do not even enter the model
continue continue
} }
m.remote[nodeID][f.Name] = fileFromProtocol(f) m.remote[nodeID][f.Name] = fileFromFileInfo(f)
} }
m.recomputeGlobal() m.recomputeGlobal()
@ -136,7 +136,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
// Files marked as deleted do not even enter the model // Files marked as deleted do not even enter the model
continue continue
} }
repo[f.Name] = fileFromProtocol(f) repo[f.Name] = fileFromFileInfo(f)
} }
m.recomputeGlobal() m.recomputeGlobal()
@ -149,7 +149,7 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) {
m.local = make(map[string]File) m.local = make(map[string]File)
for _, f := range fs { for _, f := range fs {
m.local[f.Name] = fileFromProtocol(f) m.local[f.Name] = fileFromFileInfo(f)
} }
m.recomputeGlobal() m.recomputeGlobal()
@ -396,17 +396,7 @@ func (m *Model) ProtocolIndex() []protocol.FileInfo {
func (m *Model) protocolIndex() []protocol.FileInfo { func (m *Model) protocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo var index []protocol.FileInfo
for _, f := range m.local { for _, f := range m.local {
mf := protocol.FileInfo{ mf := fileInfoFromFile(f)
Name: f.Name,
Flags: f.Flags,
Modified: int64(f.Modified),
}
for _, b := range f.Blocks {
mf.Blocks = append(mf.Blocks, protocol.BlockInfo{
Length: b.Length,
Hash: b.Hash,
})
}
if opts.Debug.TraceIdx { if opts.Debug.TraceIdx {
var flagComment string var flagComment string
if mf.Flags&FlagDeleted != 0 { if mf.Flags&FlagDeleted != 0 {
@ -436,7 +426,7 @@ func (m *Model) AddNode(node *protocol.Connection) {
go node.Index(idx) go node.Index(idx)
} }
func fileFromProtocol(f protocol.FileInfo) File { func fileFromFileInfo(f protocol.FileInfo) File {
var blocks []Block var blocks []Block
var offset uint64 var offset uint64
for _, b := range f.Blocks { for _, b := range f.Blocks {
@ -445,6 +435,7 @@ func fileFromProtocol(f protocol.FileInfo) File {
Length: b.Length, Length: b.Length,
Hash: b.Hash, Hash: b.Hash,
}) })
buffers.Put(b.Hash)
offset += uint64(b.Length) offset += uint64(b.Length)
} }
return File{ return File{
@ -454,3 +445,19 @@ func fileFromProtocol(f protocol.FileInfo) File {
Blocks: blocks, Blocks: blocks,
} }
} }
func fileInfoFromFile(f File) protocol.FileInfo {
var blocks []protocol.BlockInfo
for _, b := range f.Blocks {
blocks = append(blocks, protocol.BlockInfo{
Length: b.Length,
Hash: b.Hash,
})
}
return protocol.FileInfo{
Name: f.Name,
Flags: f.Flags,
Modified: int64(f.Modified),
Blocks: blocks,
}
}

View File

@ -21,6 +21,7 @@ type marshalWriter struct {
w io.Writer w io.Writer
tot int tot int
err error err error
b [8]byte
} }
// We will never encode nor expect to decode blobs larger than 10 MB. Check // We will never encode nor expect to decode blobs larger than 10 MB. Check
@ -57,12 +58,11 @@ func (w *marshalWriter) writeUint32(v uint32) {
if w.err != nil { if w.err != nil {
return return
} }
var b [4]byte w.b[0] = byte(v >> 24)
b[0] = byte(v >> 24) w.b[1] = byte(v >> 16)
b[1] = byte(v >> 16) w.b[2] = byte(v >> 8)
b[2] = byte(v >> 8) w.b[3] = byte(v)
b[3] = byte(v) _, w.err = w.w.Write(w.b[:4])
_, w.err = w.w.Write(b[:])
w.tot += 4 w.tot += 4
} }
@ -70,16 +70,15 @@ func (w *marshalWriter) writeUint64(v uint64) {
if w.err != nil { if w.err != nil {
return return
} }
var b [8]byte w.b[0] = byte(v >> 56)
b[0] = byte(v >> 56) w.b[1] = byte(v >> 48)
b[1] = byte(v >> 48) w.b[2] = byte(v >> 40)
b[2] = byte(v >> 40) w.b[3] = byte(v >> 32)
b[3] = byte(v >> 32) w.b[4] = byte(v >> 24)
b[4] = byte(v >> 24) w.b[5] = byte(v >> 16)
b[5] = byte(v >> 16) w.b[6] = byte(v >> 8)
b[6] = byte(v >> 8) w.b[7] = byte(v)
b[7] = byte(v) _, w.err = w.w.Write(w.b[:8])
_, w.err = w.w.Write(b[:])
w.tot += 8 w.tot += 8
} }
@ -87,6 +86,7 @@ type marshalReader struct {
r io.Reader r io.Reader
tot int tot int
err error err error
b [8]byte
} }
func (r *marshalReader) readString() string { func (r *marshalReader) readString() string {
@ -117,19 +117,17 @@ func (r *marshalReader) readUint32() uint32 {
if r.err != nil { if r.err != nil {
return 0 return 0
} }
var b [4]byte _, r.err = io.ReadFull(r.r, r.b[:4])
_, r.err = io.ReadFull(r.r, b[:])
r.tot += 4 r.tot += 4
return uint32(b[3]) | uint32(b[2])<<8 | uint32(b[1])<<16 | uint32(b[0])<<24 return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24
} }
func (r *marshalReader) readUint64() uint64 { func (r *marshalReader) readUint64() uint64 {
if r.err != nil { if r.err != nil {
return 0 return 0
} }
var b [8]byte _, r.err = io.ReadFull(r.r, r.b[:8])
_, r.err = io.ReadFull(r.r, b[:])
r.tot += 8 r.tot += 8
return uint64(b[7]) | uint64(b[6])<<8 | uint64(b[5])<<16 | uint64(b[4])<<24 | return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 |
uint64(b[3])<<32 | uint64(b[2])<<40 | uint64(b[1])<<48 | uint64(b[0])<<56 uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56
} }

View File

@ -48,7 +48,7 @@ func (w *marshalWriter) writeIndex(idx []FileInfo) {
} }
func WriteIndex(w io.Writer, idx []FileInfo) (int, error) { func WriteIndex(w io.Writer, idx []FileInfo) (int, error) {
mw := marshalWriter{w, 0, nil} mw := marshalWriter{w: w}
mw.writeIndex(idx) mw.writeIndex(idx)
return mw.tot, mw.err return mw.tot, mw.err
} }
@ -90,7 +90,7 @@ func (r *marshalReader) readIndex() []FileInfo {
} }
func ReadIndex(r io.Reader) ([]FileInfo, error) { func ReadIndex(r io.Reader) ([]FileInfo, error) {
mr := marshalReader{r, 0, nil} mr := marshalReader{r: r}
idx := mr.readIndex() idx := mr.readIndex()
return idx, mr.err return idx, mr.err
} }

View File

@ -57,7 +57,7 @@ type Connection struct {
lastReceive time.Time lastReceive time.Time
peerLatency time.Duration peerLatency time.Duration
lastStatistics Statistics lastStatistics Statistics
lastIndexSent map[string]FileInfo indexSent map[string]int64
} }
var ErrClosed = errors.New("Connection closed") var ErrClosed = errors.New("Connection closed")
@ -80,9 +80,9 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
c := Connection{ c := Connection{
receiver: receiver, receiver: receiver,
reader: flrd, reader: flrd,
mreader: &marshalReader{flrd, 0, nil}, mreader: &marshalReader{r: flrd},
writer: flwr, writer: flwr,
mwriter: &marshalWriter{flwr, 0, nil}, mwriter: &marshalWriter{w: flwr},
awaiting: make(map[int]chan asyncResult), awaiting: make(map[int]chan asyncResult),
lastReceive: time.Now(), lastReceive: time.Now(),
ID: nodeID, ID: nodeID,
@ -100,22 +100,22 @@ func (c *Connection) Index(idx []FileInfo) {
c.Lock() c.Lock()
var msgType int var msgType int
if c.lastIndexSent == nil { if c.indexSent == nil {
// This is the first time we send an index. // This is the first time we send an index.
msgType = messageTypeIndex msgType = messageTypeIndex
c.lastIndexSent = make(map[string]FileInfo) c.indexSent = make(map[string]int64)
for _, f := range idx { for _, f := range idx {
c.lastIndexSent[f.Name] = f c.indexSent[f.Name] = f.Modified
} }
} else { } else {
// We have sent one full index. Only send updates now. // We have sent one full index. Only send updates now.
msgType = messageTypeIndexUpdate msgType = messageTypeIndexUpdate
var diff []FileInfo var diff []FileInfo
for _, f := range idx { for _, f := range idx {
if ef, ok := c.lastIndexSent[f.Name]; !ok || ef.Modified != f.Modified { if modified, ok := c.indexSent[f.Name]; !ok || f.Modified != modified {
diff = append(diff, f) diff = append(diff, f)
c.lastIndexSent[f.Name] = f c.indexSent[f.Name] = f.Modified
} }
} }
idx = diff idx = diff