Change delta creation API to use io.Writer

Simplifies logic and avoids an extra copy if the writer doesnt need to
make its own copy
This commit is contained in:
Kovid Goyal
2023-07-10 22:00:28 +05:30
parent 1adaafbbb9
commit 4b5216adcf
3 changed files with 134 additions and 144 deletions

View File

@@ -17,6 +17,7 @@ import (
"os"
"github.com/zeebo/xxh3"
"golang.org/x/exp/slices"
)
// If no BlockSize is specified in the rsync instance, this value is used.
@@ -72,8 +73,6 @@ type Operation struct {
BlockIndex uint64
BlockIndexEnd uint64
Data []byte
serialized_repr []byte
}
var bin = binary.LittleEndian
@@ -93,9 +92,6 @@ func (self Operation) SerializeSize() int {
}
func (self Operation) Serialize(ans []byte) {
if self.serialized_repr != nil {
copy(ans, self.serialized_repr)
}
switch self.Type {
case OpBlock:
bin.PutUint64(ans[1:], self.BlockIndex)
@@ -187,9 +183,6 @@ func (self *BlockHash) Unserialize(data []byte) (err error) {
return
}
// Write signatures as they are generated.
type OperationWriter func(op Operation) error
// Properties to use while working with the rsync algorithm.
// A single rsync should not be used concurrently as it may contain
// internal buffers and hash sums.
@@ -337,50 +330,6 @@ func (r *rsync) set_buffer_to_size(sz int) {
}
}
type node struct {
op *Operation
next *node
}
type list struct {
head *node
}
func (self *list) push_back(op *Operation) {
n := &node{op: op}
n.next = self.head
self.head = n
}
func (self *list) is_empty() bool { return self.head == nil }
func (self *list) front() *Operation {
for c := self.head; c != nil; c = c.next {
if c.next == nil {
return c.op
}
}
return nil
}
func (self *list) pop_front() *Operation {
c := self.head
var prev *node
for c != nil {
if c.next == nil {
if prev == nil {
self.head = nil
} else {
prev.next = nil
}
return c.op
}
prev = c
c = c.next
}
return nil
}
// see https://rsync.samba.org/tech_report/node3.html
type rolling_checksum struct {
alpha, beta, val, l uint32
@@ -409,29 +358,25 @@ func (self *rolling_checksum) add_one_byte(first_byte, last_byte byte) {
}
type diff struct {
buffer []byte
buffer []byte
op_write_buf [32]byte
// A single β hash may correlate with many unique hashes.
hash_lookup map[uint32][]BlockHash
source io.Reader
hasher hash.Hash64
checksummer hash.Hash
output io.Writer
window, data struct{ pos, sz int }
block_size int
finished bool
rc rolling_checksum
window, data struct{ pos, sz int }
block_size int
finished, written bool
rc rolling_checksum
pending_op *Operation
ready_ops list
}
func (self *diff) Next() (op *Operation, err error) {
if self.ready_ops.is_empty() {
if err = self.pump_till_op_available(); err != nil {
return
}
}
return self.ready_ops.pop_front(), nil
func (self *diff) Next() (err error) {
return self.pump_till_op_written()
}
func (self *diff) hash(b []byte) uint64 {
@@ -442,7 +387,15 @@ func (self *diff) hash(b []byte) uint64 {
// Combine OpBlock into OpBlockRange. To do this store the previous
// non-data operation and determine if it can be extended.
func (self *diff) enqueue(op Operation) {
func (self *diff) send_pending() (err error) {
if self.pending_op != nil {
err = self.send_op(self.pending_op)
self.pending_op = nil
}
return
}
func (self *diff) enqueue(op Operation) (err error) {
switch op.Type {
case OpBlock:
if self.pending_op != nil {
@@ -462,44 +415,68 @@ func (self *diff) enqueue(op Operation) {
return
}
}
self.ready_ops.push_back(self.pending_op)
self.pending_op = nil
if err = self.send_pending(); err != nil {
return err
}
}
self.pending_op = &op
case OpData, OpHash:
if self.pending_op != nil {
self.ready_ops.push_back(self.pending_op)
self.pending_op = nil
case OpHash:
if err = self.send_pending(); err != nil {
return
}
if err = self.send_op(&op); err != nil {
return
}
self.ready_ops.push_back(&op)
}
return
}
func (self *diff) send_data() {
func (self *diff) send_op(op *Operation) error {
b := self.op_write_buf[:op.SerializeSize()]
op.Serialize(b)
self.written = true
_, err := self.output.Write(b)
return err
}
func (self *diff) send_data() error {
if self.data.sz > 0 {
if err := self.send_pending(); err != nil {
return err
}
self.written = true
data := self.buffer[self.data.pos : self.data.pos+self.data.sz]
srepr := make([]byte, len(data)+5)
copy(srepr[5:], data)
bin.PutUint32(srepr[1:], uint32(len(data)))
srepr[0] = byte(OpData)
op := Operation{Type: OpData, Data: srepr[5:], serialized_repr: srepr}
self.enqueue(op)
var buf [5]byte
bin.PutUint32(buf[1:], uint32(len(data)))
buf[0] = byte(OpData)
if _, err := self.output.Write(buf[:]); err != nil {
return err
}
if _, err := self.output.Write(data); err != nil {
return err
}
self.data.pos += self.data.sz
self.data.sz = 0
}
return nil
}
func (self *diff) pump_till_op_available() error {
for self.ready_ops.is_empty() && !self.finished {
func (self *diff) pump_till_op_written() error {
self.written = false
for !self.finished && !self.written {
if err := self.read_at_least_one_operation(); err != nil {
return err
}
}
if self.finished && self.pending_op != nil {
self.ready_ops.push_back(self.pending_op)
self.pending_op = nil
if self.finished {
if self.pending_op != nil {
if err := self.send_op(self.pending_op); err != nil {
return err
}
self.pending_op = nil
}
return io.EOF
}
return nil
}
@@ -510,7 +487,9 @@ func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
}
if idx >= cap(self.buffer) {
// need to wrap the buffer, so send off any data present behind the window
self.send_data()
if err = self.send_data(); err != nil {
return
}
// copy the window and any data present after it to the start of the buffer
distance_from_window_pos := idx - self.window.pos
amt_to_copy := len(self.buffer) - self.window.pos
@@ -537,13 +516,18 @@ func (self *diff) ensure_idx_valid(idx int) (ok bool, err error) {
return
}
func (self *diff) finish_up() {
self.send_data()
func (self *diff) finish_up() (err error) {
if err = self.send_data(); err != nil {
return
}
self.data.pos = self.window.pos
self.data.sz = len(self.buffer) - self.window.pos
self.send_data()
if err = self.send_data(); err != nil {
return
}
self.enqueue(Operation{Type: OpHash, Data: self.checksummer.Sum(nil)})
self.finished = true
return
}
// See https://rsync.samba.org/tech_report/node4.html for the design of this algorithm
@@ -553,8 +537,7 @@ func (self *diff) read_at_least_one_operation() (err error) {
if err != nil {
return err
}
self.finish_up()
return nil
return self.finish_up()
}
self.window.pos++
self.data.sz++
@@ -564,8 +547,7 @@ func (self *diff) read_at_least_one_operation() (err error) {
if err != nil {
return err
}
self.finish_up()
return nil
return self.finish_up()
}
self.window.sz = self.block_size
self.rc.full(self.buffer[self.window.pos : self.window.pos+self.window.sz])
@@ -576,7 +558,9 @@ func (self *diff) read_at_least_one_operation() (err error) {
block_index, found_hash = find_hash(hh, self.hash(self.buffer[self.window.pos:self.window.pos+self.window.sz]))
}
if found_hash {
self.send_data()
if err = self.send_data(); err != nil {
return
}
self.enqueue(Operation{Type: OpBlock, BlockIndex: block_index})
self.window.pos += self.window.sz
self.data.pos = self.window.pos
@@ -585,12 +569,53 @@ func (self *diff) read_at_least_one_operation() (err error) {
return nil
}
func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash) func() (*Operation, error) {
type OperationWriter struct {
Operations []Operation
expecting_data bool
}
func (self *OperationWriter) Write(p []byte) (n int, err error) {
if self.expecting_data {
self.expecting_data = false
self.Operations = append(self.Operations, Operation{Type: OpData, Data: slices.Clone(p)})
} else {
switch OpType(p[0]) {
case OpData:
self.expecting_data = true
case OpBlock, OpBlockRange, OpHash:
op := Operation{}
if n, err = op.Unserialize(p); err != nil {
return 0, err
} else if n < len(p) {
return 0, io.ErrShortWrite
}
self.Operations = append(self.Operations, op)
default:
return 0, fmt.Errorf("Unknown OpType: %d", p[0])
}
}
return
}
func (r *rsync) CreateDelta(source io.Reader, signature []BlockHash) ([]Operation, error) {
w := OperationWriter{}
it := r.CreateDiff(source, signature, &w)
for {
if err := it(); err != nil {
if err == io.EOF {
return w.Operations, nil
}
return nil, err
}
}
}
func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash, output io.Writer) func() error {
ans := &diff{
block_size: r.BlockSize, buffer: make([]byte, 0, (r.BlockSize * 8)),
hash_lookup: make(map[uint32][]BlockHash, len(signature)),
source: source, hasher: r.hasher_constructor(),
checksummer: r.checksummer_constructor(),
checksummer: r.checksummer_constructor(), output: output,
}
for _, h := range signature {
key := h.WeakHash
@@ -600,20 +625,6 @@ func (r *rsync) CreateDiff(source io.Reader, signature []BlockHash) func() (*Ope
return ans.Next
}
func (r *rsync) CreateDelta(source io.Reader, signature []BlockHash, ops OperationWriter) (err error) {
diff := r.CreateDiff(source, signature)
var op *Operation
for {
op, err = diff()
if op == nil {
return
}
if err = ops(*op); err != nil {
return err
}
}
}
// Use a more unique way to identify a set of bytes.
func (r *rsync) hash(v []byte) uint64 {
r.hasher.Reset()

View File

@@ -249,35 +249,16 @@ func (self *Patcher) CreateSignatureIterator(src io.Reader) OutputIterator {
}
// Create a serialized delta based on the previously loaded signature
func (self *Differ) CreateDelta(src io.Reader) OutputIterator {
func (self *Differ) CreateDelta(src io.Reader, output io.Writer) func() error {
if err := self.finish_signature_data(); err != nil {
return func([]byte) ([]byte, error) { return nil, err }
return func() error { return err }
}
if self.signature == nil {
return func([]byte) ([]byte, error) {
return nil, fmt.Errorf("Cannot call CreateDelta() before loading a signature")
}
}
it := self.rsync.CreateDiff(src, self.signature)
gbf := self.Grow_buffer_func
if gbf == nil {
gbf = double_size
}
return func(output []byte) ([]byte, error) {
for {
op, err := it()
if op == nil {
if err == nil {
err = io.EOF
}
return output, err
}
output, p := ensure_size(output, op.SerializeSize(), gbf)
op.Serialize(p)
return output, nil
return func() error {
return fmt.Errorf("Cannot call CreateDelta() before loading a signature")
}
}
return self.rsync.CreateDiff(src, self.signature, output)
}
// Add more external signature data

View File

@@ -59,11 +59,10 @@ func run_roundtrip_test(t *testing.T, src_data, changed []byte, num_of_patches,
total_data_in_delta := 0
apply_delta := func(signature []BlockHash) []byte {
delta_ops := make([]Operation, 0, 1024)
p.rsync.CreateDelta(bytes.NewReader(src_data), signature, func(op Operation) error {
delta_ops = append(delta_ops, op)
return nil
})
delta_ops, err := p.rsync.CreateDelta(bytes.NewReader(src_data), signature)
if err != nil {
t.Fatal(err)
}
total_data_in_delta = 0
outputbuf := bytes.Buffer{}
for _, op := range delta_ops {
@@ -98,18 +97,17 @@ func run_roundtrip_test(t *testing.T, src_data, changed []byte, num_of_patches,
if err := d.AddSignatureData(signature_of_changed); err != nil {
t.Fatal(err)
}
deltabuf := make([]byte, 0, 8192)
it := d.CreateDelta(bytes.NewBuffer(src_data))
db := bytes.Buffer{}
it := d.CreateDelta(bytes.NewBuffer(src_data), &db)
for {
b, err := it(deltabuf)
if err != nil {
if err := it(); err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}
deltabuf = b
}
deltabuf := db.Bytes()
outputbuf := bytes.Buffer{}
p.StartDelta(&outputbuf, bytes.NewReader(changed))
for len(deltabuf) > 0 {