syncthing/model/model.go

850 lines
19 KiB
Go
Raw Normal View History

package model
2013-12-15 11:43:31 +01:00
/*
Locking
=======
The model has read and write locks. These must be acquired as appropriate by
public methods. To prevent deadlock situations, private methods should never
acquire locks, but document what locks they require.
*/
import (
"crypto/sha1"
"errors"
2013-12-23 18:12:44 +01:00
"fmt"
"io"
"log"
2014-01-05 23:54:57 +01:00
"net"
2013-12-15 11:43:31 +01:00
"os"
"path"
"sync"
"time"
"github.com/calmh/syncthing/buffers"
"github.com/calmh/syncthing/protocol"
)
type Model struct {
2013-12-30 15:30:29 +01:00
dir string
2014-01-09 13:58:35 +01:00
global map[string]File // the latest version of each file as it exists in the cluster
2014-01-18 04:06:44 +01:00
gmut sync.RWMutex // protects global
2014-01-09 13:58:35 +01:00
local map[string]File // the files we currently have locally on disk
2014-01-18 04:06:44 +01:00
lmut sync.RWMutex // protects local
2014-01-09 13:58:35 +01:00
remote map[string]map[string]File
2014-01-18 04:06:44 +01:00
rmut sync.RWMutex // protects remote
2014-01-09 13:58:35 +01:00
protoConn map[string]Connection
rawConn map[string]io.Closer
2014-01-18 04:06:44 +01:00
pmut sync.RWMutex // protects protoConn and rawConn
2013-12-30 15:30:29 +01:00
2014-01-18 04:06:44 +01:00
fq FileQueue // queue for files to fetch
dq chan File // queue for files to delete
2013-12-24 21:21:03 +01:00
2014-01-18 04:06:44 +01:00
updatedLocal int64 // timestamp of last update to local
updateGlobal int64 // timestamp of last update to remote
2013-12-24 21:21:03 +01:00
lastIdxBcast time.Time
lastIdxBcastRequest time.Time
2014-01-18 04:06:44 +01:00
umut sync.RWMutex // provides updated* and lastIdx*
rwRunning bool
delete bool
2014-01-18 04:06:44 +01:00
initmut sync.Mutex // protects rwRunning and delete
trace map[string]bool
fileLastChanged map[string]time.Time
fileWasSuppressed map[string]int
2014-01-18 04:06:44 +01:00
fmut sync.Mutex // protects fileLastChanged and fileWasSuppressed
2014-01-12 16:59:35 +01:00
parallellRequests int
limitRequestRate chan struct{}
2014-01-18 04:06:44 +01:00
imut sync.Mutex // protects Index
2013-12-15 11:43:31 +01:00
}
2014-01-09 13:58:35 +01:00
type Connection interface {
ID() string
Index([]protocol.FileInfo)
Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
2014-01-09 13:58:35 +01:00
Statistics() protocol.Statistics
}
2013-12-15 11:43:31 +01:00
const (
	// Pacing of index broadcasts.
	idxBcastHoldtime = 15 * time.Second // wait at least this long after the last index modification
	idxBcastMaxDelay = 2 * time.Minute  // ...unless we have already waited this long

	// Change suppression bounds, expressed in seconds.
	minFileHoldTimeS = 60  // never allow file changes more often than this
	maxFileHoldTimeS = 600 // always allow file changes at least this often
)
// ErrNoSuchFile is returned by Request when the requested file is not present
// in both the local and the global table.
var ErrNoSuchFile = errors.New("no such file")

