From d46f2676634f0d64f5866e5599d20112ec225413 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sat, 21 Nov 2015 16:30:53 +0100 Subject: [PATCH] Handle sparse files (fixes #245) --- lib/config/folderconfiguration.go | 1 + lib/model/rwfolder.go | 23 +++++++++++ lib/model/sharedpullerstate.go | 10 +++++ lib/protocol/message.go | 13 +++++- lib/rc/rc.go | 24 +++++++++++ test/sync_test.go | 66 +++++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 1 deletion(-) diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go index bcd6dcfca..848a99778 100644 --- a/lib/config/folderconfiguration.go +++ b/lib/config/folderconfiguration.go @@ -35,6 +35,7 @@ type FolderConfiguration struct { PullerSleepS int `xml:"pullerSleepS" json:"pullerSleepS"` PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"` MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"` + DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"` Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved cachedPath string diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index 2791e66a4..86ee26148 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -91,6 +91,7 @@ type rwFolder struct { maxConflicts int sleep time.Duration pause time.Duration + allowSparse bool stop chan struct{} queue *jobQueue @@ -125,6 +126,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo shortID: shortID, order: cfg.Order, maxConflicts: cfg.MaxConflicts, + allowSparse: !cfg.DisableSparseFiles, stop: make(chan struct{}), queue: newJobQueue(), @@ -1027,6 +1029,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks ignorePerms: p.ignorePermissions(file), version: curFile.Version, mut: sync.NewMutex(), + sparse: p.allowSparse, } l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) @@ -1113,6 +1116,18 @@ func (p *rwFolder) 
copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull p.model.fmut.RUnlock() for _, block := range state.blocks { + if p.allowSparse && state.reused == 0 && block.IsEmpty() { + // The block is a block of all zeroes, and we are not reusing + // a temp file, so there is no need to do anything with it. + // If we were reusing a temp file and had this block to copy, + // it would be because the block in the temp file was *not* a + // block of all zeroes, so then we should not skip it. + + // Pretend we copied it. + state.copiedFromOrigin() + continue + } + buf = buf[:int(block.Size)] found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool { fd, err := os.Open(filepath.Join(folderRoots[folder], file)) @@ -1185,6 +1200,14 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul continue } + if p.allowSparse && state.reused == 0 && state.block.IsEmpty() { + // There is no need to request a block of all zeroes. Pretend we + // requested it and handled it correctly. + state.pullDone() + out <- state.sharedPullerState + continue + } + var lastError error potentialDevices := p.model.Availability(p.folder, state.file.Name) for { diff --git a/lib/model/sharedpullerstate.go b/lib/model/sharedpullerstate.go index f8ac78adb..99fadcf55 100644 --- a/lib/model/sharedpullerstate.go +++ b/lib/model/sharedpullerstate.go @@ -27,6 +27,7 @@ type sharedPullerState struct { reused int // Number of blocks reused from temporary file ignorePerms bool version protocol.Vector // The current (old) version + sparse bool // Mutable, must be locked for access err error // The first error we hit @@ -138,6 +139,15 @@ func (s *sharedPullerState) tempFile() (io.WriterAt, error) { return nil, err } + if s.sparse { + // Truncate sets the size of the file. This creates a sparse file or a + // space reservation, depending on the underlying filesystem. 
+ if err := fd.Truncate(s.file.Size()); err != nil { + s.failLocked("dst truncate", err) + return nil, err + } + } + // Same fd will be used by all writers s.fd = fd diff --git a/lib/protocol/message.go b/lib/protocol/message.go index f3791e3f5..f6ef33f6d 100644 --- a/lib/protocol/message.go +++ b/lib/protocol/message.go @@ -5,7 +5,13 @@ package protocol -import "fmt" +import ( + "bytes" + "crypto/sha256" + "fmt" +) + +var sha256OfEmptyBlock = sha256.Sum256(make([]byte, BlockSize)) type IndexMessage struct { Folder string // max:256 @@ -98,6 +104,11 @@ func (b BlockInfo) String() string { return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash) } +// IsEmpty returns true if the block is a full block of zeroes. +func (b BlockInfo) IsEmpty() bool { + return b.Size == BlockSize && bytes.Equal(b.Hash, sha256OfEmptyBlock[:]) +} + type RequestMessage struct { Folder string // max:256 Name string // max:8192 diff --git a/lib/rc/rc.go b/lib/rc/rc.go index 7e0fd7fc4..2af1df2b9 100644 --- a/lib/rc/rc.go +++ b/lib/rc/rc.go @@ -550,3 +550,27 @@ func (p *Process) eventLoop() { } } } + +type ConnectionStats struct { + Address string + Type string + Connected bool + Paused bool + ClientVersion string + InBytesTotal int64 + OutBytesTotal int64 +} + +func (p *Process) Connections() (map[string]ConnectionStats, error) { + bs, err := p.Get("/rest/system/connections") + if err != nil { + return nil, err + } + + var res map[string]ConnectionStats + if err := json.Unmarshal(bs, &res); err != nil { + return nil, err + } + + return res, nil +} diff --git a/test/sync_test.go b/test/sync_test.go index 7b02ccf1f..e67fb262a 100644 --- a/test/sync_test.go +++ b/test/sync_test.go @@ -466,3 +466,69 @@ func scSyncAndCompare(p []*rc.Process, expected [][]fileInfo) error { return nil } + +func TestSyncSparseFile(t *testing.T) { + // This test verifies that when syncing a file that consists mostly of + // zeroes, those blocks are not transferred. 
It doesn't verify whether + // the resulting file is actually *sparse* or not. + + log.Println("Cleaning...") + err := removeAll("s1", "s12-1", + "s2", "s12-2", "s23-2", + "s3", "s23-3", + "h1/index*", "h2/index*", "h3/index*") + if err != nil { + t.Fatal(err) + } + + log.Println("Generating files...") + + if err := os.Mkdir("s1", 0755); err != nil { + t.Fatal(err) + } + + fd, err := os.Create("s1/testfile") + if err != nil { + t.Fatal(err) + } + if _, err := fd.Write([]byte("Start")); err != nil { + t.Fatal(err) + } + kib := make([]byte, 1024) + for i := 0; i < 8192; i++ { + if _, err := fd.Write(kib); err != nil { + t.Fatal(err) + } + } + if _, err := fd.Write([]byte("End")); err != nil { + t.Fatal(err) + } + fd.Close() + + // Start the syncers + + log.Println("Syncing...") + + p0 := startInstance(t, 1) + defer checkedStop(t, p0) + p1 := startInstance(t, 2) + defer checkedStop(t, p1) + + rc.AwaitSync("default", p0, p1) + + log.Println("Comparing...") + + if err := compareDirectories("s1", "s2"); err != nil { + t.Fatal(err) + } + + conns, err := p0.Connections() + if err != nil { + t.Fatal(err) + } + + tot := conns["total"] + if tot.OutBytesTotal > 256<<10 { + t.Fatal("Sending side has sent", tot.OutBytesTotal, "bytes, which is too much") + } +}