Add overall file checksum verification to the rsync protocol

This commit is contained in:
Kovid Goyal
2023-07-08 11:05:47 +05:30
parent e125f803b3
commit 94b5bedfa5
2 changed files with 26 additions and 6 deletions

View File

@@ -9,6 +9,7 @@
package rsync package rsync
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash" "hash"
@@ -41,11 +42,12 @@ type xxh3_128 struct {
func (self *xxh3_128) Sum(b []byte) []byte { func (self *xxh3_128) Sum(b []byte) []byte {
s := self.Sum128() s := self.Sum128()
pos := len(b) pos := len(b)
if len(b)+16 < cap(b) { limit := pos + 16
if limit > cap(b) {
var x [16]byte var x [16]byte
b = append(b, x[:]...) b = append(b, x[:]...)
} else { } else {
b = b[:len(b)+16] b = b[:limit]
} }
binary.BigEndian.PutUint64(b[pos:], s.Hi) binary.BigEndian.PutUint64(b[pos:], s.Hi)
binary.BigEndian.PutUint64(b[pos+8:], s.Lo) binary.BigEndian.PutUint64(b[pos+8:], s.Lo)
@@ -267,8 +269,11 @@ func (r *rsync) ApplyDelta(alignedTarget io.Writer, target io.ReadSeeker, op Ope
r.set_buffer_to_size(r.BlockSize) r.set_buffer_to_size(r.BlockSize)
buffer := r.buffer buffer := r.buffer
if r.checksummer == nil {
r.checksummer = r.checksummer_constructor()
}
writeBlock := func(op Operation) error { write_block := func(op Operation) error {
if _, err = target.Seek(int64(r.BlockSize*int(op.BlockIndex)), os.SEEK_SET); err != nil { if _, err = target.Seek(int64(r.BlockSize*int(op.BlockIndex)), os.SEEK_SET); err != nil {
return err return err
} }
@@ -279,6 +284,7 @@ func (r *rsync) ApplyDelta(alignedTarget io.Writer, target io.ReadSeeker, op Ope
} }
} }
block = buffer[:n] block = buffer[:n]
r.checksummer.Write(block)
_, err = alignedTarget.Write(block) _, err = alignedTarget.Write(block)
if err != nil { if err != nil {
return err return err
@@ -289,7 +295,7 @@ func (r *rsync) ApplyDelta(alignedTarget io.Writer, target io.ReadSeeker, op Ope
switch op.Type { switch op.Type {
case OpBlockRange: case OpBlockRange:
for i := op.BlockIndex; i <= op.BlockIndexEnd; i++ { for i := op.BlockIndex; i <= op.BlockIndexEnd; i++ {
err = writeBlock(Operation{ err = write_block(Operation{
Type: OpBlock, Type: OpBlock,
BlockIndex: i, BlockIndex: i,
}) })
@@ -301,7 +307,7 @@ func (r *rsync) ApplyDelta(alignedTarget io.Writer, target io.ReadSeeker, op Ope
} }
} }
case OpBlock: case OpBlock:
err = writeBlock(op) err = write_block(op)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
break break
@@ -309,10 +315,16 @@ func (r *rsync) ApplyDelta(alignedTarget io.Writer, target io.ReadSeeker, op Ope
return err return err
} }
case OpData: case OpData:
r.checksummer.Write(op.Data)
_, err = alignedTarget.Write(op.Data) _, err = alignedTarget.Write(op.Data)
if err != nil { if err != nil {
return err return err
} }
case OpHash:
expected := r.checksummer.Sum(nil)
if !bytes.Equal(expected, op.Data) {
return fmt.Errorf("Failed to verify overall file checksum. This usually happens if some data was corrupted in transit or one of the involved files was altered while the transfer was in progress.")
}
} }
return nil return nil
} }
@@ -402,6 +414,7 @@ type diff struct {
hash_lookup map[uint32][]BlockHash hash_lookup map[uint32][]BlockHash
source io.Reader source io.Reader
hasher hash.Hash64 hasher hash.Hash64
checksummer hash.Hash
window, data struct{ pos, sz int } window, data struct{ pos, sz int }
block_size int block_size int
@@ -510,13 +523,16 @@ func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
extra := idx - len(self.buffer) + 1 extra := idx - len(self.buffer) + 1
var n int var n int
n, err = io.ReadAtLeast(self.source, self.buffer[len(self.buffer):cap(self.buffer)], extra) n, err = io.ReadAtLeast(self.source, self.buffer[len(self.buffer):cap(self.buffer)], extra)
block := self.buffer[len(self.buffer):][:n]
switch err { switch err {
case nil: case nil:
ok = true ok = true
self.buffer = self.buffer[:len(self.buffer)+n] self.buffer = self.buffer[:len(self.buffer)+n]
self.checksummer.Write(block)
case io.ErrUnexpectedEOF, io.EOF: case io.ErrUnexpectedEOF, io.EOF:
err = nil err = nil
self.buffer = self.buffer[:len(self.buffer)+n] self.buffer = self.buffer[:len(self.buffer)+n]
self.checksummer.Write(block)
} }
return return
} }
@@ -526,6 +542,7 @@ func (self *diff) finish_up() {
self.data.pos = self.window.pos self.data.pos = self.window.pos
self.data.sz = len(self.buffer) - self.window.pos self.data.sz = len(self.buffer) - self.window.pos
self.send_data() self.send_data()
self.enqueue(Operation{Type: OpHash, Data: self.checksummer.Sum(nil)})
self.finished = true self.finished = true
} }
@@ -573,6 +590,7 @@ func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash) func() (*Ope
block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * 8)), block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * 8)),
hash_lookup: make(map[uint32][]BlockHash, len(signature)), hash_lookup: make(map[uint32][]BlockHash, len(signature)),
source: source, hasher: r.hasher_constructor(), source: source, hasher: r.hasher_constructor(),
checksummer: r.checksummer_constructor(),
} }
for _, h := range signature { for _, h := range signature {
key := h.WeakHash key := h.WeakHash

View File

@@ -67,7 +67,9 @@ func run_roundtrip_test(t *testing.T, src_data, changed []byte, num_of_patches,
total_data_in_delta = 0 total_data_in_delta = 0
outputbuf := bytes.Buffer{} outputbuf := bytes.Buffer{}
for _, op := range delta_ops { for _, op := range delta_ops {
total_data_in_delta += len(op.Data) if op.Type == OpData {
total_data_in_delta += len(op.Data)
}
p.rsync.ApplyDelta(&outputbuf, bytes.NewReader(changed), op) p.rsync.ApplyDelta(&outputbuf, bytes.NewReader(changed), op)
} }
return outputbuf.Bytes() return outputbuf.Bytes()