// ErrInvalid is returned by Request when the local file carries the invalid
// flag.
var ErrInvalid = errors.New("file is invalid")
// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
2013-12-15 11:43:31 +01:00
func NewModel(dir string) *Model {
m := &Model{
dir: dir,
global: make(map[string]File),
local: make(map[string]File),
remote: make(map[string]map[string]File),
2014-01-09 13:58:35 +01:00
protoConn: make(map[string]Connection),
rawConn: make(map[string]io.Closer),
lastIdxBcast: time.Now(),
trace: make(map[string]bool),
fileLastChanged: make(map[string]time.Time),
fileWasSuppressed: make(map[string]int),
dq: make(chan File),
2013-12-15 11:43:31 +01:00
}
2013-12-24 21:21:03 +01:00
go m.broadcastIndexLoop()
2013-12-15 11:43:31 +01:00
return m
}
2014-01-12 16:59:35 +01:00
// LimitRate enables throttling of Request. It installs a token channel with
// capacity kbps and starts a goroutine that, every 100 ms, pushes kbps/10+1
// tokens into it, blocking whenever the channel is full. Request takes one
// token per started 1024 bytes of data it returns.
func (m *Model) LimitRate(kbps int) {
	m.limitRequestRate = make(chan struct{}, kbps)
	tokensPerTick := kbps/10 + 1

	go func() {
		for {
			time.Sleep(100 * time.Millisecond)
			for sent := 0; sent < tokensPerTick; sent++ {
				// Blocking send: waits for Request to drain a token when
				// the channel is at capacity.
				m.limitRequestRate <- struct{}{}
			}
		}
	}()
}
// Trace switches on trace logging for facility t. It is a debugging aid; the
// facilities in use can be found by searching the code for m.trace.
func (m *Model) Trace(t string) {
	facilities := m.trace
	facilities[t] = true
}
// StartRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRW(del bool, threads int) {
2014-01-18 04:06:44 +01:00
m.initmut.Lock()
defer m.initmut.Unlock()
if m.rwRunning {
panic("starting started model")
}
m.rwRunning = true
m.delete = del
m.parallellRequests = threads
go m.cleanTempFiles()
if del {
2014-01-15 01:47:27 +01:00
go m.deleteLoop()
}
2013-12-15 11:43:31 +01:00
}
// Generation returns an opaque integer that is guaranteed to increment on
// every change to the local repository or global model.
2014-01-05 16:16:37 +01:00
func (m *Model) Generation() int64 {
2014-01-18 04:06:44 +01:00
m.umut.RLock()
defer m.umut.RUnlock()
2014-01-05 16:16:37 +01:00
return m.updatedLocal + m.updateGlobal
2013-12-23 18:12:44 +01:00
}
2014-01-05 23:54:57 +01:00
// ConnectionInfo is the per-node value returned by ConnectionStats: the
// protocol statistics of the connection plus the peer's remote address.
type ConnectionInfo struct {
	protocol.Statistics

	// Address is the remote address of the raw connection. It stays empty
	// when the raw connection does not expose a RemoteAddr method.
	Address string
}
// ConnectionStats returns a map with connection statistics for each connected node.
2014-01-05 23:54:57 +01:00
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
type remoteAddrer interface {
RemoteAddr() net.Addr
}
2014-01-18 04:06:44 +01:00
m.pmut.RLock()
2014-01-05 16:16:37 +01:00
2014-01-05 23:54:57 +01:00
var res = make(map[string]ConnectionInfo)
2014-01-09 13:58:35 +01:00
for node, conn := range m.protoConn {
2014-01-05 23:54:57 +01:00
ci := ConnectionInfo{
Statistics: conn.Statistics(),
}
if nc, ok := m.rawConn[node].(remoteAddrer); ok {
ci.Address = nc.RemoteAddr().String()
}
res[node] = ci
2013-12-30 15:30:29 +01:00
}
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
2014-01-05 16:16:37 +01:00
return res
2013-12-30 15:30:29 +01:00
}
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
2014-01-05 23:54:57 +01:00
func (m *Model) GlobalSize() (files, deleted, bytes int) {
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
2014-01-05 16:16:37 +01:00
2013-12-30 15:30:29 +01:00
for _, f := range m.global {
if f.Flags&protocol.FlagDeleted == 0 {
2014-01-05 23:54:57 +01:00
files++
bytes += f.Size()
} else {
deleted++
}
2013-12-30 15:30:29 +01:00
}
2014-01-18 04:06:44 +01:00
m.gmut.RUnlock()
2014-01-05 16:16:37 +01:00
return
}
2013-12-30 15:30:29 +01:00
// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
2014-01-05 23:54:57 +01:00
func (m *Model) LocalSize() (files, deleted, bytes int) {
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
2013-12-30 15:30:29 +01:00
2014-01-05 16:16:37 +01:00
for _, f := range m.local {
if f.Flags&protocol.FlagDeleted == 0 {
2014-01-05 23:54:57 +01:00
files++
bytes += f.Size()
} else {
deleted++
}
2013-12-30 15:30:29 +01:00
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
2014-01-05 16:16:37 +01:00
return
2013-12-30 15:30:29 +01:00
}
// InSyncSize returns the number and total byte size of the local files that
// are in sync with the global model.
2014-01-06 06:38:01 +01:00
func (m *Model) InSyncSize() (files, bytes int) {
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
m.lmut.RLock()
2014-01-06 06:38:01 +01:00
for n, f := range m.local {
if gf, ok := m.global[n]; ok && f.Equals(gf) {
2014-01-06 06:38:01 +01:00
files++
bytes += f.Size()
}
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
m.gmut.RUnlock()
2014-01-06 06:38:01 +01:00
return
}
// NeedFiles returns the list of currently needed files and the total size.
func (m *Model) NeedFiles() (files []File, bytes int) {
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
2014-01-05 16:16:37 +01:00
for _, n := range m.fq.QueuedFiles() {
2014-01-05 23:54:57 +01:00
f := m.global[n]
files = append(files, f)
bytes += f.Size()
2013-12-23 18:12:44 +01:00
}
2014-01-18 04:06:44 +01:00
m.gmut.RUnlock()
2014-01-05 16:16:37 +01:00
return
2013-12-23 18:12:44 +01:00
}
2013-12-30 15:30:29 +01:00
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
2013-12-15 11:43:31 +01:00
func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
2014-01-18 04:06:44 +01:00
m.imut.Lock()
defer m.imut.Unlock()
2013-12-15 11:43:31 +01:00
if m.trace["net"] {
log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
2013-12-15 11:43:31 +01:00
}
repo := make(map[string]File)
2013-12-15 11:43:31 +01:00
for _, f := range fs {
m.indexUpdate(repo, f)
2013-12-28 14:10:36 +01:00
}
2014-01-18 04:06:44 +01:00
m.rmut.Lock()
m.remote[nodeID] = repo
2014-01-18 04:06:44 +01:00
m.rmut.Unlock()
2013-12-28 14:10:36 +01:00
m.recomputeGlobal()
m.recomputeNeed()
}
2013-12-30 15:30:29 +01:00
// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
2013-12-28 14:10:36 +01:00
func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
2014-01-18 04:06:44 +01:00
m.imut.Lock()
defer m.imut.Unlock()
2013-12-28 14:10:36 +01:00
if m.trace["net"] {
log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
2013-12-28 14:10:36 +01:00
}
2014-01-18 04:06:44 +01:00
m.rmut.Lock()
2013-12-28 14:10:36 +01:00
repo, ok := m.remote[nodeID]
if !ok {
log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
2014-01-18 04:06:44 +01:00
m.rmut.Unlock()
2013-12-28 14:10:36 +01:00
return
}
for _, f := range fs {
m.indexUpdate(repo, f)
2013-12-15 11:43:31 +01:00
}
2014-01-18 04:06:44 +01:00
m.rmut.Unlock()
2013-12-15 11:43:31 +01:00
m.recomputeGlobal()
m.recomputeNeed()
}
// indexUpdate stores the converted form of f in repo under f.Name. Records
// with flag bits outside the invalid flag, the deleted flag and the low 12
// bits are rejected with a warning and leave repo untouched. It takes no
// locks itself; the callers in this file either own repo exclusively or hold
// rmut.
func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
	if m.trace["idx"] {
		flagComment := ""
		if f.Flags&protocol.FlagDeleted != 0 {
			flagComment = " (deleted)"
		}
		log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
	}

	extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff)
	if extraFlags != 0 {
		log.Printf("WARNING: IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
		return
	}

	repo[f.Name] = fileFromFileInfo(f)
}
// Close removes the peer from the model and closes the underlyign connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node string, err error) {
2014-01-18 04:06:44 +01:00
m.pmut.Lock()
m.rmut.Lock()
2013-12-15 11:43:31 +01:00
conn, ok := m.rawConn[node]
if ok {
conn.Close()
}
2013-12-15 11:43:31 +01:00
delete(m.remote, node)
2014-01-09 13:58:35 +01:00
delete(m.protoConn, node)
delete(m.rawConn, node)
m.fq.RemoveAvailable(node)
2013-12-15 11:43:31 +01:00
2014-01-18 04:06:44 +01:00
m.rmut.Unlock()
m.pmut.Unlock()
2013-12-15 11:43:31 +01:00
m.recomputeGlobal()
m.recomputeNeed()
}
// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
// Verify that the requested file exists in the local and global model.
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
lf, localOk := m.local[name]
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
m.gmut.RLock()
_, globalOk := m.global[name]
2014-01-18 04:06:44 +01:00
m.gmut.RUnlock()
if !localOk || !globalOk {
log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
return nil, ErrNoSuchFile
}
if lf.Flags&protocol.FlagInvalid != 0 {
return nil, ErrInvalid
}
if m.trace["net"] && nodeID != "<local>" {
log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
2013-12-15 11:43:31 +01:00
}
fn := path.Join(m.dir, name)
fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
if err != nil {
return nil, err
}
defer fd.Close()
buf := buffers.Get(int(size))
_, err = fd.ReadAt(buf, offset)
2013-12-15 11:43:31 +01:00
if err != nil {
return nil, err
}
2014-01-12 16:59:35 +01:00
if m.limitRequestRate != nil {
for s := 0; s < len(buf); s += 1024 {
<-m.limitRequestRate
}
}
2013-12-15 11:43:31 +01:00
return buf, nil
}
// ReplaceLocal replaces the local repository index with the given list of files.
// Change suppression is applied to files changing too often.
2013-12-15 11:43:31 +01:00
func (m *Model) ReplaceLocal(fs []File) {
var updated bool
var newLocal = make(map[string]File)
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
2013-12-15 11:43:31 +01:00
for _, f := range fs {
newLocal[f.Name] = f
if ef := m.local[f.Name]; !ef.Equals(f) {
2013-12-15 11:43:31 +01:00
updated = true
}
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
2013-12-15 11:43:31 +01:00
if m.markDeletedLocals(newLocal) {
updated = true
}
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
2013-12-15 11:43:31 +01:00
if len(newLocal) != len(m.local) {
updated = true
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
2013-12-15 11:43:31 +01:00
if updated {
2014-01-18 04:06:44 +01:00
m.lmut.Lock()
2013-12-15 11:43:31 +01:00
m.local = newLocal
2014-01-18 04:06:44 +01:00
m.lmut.Unlock()
2013-12-15 11:43:31 +01:00
m.recomputeGlobal()
m.recomputeNeed()
2014-01-18 04:06:44 +01:00
m.umut.Lock()
2013-12-30 15:30:29 +01:00
m.updatedLocal = time.Now().Unix()
2013-12-25 02:31:25 +01:00
m.lastIdxBcastRequest = time.Now()
2014-01-18 04:06:44 +01:00
m.umut.Unlock()
2013-12-15 11:43:31 +01:00
}
}
// SeedLocal replaces the local repository index with the given list of files,
// in protocol data types. Does not track deletes, should only be used to seed
// the local index from a cache file at startup.
func (m *Model) SeedLocal(fs []protocol.FileInfo) {
2014-01-18 04:06:44 +01:00
m.lmut.Lock()
m.local = make(map[string]File)
for _, f := range fs {
m.local[f.Name] = fileFromFileInfo(f)
}
2014-01-18 04:06:44 +01:00
m.lmut.Unlock()
m.recomputeGlobal()
m.recomputeNeed()
}
// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID string) bool {
2014-01-18 04:06:44 +01:00
m.pmut.RLock()
2014-01-09 13:58:35 +01:00
_, ok := m.protoConn[nodeID]
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
return ok
}
// RepoID returns an identifier for the repository location: the SHA-1 digest
// of the repository directory string, in lowercase hex.
func (m *Model) RepoID() string {
	digest := sha1.Sum([]byte(m.dir))
	return fmt.Sprintf("%x", digest)
}
// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
2014-01-09 13:58:35 +01:00
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID()
2014-01-18 04:06:44 +01:00
m.pmut.Lock()
2014-01-09 13:58:35 +01:00
m.protoConn[nodeID] = protoConn
m.rawConn[nodeID] = rawConn
2014-01-18 04:06:44 +01:00
m.pmut.Unlock()
go func() {
2014-01-18 04:06:44 +01:00
idx := m.ProtocolIndex()
2014-01-09 13:58:35 +01:00
protoConn.Index(idx)
}()
if m.rwRunning {
for i := 0; i < m.parallellRequests; i++ {
i := i
go func() {
if m.trace["pull"] {
log.Println("PULL: Starting", nodeID, i)
}
for {
2014-01-18 04:06:44 +01:00
m.pmut.RLock()
if _, ok := m.protoConn[nodeID]; !ok {
if m.trace["pull"] {
log.Println("PULL: Exiting", nodeID, i)
}
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
return
}
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
qb, ok := m.fq.Get(nodeID)
if ok {
if m.trace["pull"] {
log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
}
data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
m.fq.Done(qb.name, qb.block.Offset, data)
} else {
time.Sleep(1 * time.Second)
}
}
}()
}
}
}
func (m *Model) shouldSuppressChange(name string) bool {
2014-01-18 04:06:44 +01:00
m.fmut.Lock()
sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
if sup {
m.fileWasSuppressed[name]++
} else {
m.fileWasSuppressed[name] = 0
m.fileLastChanged[name] = time.Now()
}
2014-01-18 04:06:44 +01:00
m.fmut.Unlock()
return sup
}
// shouldSuppressChange reports whether a change should be suppressed given
// the time of the last accepted change and the number of changes suppressed
// since then. A change is never suppressed once more than maxFileHoldTimeS
// seconds have passed; otherwise it is suppressed while less than
// (numChanges+2)*minFileHoldTimeS seconds have passed.
func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
	age := time.Since(lastChange)
	if age > maxFileHoldTimeS*time.Second {
		return false
	}

	holdTime := time.Duration((numChanges+2)*minFileHoldTimeS) * time.Second
	return age < holdTime
}
2014-01-18 04:06:44 +01:00
// ProtocolIndex returns the current local index in protocol data types.
// Must be called with the read lock held.
2014-01-18 04:06:44 +01:00
func (m *Model) ProtocolIndex() []protocol.FileInfo {
var index []protocol.FileInfo
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
for _, f := range m.local {
mf := fileInfoFromFile(f)
if m.trace["idx"] {
var flagComment string
if mf.Flags&protocol.FlagDeleted != 0 {
flagComment = " (deleted)"
}
log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
}
index = append(index, mf)
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
return index
}
func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
2014-01-18 04:06:44 +01:00
m.pmut.RLock()
2014-01-09 13:58:35 +01:00
nc, ok := m.protoConn[nodeID]
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
if !ok {
return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
}
if m.trace["net"] {
log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
}
return nc.Request(name, offset, size, hash)
}
2013-12-24 21:21:03 +01:00
func (m *Model) broadcastIndexLoop() {
for {
2014-01-18 04:06:44 +01:00
m.umut.RLock()
2013-12-24 21:21:03 +01:00
bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
2014-01-18 04:06:44 +01:00
m.umut.RUnlock()
2013-12-28 14:10:36 +01:00
2013-12-24 21:21:03 +01:00
maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
2014-01-18 04:06:44 +01:00
idx := m.ProtocolIndex()
2013-12-28 14:10:36 +01:00
var indexWg sync.WaitGroup
2014-01-09 13:58:35 +01:00
indexWg.Add(len(m.protoConn))
2014-01-18 04:06:44 +01:00
m.umut.Lock()
2013-12-28 14:10:36 +01:00
m.lastIdxBcast = time.Now()
2014-01-18 04:06:44 +01:00
m.umut.Unlock()
m.pmut.RLock()
2014-01-09 13:58:35 +01:00
for _, node := range m.protoConn {
2013-12-24 21:21:03 +01:00
node := node
if m.trace["net"] {
log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
2013-12-24 21:21:03 +01:00
}
2013-12-28 14:10:36 +01:00
go func() {
node.Index(idx)
indexWg.Done()
}()
2013-12-24 21:21:03 +01:00
}
2014-01-18 04:06:44 +01:00
m.pmut.RUnlock()
2013-12-28 14:10:36 +01:00
indexWg.Wait()
2013-12-15 11:43:31 +01:00
}
2013-12-28 14:10:36 +01:00
time.Sleep(idxBcastHoldtime)
2013-12-15 11:43:31 +01:00
}
}
// markDeletedLocals sets the deleted flag on files that have gone missing locally.
// Must be called with the write lock held.
func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
// For every file in the existing local table, check if they are also
// present in the new local table. If they are not, check that we already
// had the newest version available according to the global table and if so
// note the file as having been deleted.
var updated bool
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
m.lmut.RLock()
2013-12-15 11:43:31 +01:00
for n, f := range m.local {
if _, ok := newLocal[n]; !ok {
if gf := m.global[n]; !gf.NewerThan(f) {
if f.Flags&protocol.FlagDeleted == 0 {
f.Flags = protocol.FlagDeleted
f.Version++
2013-12-15 11:43:31 +01:00
f.Blocks = nil
updated = true
}
newLocal[n] = f
}
}
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
m.gmut.RUnlock()
return updated
}
func (m *Model) updateLocal(f File) {
2014-01-18 04:06:44 +01:00
var updated bool
m.lmut.Lock()
if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
2013-12-15 11:43:31 +01:00
m.local[f.Name] = f
2014-01-18 04:06:44 +01:00
updated = true
}
m.lmut.Unlock()
if updated {
2013-12-15 11:43:31 +01:00
m.recomputeGlobal()
2014-01-18 04:06:44 +01:00
// We don't recomputeNeed here for two reasons:
// - a need shouldn't have arisen due to having a newer local file
// - recomputeNeed might call into fq.Add but we might have been called by
// fq which would be a deadlock on fq
m.umut.Lock()
2013-12-30 15:30:29 +01:00
m.updatedLocal = time.Now().Unix()
2013-12-25 02:31:25 +01:00
m.lastIdxBcastRequest = time.Now()
2014-01-18 04:06:44 +01:00
m.umut.Unlock()
2013-12-15 11:43:31 +01:00
}
}
// Must be called with the write lock held.
func (m *Model) recomputeGlobal() {
var newGlobal = make(map[string]File)
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
2013-12-15 11:43:31 +01:00
for n, f := range m.local {
newGlobal[n] = f
}
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
2013-12-15 11:43:31 +01:00
2014-01-18 04:06:44 +01:00
m.rmut.RLock()
2014-01-09 13:58:35 +01:00
var highestMod int64
2014-01-13 18:29:23 +01:00
for nodeID, fs := range m.remote {
for n, nf := range fs {
if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
newGlobal[n] = nf
2014-01-13 18:29:23 +01:00
m.fq.SetAvailable(n, nodeID)
2014-01-09 13:58:35 +01:00
if nf.Modified > highestMod {
highestMod = nf.Modified
}
2014-01-13 18:29:23 +01:00
} else if lf.Equals(nf) {
m.fq.AddAvailable(n, nodeID)
2013-12-15 11:43:31 +01:00
}
}
}
2014-01-18 04:06:44 +01:00
m.rmut.RUnlock()
2013-12-15 11:43:31 +01:00
2013-12-30 15:30:29 +01:00
// Figure out if anything actually changed
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
2013-12-30 15:30:29 +01:00
var updated bool
2014-01-09 13:58:35 +01:00
if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
2013-12-30 15:30:29 +01:00
updated = true
} else {
for n, f0 := range newGlobal {
if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
2013-12-30 15:30:29 +01:00
updated = true
break
}
}
}
2014-01-18 04:06:44 +01:00
m.gmut.RUnlock()
2013-12-30 15:30:29 +01:00
if updated {
2014-01-18 04:06:44 +01:00
m.gmut.Lock()
m.umut.Lock()
2013-12-30 15:30:29 +01:00
m.global = newGlobal
2014-01-18 04:06:44 +01:00
m.updateGlobal = time.Now().Unix()
m.umut.Unlock()
m.gmut.Unlock()
2013-12-30 15:30:29 +01:00
}
2013-12-15 11:43:31 +01:00
}
func (m *Model) recomputeNeed() {
2014-01-18 04:06:44 +01:00
type addOrder struct {
n string
remote []Block
fm *fileMonitor
}
2014-01-15 01:47:27 +01:00
var toDelete []File
2014-01-18 04:06:44 +01:00
var toAdd []addOrder
m.gmut.RLock()
2014-01-15 01:47:27 +01:00
for n, gf := range m.global {
if m.fq.Queued(n) {
continue
}
2014-01-18 04:06:44 +01:00
m.lmut.RLock()
lf, ok := m.local[n]
2014-01-18 04:06:44 +01:00
m.lmut.RUnlock()
if !ok || gf.NewerThan(lf) {
if gf.Flags&protocol.FlagInvalid != 0 {
// Never attempt to sync invalid files
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
2014-01-06 06:12:33 +01:00
// Don't want to delete files, so forget this need
continue
}
if gf.Flags&protocol.FlagDeleted != 0 && !ok {
2014-01-06 06:12:33 +01:00
// Don't have the file, so don't need to delete it
continue
}
if m.trace["need"] {
log.Printf("NEED: lf:%v gf:%v", lf, gf)
}
if gf.Flags&protocol.FlagDeleted != 0 {
2014-01-15 01:47:27 +01:00
toDelete = append(toDelete, gf)
} else {
local, remote := BlockDiff(lf.Blocks, gf.Blocks)
fm := fileMonitor{
name: n,
path: path.Clean(path.Join(m.dir, n)),
global: gf,
model: m,
localBlocks: local,
}
2014-01-18 04:06:44 +01:00
toAdd = append(toAdd, addOrder{n, remote, &fm})
2014-01-06 06:12:40 +01:00
}
2013-12-15 11:43:31 +01:00
}
}
2014-01-15 01:47:27 +01:00
2014-01-18 04:06:44 +01:00
m.gmut.RUnlock()
2013-12-15 11:43:31 +01:00
2014-01-18 04:06:44 +01:00
for _, ao := range toAdd {
m.fq.Add(ao.n, ao.remote, ao.fm)
}
for _, gf := range toDelete {
m.dq <- gf
}
2014-01-13 18:29:23 +01:00
}
2014-01-18 04:06:44 +01:00
func (m *Model) WhoHas(name string) []string {
2013-12-15 11:43:31 +01:00
var remote []string
2014-01-18 04:06:44 +01:00
m.gmut.RLock()
m.rmut.RLock()
2013-12-15 11:43:31 +01:00
gf := m.global[name]
for node, files := range m.remote {
if file, ok := files[name]; ok && file.Equals(gf) {
2013-12-15 11:43:31 +01:00
remote = append(remote, node)
}
}
2014-01-18 04:06:44 +01:00
m.rmut.RUnlock()
m.gmut.RUnlock()
2013-12-15 11:43:31 +01:00
return remote
}
2014-01-15 01:47:27 +01:00
func (m *Model) deleteLoop() {
for file := range m.dq {
if m.trace["file"] {
log.Println("FILE: Delete", file.Name)
}
path := path.Clean(path.Join(m.dir, file.Name))
err := os.Remove(path)
if err != nil {
log.Printf("WARNING: %s: %v", file.Name, err)
}
2014-01-18 04:06:44 +01:00
m.updateLocal(file)
}
}
2013-12-30 02:33:57 +01:00
func fileFromFileInfo(f protocol.FileInfo) File {
2014-01-09 13:58:35 +01:00
var blocks = make([]Block, len(f.Blocks))
var offset int64
2014-01-09 13:58:35 +01:00
for i, b := range f.Blocks {
blocks[i] = Block{
2013-12-28 14:10:36 +01:00
Offset: offset,
Size: b.Size,
2013-12-28 14:10:36 +01:00
Hash: b.Hash,
2014-01-09 13:58:35 +01:00
}
offset += int64(b.Size)
2013-12-28 14:10:36 +01:00
}
2013-12-30 01:49:40 +01:00
return File{
Name: f.Name,
Flags: f.Flags,
Modified: f.Modified,
Version: f.Version,
2013-12-30 01:49:40 +01:00
Blocks: blocks,
}
2013-12-28 14:10:36 +01:00
}
2013-12-30 02:33:57 +01:00
func fileInfoFromFile(f File) protocol.FileInfo {
2014-01-09 13:58:35 +01:00
var blocks = make([]protocol.BlockInfo, len(f.Blocks))
for i, b := range f.Blocks {
blocks[i] = protocol.BlockInfo{
Size: b.Size,
Hash: b.Hash,
2014-01-09 13:58:35 +01:00
}
2013-12-30 02:33:57 +01:00
}
return protocol.FileInfo{
Name: f.Name,
Flags: f.Flags,
Modified: f.Modified,
Version: f.Version,
2013-12-30 02:33:57 +01:00
Blocks: blocks,
}
}