Make loading of external signature data also streaming

This commit is contained in:
Kovid Goyal
2023-07-02 21:27:57 +05:30
parent 7e12972414
commit 71a1050b9f

View File

@@ -3,12 +3,11 @@
package rsync
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math"
"os"
"strings"
"github.com/zeebo/xxh3"
@@ -20,11 +19,12 @@ var _ = fmt.Print
const MaxBlockSize int = 256 * 1024
type Api struct {
rsync RSync
signature []BlockHash
delta_output io.Writer
delta_input io.ReadSeeker
delta_buffer []byte
rsync RSync
signature []BlockHash
delta_output io.Writer
delta_input io.ReadSeeker
unconsumed_signature_data, unconsumed_delta_data []byte
expected_input_size_for_signature_generation int64
Strong_hash_name, Weak_hash_name string
}
@@ -35,10 +35,53 @@ type SignatureHeader struct {
Block_size int `json:"block_size,omitempty"`
}
func (self *Api) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) {
self.delta_output = delta_output
self.delta_input = delta_input
self.delta_buffer = self.delta_buffer[:0]
// internal implementation {{{
// read_signature_header parses the signature header at the start of data and
// configures self from it.
//
// The header is a 4-byte little-endian length followed by that many bytes of
// JSON that decode into a SignatureHeader.
//
// Returns the number of bytes the header occupies. When data does not yet
// contain the whole header, consumed is -1 and err is io.ErrShortBuffer, so
// the caller can buffer the bytes and retry once more data arrives. For any
// other error consumed is the full header length and err describes the
// invalid field.
//
// self is modified only after every header field has been validated, so a
// rejected header leaves the receiver untouched.
func (self *Api) read_signature_header(data []byte) (consumed int, err error) {
	// 4 length bytes plus (presumably) the shortest possible JSON object, "{}".
	if len(data) < 6 {
		return -1, io.ErrShortBuffer
	}
	// Do the bounds arithmetic in 64 bits. Converting the untrusted length
	// directly to int yields a negative number where int is 32 bits wide,
	// which would pass the length check and panic in the slice expression.
	sz := uint64(binary.LittleEndian.Uint32(data))
	if uint64(len(data)) < sz+4 {
		return -1, io.ErrShortBuffer
	}
	consumed = int(sz + 4) // cannot overflow: bounded by len(data)
	h := SignatureHeader{}
	if err = json.Unmarshal(data[4:consumed], &h); err != nil {
		return consumed, fmt.Errorf("Invalid JSON in signature header with error: %w", err)
	}
	if h.Block_size == 0 {
		return consumed, fmt.Errorf("rsync signature header has no or zero block size")
	}
	if h.Block_size < 0 {
		return consumed, fmt.Errorf("rsync signature header has negative block size %d", h.Block_size)
	}
	if h.Block_size > MaxBlockSize {
		return consumed, fmt.Errorf("rsync signature header has too large block size %d > %d", h.Block_size, MaxBlockSize)
	}
	if h.Weak_hash_name != "" && h.Weak_hash_name != "beta" {
		return consumed, fmt.Errorf("rsync signature header has unknown weak hash algorithm: %#v", h.Weak_hash_name)
	}
	switch h.Strong_hash_name {
	case "", "xxh3":
		// The only supported strong hash; an empty name selects it too.
	default:
		return consumed, fmt.Errorf("rsync signature header has unknown strong hash algorithm: %#v", h.Strong_hash_name)
	}

	// The header is valid: commit it to the receiver.
	self.rsync.BlockSize = h.Block_size
	self.rsync.MaxDataOp = 10 * h.Block_size
	self.Weak_hash_name = h.Weak_hash_name
	self.rsync.UniqueHasher = xxh3.New()
	self.Strong_hash_name = "xxh3"
	self.signature = make([]BlockHash, 0, 64)
	return consumed, nil
}
// read_signature_blocks decodes as many complete serialized block hashes as
// data holds and appends each one to self.signature.
//
// Every record is the strong hash (self.rsync.UniqueHasher.Size() bytes) plus
// 12 further bytes; the record layout itself is defined by
// BlockHash.Unserialize. Returns the number of bytes decoded; a trailing
// partial record is left for the caller to keep.
func (self *Api) read_signature_blocks(data []byte) (consumed int) {
	strong_size := self.rsync.UniqueHasher.Size()
	record_size := strong_size + 12
	for consumed+record_size <= len(data) {
		var block BlockHash
		block.Unserialize(data[consumed:consumed+record_size], strong_size)
		self.signature = append(self.signature, block)
		consumed += record_size
	}
	return consumed
}
func (self *Api) update_delta(data []byte) (consumed int, err error) {
@@ -61,45 +104,47 @@ func (self *Api) update_delta(data []byte) (consumed int, err error) {
return
}
// }}}
// StartDelta begins a new delta-application session. It stores the writer and
// the seekable reader that later UpdateDelta calls will use, and discards any
// delta bytes still buffered from an earlier session.
func (self *Api) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) {
	self.unconsumed_delta_data = nil
	self.delta_input, self.delta_output = delta_input, delta_output
}
// Apply a chunk of delta data
func (self *Api) UpdateDelta(data []byte) (err error) {
if len(self.delta_buffer) == 0 {
consumed, err := self.update_delta(data)
if err != nil {
return err
}
data = data[consumed:]
if len(data) > 0 {
self.delta_buffer = append(self.delta_buffer, data...)
}
} else {
self.delta_buffer = append(self.delta_buffer, data...)
consumed, err := self.update_delta(self.delta_buffer)
if err != nil {
return err
}
leftover := len(self.delta_buffer) - consumed
copy(self.delta_buffer, self.delta_buffer[consumed:])
self.delta_buffer = self.delta_buffer[:leftover]
if len(self.unconsumed_delta_data) > 0 {
data = append(self.unconsumed_delta_data, data...)
self.unconsumed_delta_data = nil
}
consumed, err := self.update_delta(data)
if err != nil {
return err
}
data = data[consumed:]
if len(data) > 0 {
self.unconsumed_delta_data = data
}
return
}
// Finish applying delta data
func (self *Api) FinishDelta() (err error) {
if len(self.delta_buffer) > 0 {
data := self.delta_buffer
self.delta_buffer = self.delta_buffer[:0]
if err = self.UpdateDelta(data); err != nil {
return err
}
if len(self.delta_buffer) > 0 {
return fmt.Errorf("There are %d leftover bytes in the delta", len(self.delta_buffer))
}
if err = self.UpdateDelta([]byte{}); err != nil {
return err
}
if len(self.unconsumed_delta_data) > 0 {
return fmt.Errorf("There are %d leftover bytes in the delta", len(self.unconsumed_delta_data))
}
self.delta_input = nil
self.delta_output = nil
self.unconsumed_delta_data = nil
return
}
// Create a serialized delta based on the previously loaded signature
func (self *Api) CreateDelta(src io.Reader, output_callback func(string) error) (err error) {
if len(self.signature) == 0 {
return fmt.Errorf("Cannot call CreateDelta() before loading a signature")
@@ -110,65 +155,68 @@ func (self *Api) CreateDelta(src io.Reader, output_callback func(string) error)
return
}
func (self *Api) setup_from_signature(signature string) (err error) {
h := SignatureHeader{}
dec := json.NewDecoder(strings.NewReader(signature))
if err = dec.Decode(&h); err != nil {
return fmt.Errorf("rsync signature header not valid JSON with error: %w", err)
}
signature = signature[dec.InputOffset():]
if h.Block_size == 0 {
return fmt.Errorf("rsync signature header has no or zero block size")
}
if h.Block_size > MaxBlockSize {
return fmt.Errorf("rsync signature header has too large block size %d > %d", h.Block_size, MaxBlockSize)
}
self.rsync.BlockSize = h.Block_size
self.rsync.MaxDataOp = 10 * h.Block_size
if h.Weak_hash_name != "" && h.Weak_hash_name != "beta" {
return fmt.Errorf("rsync signature header has unknown weak hash algorithm: %#v", h.Weak_hash_name)
}
self.Weak_hash_name = h.Weak_hash_name
switch h.Strong_hash_name {
case "", "xxh3":
self.rsync.UniqueHasher = xxh3.New()
self.Strong_hash_name = "xxh3"
default:
return fmt.Errorf("rsync signature header has unknown strong hash algorithm: %#v", h.Strong_hash_name)
}
self.signature = make([]BlockHash, 0, 64)
hash_size := self.rsync.UniqueHasher.Size()
block_hash_size := self.rsync.UniqueHasher.Size() + 12
for ; len(signature) >= block_hash_size; signature = signature[block_hash_size:] {
data := utils.UnsafeStringToBytes(signature[:block_hash_size])
bl := BlockHash{}
bl.Unserialize(data, hash_size)
self.signature = append(self.signature, bl)
// CreateSignature reads src and returns the block hashes produced for it by
// self.rsync.CreateSignature, in the order they were reported.
//
// The result slice is pre-sized from the expected input size recorded on the
// receiver when that size is positive, and to 1024 entries otherwise. On
// error the hashes collected so far are returned together with err.
func (self *Api) CreateSignature(src io.Reader) (signature []BlockHash, err error) {
	if hint := self.expected_input_size_for_signature_generation; hint <= 0 {
		signature = make([]BlockHash, 0, 1024)
	} else {
		signature = make([]BlockHash, 0, self.rsync.BlockHashCount(hint))
	}
	collect := func(bl BlockHash) error {
		signature = append(signature, bl)
		return nil
	}
	err = self.rsync.CreateSignature(src, collect)
	return signature, err
}
// NewFromSignature builds an Api and initializes it from a complete
// serialized signature via setup_from_signature. The Api is returned even
// when err is non-nil.
func NewFromSignature(signature string) (ans *Api, err error) {
	ans = new(Api)
	err = ans.setup_from_signature(signature)
	return ans, err
}
func New(src io.Reader) (ans *Api, err error) {
bs := DefaultBlockSize
var sz int64
if v, ok := src.(io.ReadSeeker); ok {
if pos, err := v.Seek(0, os.SEEK_CUR); err != nil {
return nil, err
} else {
if sz, err = v.Seek(0, os.SEEK_END); err != nil {
sz -= pos
bs = int(math.Round(math.Sqrt(float64(sz))))
if _, err = v.Seek(pos, os.SEEK_SET); err != nil {
return nil, err
}
// AddSignatureData feeds the next chunk of externally supplied, serialized
// signature data to the receiver. Chunks may be split at arbitrary byte
// boundaries: the header is parsed first (once enough bytes have arrived),
// then every complete block hash is appended to the loaded signature. Bytes
// that do not yet form a complete header or block are kept until the next
// call.
//
// data is never retained: leftover bytes are copied into storage owned by
// the receiver, so the caller is free to reuse its buffer between calls.
//
// Returns an error only when the header is present but invalid.
func (self *Api) AddSignatureData(data []byte) (err error) {
	if len(self.unconsumed_signature_data) > 0 {
		data = append(self.unconsumed_signature_data, data...)
		self.unconsumed_signature_data = nil
	}
	// A nil hasher means the header has not been parsed yet.
	if self.rsync.UniqueHasher == nil {
		consumed, err := self.read_signature_header(data)
		if err != nil {
			if consumed < 0 {
				// Header still incomplete: keep a private copy and wait for more.
				self.unconsumed_signature_data = append([]byte(nil), data...)
				return nil
			}
			return err
		}
		data = data[consumed:]
	}
	data = data[self.read_signature_blocks(data):]
	if len(data) > 0 {
		// Copy rather than alias: data may point into the caller's buffer,
		// which can be overwritten before the next call.
		self.unconsumed_signature_data = append([]byte(nil), data...)
	}
	return nil
}
// FinishSignatureData marks the end of the external signature data stream.
// It fails if bytes that never formed a complete header or block hash are
// still buffered, or if no header was ever parsed.
func (self *Api) FinishSignatureData() (err error) {
	if leftover := len(self.unconsumed_signature_data); leftover > 0 {
		return fmt.Errorf("There were %d leftover bytes in the signature data", leftover)
	}
	self.unconsumed_signature_data = nil
	// The hasher is only set once a header has been read successfully.
	if self.rsync.UniqueHasher != nil {
		return nil
	}
	return fmt.Errorf("No header was found in the signature data")
}
// NewToCreateDelta returns a zero-valued Api for computing a delta against a
// signature that is supplied afterwards through AddSignatureData.
func NewToCreateDelta() *Api {
	return new(Api)
}
// Use to create a signature and possibly apply a delta
func NewToCreateSignature(expected_input_size int64) (ans *Api, err error) {
bs := DefaultBlockSize
sz := utils.Max(0, expected_input_size)
if sz > 0 {
bs = int(math.Round(math.Sqrt(float64(sz))))
}
ans = &Api{Weak_hash_name: "beta", Strong_hash_name: "xxh3"}
ans.rsync.BlockSize = utils.Min(bs, MaxBlockSize)
@@ -179,12 +227,6 @@ func New(src io.Reader) (ans *Api, err error) {
}
ans.rsync.MaxDataOp = ans.rsync.BlockSize * 10
if sz > 0 {
ans.signature = make([]BlockHash, 0, ans.rsync.BlockHashCount(sz))
}
err = ans.rsync.CreateSignature(src, func(bl BlockHash) error {
ans.signature = append(ans.signature, bl)
return nil
})
ans.expected_input_size_for_signature_generation = sz
return
}