From 992ad97ad5540c7c61debb321b9b3f6acdcaa9bc Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sun, 16 Nov 2014 13:18:52 +0000 Subject: [PATCH] Cache file descriptors --- Godeps/Godeps.json | 8 +- .../AudriusButkevicius/lfu-go/LICENSE | 19 -- .../AudriusButkevicius/lfu-go/README.md | 19 -- .../AudriusButkevicius/lfu-go/lfu.go | 156 -------------- .../AudriusButkevicius/lfu-go/lfu_test.go | 68 ------ .../AudriusButkevicius/lrufdcache/.gitignore | 24 +++ .../AudriusButkevicius/lrufdcache/LICENSE | 25 +++ .../AudriusButkevicius/lrufdcache/README.md | 4 + .../lrufdcache/lrufdcache.go | 88 ++++++++ .../lrufdcache/lrufdcache_test.go | 195 ++++++++++++++++++ .../github.com/golang/groupcache/lru/lru.go | 121 +++++++++++ .../golang/groupcache/lru/lru_test.go | 73 +++++++ internal/model/model.go | 9 +- internal/model/puller.go | 32 +-- 14 files changed, 546 insertions(+), 295 deletions(-) delete mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/LICENSE delete mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/README.md delete mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu.go delete mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu_test.go create mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/.gitignore create mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/LICENSE create mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/README.md create mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache.go create mode 100644 Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache_test.go create mode 100644 Godeps/_workspace/src/github.com/golang/groupcache/lru/lru.go create mode 100644 Godeps/_workspace/src/github.com/golang/groupcache/lru/lru_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index dda1a7add..8ecbf4744 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -6,8 +6,8 @@ ], "Deps": [ { - "ImportPath": "github.com/AudriusButkevicius/lfu-go", - "Rev": "164bcecceb92fd6037f4d18a8d97b495ec6ef669" + "ImportPath": "github.com/AudriusButkevicius/lrufdcache", + "Rev": "9bddff8f67224ab3e7d80525a6ae9bcf1ce10769" }, { "ImportPath": "github.com/bkaradzic/go-lz4", @@ -25,6 +25,10 @@ "ImportPath": "github.com/calmh/xdr", "Rev": "ec3d404f43731551258977b38dd72cf557d00398" }, + { + "ImportPath": "github.com/golang/groupcache/lru", + "Rev": "f391194b967ae0d21deadc861ea87120d9687447" + }, { "ImportPath": "github.com/juju/ratelimit", "Rev": "f9f36d11773655c0485207f0ad30dc2655f69d56" diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/LICENSE b/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/LICENSE deleted file mode 100644 index 431a0037f..000000000 --- a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/LICENSE +++ /dev/null @@ -1,19 +0,0 @@ -Copyright (C) 2012 Dave Grijalva - -Permission is hereby granted, free of charge, to any person obtaining a -copy of this software and associated documentation files (the "Software"), -to deal in the Software without restriction, including without limitation -the rights to use, copy, modify, merge, publish, distribute, sublicense, -and/or sell copies of the Software, and to permit persons to whom the -Software is furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included -in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/README.md b/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/README.md deleted file mode 100644 index 2a0742586..000000000 --- a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/README.md +++ /dev/null @@ -1,19 +0,0 @@ -A simple LFU cache for golang. Based on the paper [An O(1) algorithm for implementing the LFU cache eviction scheme](http://dhruvbird.com/lfu.pdf). - -Usage: - -```go -import "github.com/dgrijalva/lfu-go" - -// Make a new thing -c := lfu.New() - -// Set some values -c.Set("myKey", myValue) - -// Retrieve some values -myValue = c.Get("myKey") - -// Evict some values -c.Evict(1) -``` \ No newline at end of file diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu.go b/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu.go deleted file mode 100644 index cfe387156..000000000 --- a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu.go +++ /dev/null @@ -1,156 +0,0 @@ -package lfu - -import ( - "container/list" - "sync" -) - -type Eviction struct { - Key string - Value interface{} -} - -type Cache struct { - // If len > UpperBound, cache will automatically evict - // down to LowerBound. If either value is 0, this behavior - // is disabled. - UpperBound int - LowerBound int - values map[string]*cacheEntry - freqs *list.List - len int - lock *sync.Mutex - EvictionChannel chan<- Eviction -} - -type cacheEntry struct { - key string - value interface{} - freqNode *list.Element -} - -type listEntry struct { - entries map[*cacheEntry]byte - freq int -} - -func New() *Cache { - c := new(Cache) - c.values = make(map[string]*cacheEntry) - c.freqs = list.New() - c.lock = new(sync.Mutex) - return c -} - -func (c *Cache) Get(key string) interface{} { - c.lock.Lock() - defer c.lock.Unlock() - if e, ok := c.values[key]; ok { - c.increment(e) - return e.value - } - return nil -} - -func (c *Cache) Set(key string, value interface{}) { - c.lock.Lock() - defer c.lock.Unlock() - if e, ok := c.values[key]; ok { - // value already exists for key. overwrite - e.value = value - c.increment(e) - } else { - // value doesn't exist. insert - e := new(cacheEntry) - e.key = key - e.value = value - c.values[key] = e - c.increment(e) - c.len++ - // bounds mgmt - if c.UpperBound > 0 && c.LowerBound > 0 { - if c.len > c.UpperBound { - c.evict(c.len - c.LowerBound) - } - } - } -} - -func (c *Cache) Len() int { - c.lock.Lock() - defer c.lock.Unlock() - return c.len -} - -func (c *Cache) Evict(count int) int { - c.lock.Lock() - defer c.lock.Unlock() - return c.evict(count) -} - -func (c *Cache) evict(count int) int { - // No lock here so it can be called - // from within the lock (during Set) - var evicted int - for i := 0; i < count; { - if place := c.freqs.Front(); place != nil { - for entry, _ := range place.Value.(*listEntry).entries { - if i < count { - if c.EvictionChannel != nil { - c.EvictionChannel <- Eviction{ - Key: entry.key, - Value: entry.value, - } - } - delete(c.values, entry.key) - c.remEntry(place, entry) - evicted++ - c.len-- - i++ - } - } - } - } - return evicted -} - -func (c *Cache) increment(e *cacheEntry) { - currentPlace := e.freqNode - var nextFreq int - var nextPlace *list.Element - if currentPlace == nil { - // new entry - nextFreq = 1 - nextPlace = c.freqs.Front() - } else { - // move up - nextFreq = currentPlace.Value.(*listEntry).freq + 1 - nextPlace = currentPlace.Next() - } - - if nextPlace == nil || nextPlace.Value.(*listEntry).freq != nextFreq { - // create a new list entry - li := new(listEntry) - li.freq = nextFreq - li.entries = make(map[*cacheEntry]byte) - if currentPlace != nil { - nextPlace = c.freqs.InsertAfter(li, currentPlace) - } else { - nextPlace = c.freqs.PushFront(li) - } - } - e.freqNode = nextPlace - nextPlace.Value.(*listEntry).entries[e] = 1 - if currentPlace != nil { - // remove from current position - c.remEntry(currentPlace, e) - } -} - -func (c *Cache) remEntry(place *list.Element, entry *cacheEntry) { - entries := place.Value.(*listEntry).entries - delete(entries, entry) - if len(entries) == 0 { - c.freqs.Remove(place) - } -} diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu_test.go b/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu_test.go deleted file mode 100644 index 61d97c0a4..000000000 --- a/Godeps/_workspace/src/github.com/AudriusButkevicius/lfu-go/lfu_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package lfu - -import ( - "fmt" - "testing" -) - -func TestLFU(t *testing.T) { - c := New() - c.Set("a", "a") - if v := c.Get("a"); v != "a" { - t.Errorf("Value was not saved: %v != 'a'", v) - } - if l := c.Len(); l != 1 { - t.Errorf("Length was not updated: %v != 1", l) - } - - c.Set("b", "b") - if v := c.Get("b"); v != "b" { - t.Errorf("Value was not saved: %v != 'b'", v) - } - if l := c.Len(); l != 2 { - t.Errorf("Length was not updated: %v != 2", l) - } - - c.Get("a") - evicted := c.Evict(1) - if v := c.Get("a"); v != "a" { - t.Errorf("Value was improperly evicted: %v != 'a'", v) - } - if v := c.Get("b"); v != nil { - t.Errorf("Value was not evicted: %v", v) - } - if l := c.Len(); l != 1 { - t.Errorf("Length was not updated: %v != 1", l) - } - if evicted != 1 { - t.Errorf("Number of evicted items is wrong: %v != 1", evicted) - } -} - -func TestBoundsMgmt(t *testing.T) { - c := New() - c.UpperBound = 10 - c.LowerBound = 5 - - for i := 0; i < 100; i++ { - c.Set(fmt.Sprintf("%v", i), i) - } - if c.Len() > 10 { - t.Errorf("Bounds management failed to evict properly: %v", c.Len()) - } -} - -func TestEviction(t *testing.T) { - ch := make(chan Eviction, 1) - - c := New() - c.EvictionChannel = ch - c.Set("a", "b") - c.Evict(1) - - ev := <-ch - - if ev.Key != "a" || ev.Value.(string) != "b" { - t.Error("Incorrect item") - } -} diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/.gitignore b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/.gitignore new file mode 100644 index 000000000..daf913b1b --- /dev/null +++ b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/LICENSE b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/LICENSE new file mode 100644 index 000000000..a84c39566 --- /dev/null +++ b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/LICENSE @@ -0,0 +1,25 @@ +This is free and unencumbered software released into the public domain. + +Anyone is free to copy, modify, publish, use, compile, sell, or +distribute this software, either in source code form or as a compiled +binary, for any purpose, commercial or non-commercial, and by any +means. + +In jurisdictions that recognize copyright laws, the author or authors +of this software dedicate any and all copyright interest in the +software to the public domain. We make this dedication for the benefit +of the public at large and to the detriment of our heirs and +successors. We intend this dedication to be an overt act of +relinquishment in perpetuity of all present and future rights to this +software under copyright law. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. + +For more information, please refer to + diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/README.md b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/README.md new file mode 100644 index 000000000..388b04b73 --- /dev/null +++ b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/README.md @@ -0,0 +1,4 @@ +lrufdcache +========== + +A LRU file descriptor cache diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache.go b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache.go new file mode 100644 index 000000000..acc5f3095 --- /dev/null +++ b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache.go @@ -0,0 +1,88 @@ +// Package logger implements a LRU file descriptor cache for concurrent ReadAt +// calls. +package lrufdcache + +import ( + "os" + "sync" + + "github.com/golang/groupcache/lru" +) + +// A wrapper around *os.File which counts references +type CachedFile struct { + file *os.File + wg sync.WaitGroup + // Locking between file.Close and file.ReadAt + // (just to please the race detector...) + flock sync.RWMutex +} + +// Tells the cache that we are done using the file, but it's up to the cache +// to decide when this file will really be closed. The error, if any, will be +// lost. +func (f *CachedFile) Close() error { + f.wg.Done() + return nil +} + +// Read the file at the given offset. +func (f *CachedFile) ReadAt(buf []byte, at int64) (int, error) { + f.flock.RLock() + defer f.flock.RUnlock() + return f.file.ReadAt(buf, at) +} + +type FileCache struct { + cache *lru.Cache + mut sync.Mutex +} + +// Create a new cache with the number of entries to hold. +func NewCache(entries int) *FileCache { + c := FileCache{ + cache: lru.New(entries), + } + + c.cache.OnEvicted = func(key lru.Key, fdi interface{}) { + // The file might not have been closed by all openers yet, therefore + // spawn a routine which waits for that to happen and then closes the + // file. + go func(item *CachedFile) { + item.wg.Wait() + item.flock.Lock() + item.file.Close() + item.flock.Unlock() + }(fdi.(*CachedFile)) + } + return &c +} + +// Open and cache a file descriptor or use an existing cached descriptor for +// the given path. +func (c *FileCache) Open(path string) (*CachedFile, error) { + // Evictions can only happen during c.cache.Add, and there is a potential + // race between c.cache.Get and cfd.wg.Add where if not guarded by a mutex + // could result in cfd getting closed before the counter is incremented if + // a concurrent routine does a c.cache.Add + c.mut.Lock() + defer c.mut.Unlock() + fdi, ok := c.cache.Get(path) + if ok { + cfd := fdi.(*CachedFile) + cfd.wg.Add(1) + return cfd, nil + } + + fd, err := os.Open(path) + if err != nil { + return nil, err + } + cfd := &CachedFile{ + file: fd, + wg: sync.WaitGroup{}, + } + cfd.wg.Add(1) + c.cache.Add(path, cfd) + return cfd, nil +} diff --git a/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache_test.go b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache_test.go new file mode 100644 index 000000000..8c2297707 --- /dev/null +++ b/Godeps/_workspace/src/github.com/AudriusButkevicius/lrufdcache/lrufdcache_test.go @@ -0,0 +1,195 @@ +package lrufdcache + +import ( + "io/ioutil" + "os" + "sync" + "time" + + "testing" +) + +func TestNoopReadFailsOnClosed(t *testing.T) { + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.WriteString("test") + fd.Close() + buf := make([]byte, 4) + defer os.Remove(fd.Name()) + + _, err = fd.ReadAt(buf, 0) + if err == nil { + t.Fatal("Expected error") + } +} + +func TestSingleFileEviction(t *testing.T) { + c := NewCache(1) + + wg := sync.WaitGroup{} + + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.WriteString("test") + fd.Close() + buf := make([]byte, 4) + defer os.Remove(fd.Name()) + + for k := 0; k < 100; k++ { + wg.Add(1) + go func() { + defer wg.Done() + + cfd, err := c.Open(fd.Name()) + if err != nil { + t.Fatal(err) + return + } + defer cfd.Close() + + _, err = cfd.ReadAt(buf, 0) + if err != nil { + t.Fatal(err) + } + }() + } + + wg.Wait() +} + +func TestMultifileEviction(t *testing.T) { + c := NewCache(1) + + wg := sync.WaitGroup{} + + for k := 0; k < 100; k++ { + wg.Add(1) + go func() { + defer wg.Done() + + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.WriteString("test") + fd.Close() + buf := make([]byte, 4) + defer os.Remove(fd.Name()) + + cfd, err := c.Open(fd.Name()) + if err != nil { + t.Fatal(err) + return + } + defer cfd.Close() + + _, err = cfd.ReadAt(buf, 0) + if err != nil { + t.Fatal(err) + } + }() + } + + wg.Wait() +} + +func TestMixedEviction(t *testing.T) { + c := NewCache(1) + + wg := sync.WaitGroup{} + wg2 := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg2.Add(1) + go func() { + defer wg2.Done() + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.WriteString("test") + fd.Close() + buf := make([]byte, 4) + + for k := 0; k < 100; k++ { + wg.Add(1) + go func() { + defer wg.Done() + + cfd, err := c.Open(fd.Name()) + if err != nil { + t.Fatal(err) + return + } + defer cfd.Close() + + _, err = cfd.ReadAt(buf, 0) + if err != nil { + t.Fatal(err) + } + }() + } + }() + } + + wg2.Wait() + wg.Wait() +} + +func TestLimit(t *testing.T) { + testcase := 50 + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.Close() + defer os.Remove(fd.Name()) + + c := NewCache(testcase) + fds := make([]*CachedFile, testcase*2) + for i := 0; i < testcase*2; i++ { + fd, err := ioutil.TempFile("", "fdcache") + if err != nil { + t.Fatal(err) + return + } + fd.WriteString("test") + fd.Close() + defer os.Remove(fd.Name()) + + nfd, err := c.Open(fd.Name()) + if err != nil { + t.Fatal(err) + return + } + fds = append(fds, nfd) + nfd.Close() + } + + // Allow closes to happen + time.Sleep(time.Millisecond * 100) + + buf := make([]byte, 4) + ok := 0 + for _, fd := range fds { + if fd == nil { + continue + } + _, err := fd.ReadAt(buf, 0) + if err == nil { + ok++ + } + } + + if ok > testcase { + t.Fatal("More than", testcase, "fds open") + } +} diff --git a/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru.go b/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru.go new file mode 100644 index 000000000..cdfe2991f --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru.go @@ -0,0 +1,121 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package lru implements an LRU cache. +package lru + +import "container/list" + +// Cache is an LRU cache. It is not safe for concurrent access. +type Cache struct { + // MaxEntries is the maximum number of cache entries before + // an item is evicted. Zero means no limit. + MaxEntries int + + // OnEvicted optionally specificies a callback function to be + // executed when an entry is purged from the cache. + OnEvicted func(key Key, value interface{}) + + ll *list.List + cache map[interface{}]*list.Element +} + +// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators +type Key interface{} + +type entry struct { + key Key + value interface{} +} + +// New creates a new Cache. +// If maxEntries is zero, the cache has no limit and it's assumed +// that eviction is done by the caller. +func New(maxEntries int) *Cache { + return &Cache{ + MaxEntries: maxEntries, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } +} + +// Add adds a value to the cache. +func (c *Cache) Add(key Key, value interface{}) { + if c.cache == nil { + c.cache = make(map[interface{}]*list.Element) + c.ll = list.New() + } + if ee, ok := c.cache[key]; ok { + c.ll.MoveToFront(ee) + ee.Value.(*entry).value = value + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { + c.RemoveOldest() + } +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key Key) (value interface{}, ok bool) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.ll.MoveToFront(ele) + return ele.Value.(*entry).value, true + } + return +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key Key) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.removeElement(ele) + } +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() { + if c.cache == nil { + return + } + ele := c.ll.Back() + if ele != nil { + c.removeElement(ele) + } +} + +func (c *Cache) removeElement(e *list.Element) { + c.ll.Remove(e) + kv := e.Value.(*entry) + delete(c.cache, kv.key) + if c.OnEvicted != nil { + c.OnEvicted(kv.key, kv.value) + } +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + if c.cache == nil { + return 0 + } + return c.ll.Len() +} diff --git a/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru_test.go b/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru_test.go new file mode 100644 index 000000000..98a2656e8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/golang/groupcache/lru/lru_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lru + +import ( + "testing" +) + +type simpleStruct struct { + int + string +} + +type complexStruct struct { + int + simpleStruct +} + +var getTests = []struct { + name string + keyToAdd interface{} + keyToGet interface{} + expectedOk bool +}{ + {"string_hit", "myKey", "myKey", true}, + {"string_miss", "myKey", "nonsense", false}, + {"simple_struct_hit", simpleStruct{1, "two"}, simpleStruct{1, "two"}, true}, + {"simeple_struct_miss", simpleStruct{1, "two"}, simpleStruct{0, "noway"}, false}, + {"complex_struct_hit", complexStruct{1, simpleStruct{2, "three"}}, + complexStruct{1, simpleStruct{2, "three"}}, true}, +} + +func TestGet(t *testing.T) { + for _, tt := range getTests { + lru := New(0) + lru.Add(tt.keyToAdd, 1234) + val, ok := lru.Get(tt.keyToGet) + if ok != tt.expectedOk { + t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) + } else if ok && val != 1234 { + t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val) + } + } +} + +func TestRemove(t *testing.T) { + lru := New(0) + lru.Add("myKey", 1234) + if val, ok := lru.Get("myKey"); !ok { + t.Fatal("TestRemove returned no match") + } else if val != 1234 { + t.Fatalf("TestRemove failed. Expected %d, got %v", 1234, val) + } + + lru.Remove("myKey") + if _, ok := lru.Get("myKey"); ok { + t.Fatal("TestRemove returned a removed entry") + } +} diff --git a/internal/model/model.go b/internal/model/model.go index c47602c73..13135166b 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -41,6 +41,8 @@ import ( "github.com/syncthing/syncthing/internal/stats" "github.com/syncthing/syncthing/internal/symlinks" "github.com/syncthing/syncthing/internal/versioner" + + "github.com/AudriusButkevicius/lrufdcache" "github.com/syndtr/goleveldb/leveldb" ) @@ -86,6 +88,7 @@ type Model struct { db *leveldb.DB finder *files.BlockFinder progressEmitter *ProgressEmitter + cache *lrufdcache.FileCache deviceName string clientName string @@ -127,6 +130,7 @@ func NewModel(cfg *config.ConfigWrapper, deviceName, clientName, clientVersion s m := &Model{ cfg: cfg, db: db, + cache: lrufdcache.NewCache(25), deviceName: deviceName, clientName: clientName, clientVersion: clientVersion, @@ -695,12 +699,11 @@ func (m *Model) Request(deviceID protocol.DeviceID, folder, name string, offset } reader = strings.NewReader(target) } else { - reader, err = os.Open(fn) // XXX: Inefficient, should cache fd? + reader, err = m.cache.Open(fn) if err != nil { return nil, err } - - defer reader.(*os.File).Close() + defer reader.(*lrufdcache.CachedFile).Close() } buf := make([]byte, size) diff --git a/internal/model/puller.go b/internal/model/puller.go index 3124940d2..0aad3da23 100644 --- a/internal/model/puller.go +++ b/internal/model/puller.go @@ -26,8 +26,6 @@ import ( "sync" "time" - "github.com/AudriusButkevicius/lfu-go" - "github.com/syncthing/syncthing/internal/config" "github.com/syncthing/syncthing/internal/events" "github.com/syncthing/syncthing/internal/osutil" @@ -603,19 +601,6 @@ nextFile: p.progressEmitter.Register(state.sharedPullerState) } - evictionChan := make(chan lfu.Eviction) - - fdCache := lfu.New() - fdCache.UpperBound = 50 - fdCache.LowerBound = 20 - fdCache.EvictionChannel = evictionChan - - go func() { - for item := range evictionChan { - item.Value.(*os.File).Close() - } - }() - folderRoots := make(map[string]string) p.model.fmut.RLock() for folder, cfg := range p.model.folderCfgs { @@ -629,18 +614,11 @@ nextFile: found := p.model.finder.Iterate(block.Hash, func(folder, file string, index uint32) bool { path := filepath.Join(folderRoots[folder], file) - var fd *os.File - - fdi := fdCache.Get(path) - if fdi != nil { - fd = fdi.(*os.File) - } else { - fd, err = os.Open(path) - if err != nil { - return false - } - fdCache.Set(path, fd) + fd, err := p.model.cache.Open(path) + if err != nil { + return false } + defer fd.Close() _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index)) if err != nil { @@ -689,8 +667,6 @@ nextFile: state.copyDone() } } - fdCache.Evict(fdCache.Len()) - close(evictionChan) out <- state.sharedPullerState } }