diff --git a/tools/rsync/api.go b/tools/rsync/api.go index 2283a8855..04aae72f7 100644 --- a/tools/rsync/api.go +++ b/tools/rsync/api.go @@ -3,12 +3,11 @@ package rsync import ( + "encoding/binary" "encoding/json" "fmt" "io" "math" - "os" - "strings" "github.com/zeebo/xxh3" @@ -20,11 +19,12 @@ var _ = fmt.Print const MaxBlockSize int = 256 * 1024 type Api struct { - rsync RSync - signature []BlockHash - delta_output io.Writer - delta_input io.ReadSeeker - delta_buffer []byte + rsync RSync + signature []BlockHash + delta_output io.Writer + delta_input io.ReadSeeker + unconsumed_signature_data, unconsumed_delta_data []byte + expected_input_size_for_signature_generation int64 Strong_hash_name, Weak_hash_name string } @@ -35,10 +35,53 @@ type SignatureHeader struct { Block_size int `json:"block_size,omitempty"` } -func (self *Api) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) { - self.delta_output = delta_output - self.delta_input = delta_input - self.delta_buffer = self.delta_buffer[:0] +// internal implementation {{{ +func (self *Api) read_signature_header(data []byte) (consumed int, err error) { + if len(data) < 6 { + return -1, io.ErrShortBuffer + } + sz := int(binary.LittleEndian.Uint32(data)) + if len(data) < sz+4 { + return -1, io.ErrShortBuffer + } + consumed = 4 + sz + h := SignatureHeader{} + if err = json.Unmarshal(data[4:consumed], &h); err != nil { + return consumed, fmt.Errorf("Invalid JSON in signature header with error: %w", err) + } + if h.Block_size == 0 { + return consumed, fmt.Errorf("rsync signature header has no or zero block size") + } + if h.Block_size > MaxBlockSize { + return consumed, fmt.Errorf("rsync signature header has too large block size %d > %d", h.Block_size, MaxBlockSize) + } + self.rsync.BlockSize = h.Block_size + self.rsync.MaxDataOp = 10 * h.Block_size + if h.Weak_hash_name != "" && h.Weak_hash_name != "beta" { + return consumed, fmt.Errorf("rsync signature header has unknown weak hash algorithm: %#v", h.Weak_hash_name) + } + self.Weak_hash_name = h.Weak_hash_name + switch h.Strong_hash_name { + case "", "xxh3": + self.rsync.UniqueHasher = xxh3.New() + self.Strong_hash_name = "xxh3" + default: + return consumed, fmt.Errorf("rsync signature header has unknown strong hash algorithm: %#v", h.Strong_hash_name) + } + self.signature = make([]BlockHash, 0, 64) + return +} + +func (self *Api) read_signature_blocks(data []byte) (consumed int) { + hash_size := self.rsync.UniqueHasher.Size() + block_hash_size := hash_size + 12 + for ; len(data) >= block_hash_size; data = data[block_hash_size:] { + bl := BlockHash{} + bl.Unserialize(data[:block_hash_size], hash_size) + self.signature = append(self.signature, bl) + consumed += block_hash_size + } + return } func (self *Api) update_delta(data []byte) (consumed int, err error) { @@ -61,45 +104,47 @@ func (self *Api) update_delta(data []byte) (consumed int, err error) { return } +// }}} + +// Start applying serialized delta +func (self *Api) StartDelta(delta_output io.Writer, delta_input io.ReadSeeker) { + self.delta_output = delta_output + self.delta_input = delta_input + self.unconsumed_delta_data = nil +} + +// Apply a chunk of delta data func (self *Api) UpdateDelta(data []byte) (err error) { - if len(self.delta_buffer) == 0 { - consumed, err := self.update_delta(data) - if err != nil { - return err - } - data = data[consumed:] - if len(data) > 0 { - self.delta_buffer = append(self.delta_buffer, data...) - } - } else { - self.delta_buffer = append(self.delta_buffer, data...) - consumed, err := self.update_delta(self.delta_buffer) - if err != nil { - return err - } - leftover := len(self.delta_buffer) - consumed - copy(self.delta_buffer, self.delta_buffer[consumed:]) - self.delta_buffer = self.delta_buffer[:leftover] + if len(self.unconsumed_delta_data) > 0 { + data = append(self.unconsumed_delta_data, data...) + self.unconsumed_delta_data = nil + } + consumed, err := self.update_delta(data) + if err != nil { + return err + } + data = data[consumed:] + if len(data) > 0 { + self.unconsumed_delta_data = data } return } +// Finish applying delta data func (self *Api) FinishDelta() (err error) { - if len(self.delta_buffer) > 0 { - data := self.delta_buffer - self.delta_buffer = self.delta_buffer[:0] - if err = self.UpdateDelta(data); err != nil { - return err - } - if len(self.delta_buffer) > 0 { - return fmt.Errorf("There are %d leftover bytes in the delta", len(self.delta_buffer)) - } + if err = self.UpdateDelta([]byte{}); err != nil { + return err + } + if len(self.unconsumed_delta_data) > 0 { + return fmt.Errorf("There are %d leftover bytes in the delta", len(self.unconsumed_delta_data)) } self.delta_input = nil self.delta_output = nil + self.unconsumed_delta_data = nil return } +// Create a serialized delta based on the previously loaded signature func (self *Api) CreateDelta(src io.Reader, output_callback func(string) error) (err error) { if len(self.signature) == 0 { return fmt.Errorf("Cannot call CreateDelta() before loading a signature") @@ -110,65 +155,68 @@ func (self *Api) CreateDelta(src io.Reader, output_callback func(string) error) return } -func (self *Api) setup_from_signature(signature string) (err error) { - h := SignatureHeader{} - dec := json.NewDecoder(strings.NewReader(signature)) - if err = dec.Decode(&h); err != nil { - return fmt.Errorf("rsync signature header not valid JSON with error: %w", err) - } - signature = signature[dec.InputOffset():] - if h.Block_size == 0 { - return fmt.Errorf("rsync signature header has no or zero block size") - } - if h.Block_size > MaxBlockSize { - return fmt.Errorf("rsync signature header has too large block size %d > %d", h.Block_size, MaxBlockSize) - } - self.rsync.BlockSize = h.Block_size - self.rsync.MaxDataOp = 10 * h.Block_size - if h.Weak_hash_name != "" && h.Weak_hash_name != "beta" { - return fmt.Errorf("rsync signature header has unknown weak hash algorithm: %#v", h.Weak_hash_name) - } - self.Weak_hash_name = h.Weak_hash_name - switch h.Strong_hash_name { - case "", "xxh3": - self.rsync.UniqueHasher = xxh3.New() - self.Strong_hash_name = "xxh3" - default: - return fmt.Errorf("rsync signature header has unknown strong hash algorithm: %#v", h.Strong_hash_name) - } - self.signature = make([]BlockHash, 0, 64) - hash_size := self.rsync.UniqueHasher.Size() - block_hash_size := self.rsync.UniqueHasher.Size() + 12 - for ; len(signature) >= block_hash_size; signature = signature[block_hash_size:] { - data := utils.UnsafeStringToBytes(signature[:block_hash_size]) - bl := BlockHash{} - bl.Unserialize(data, hash_size) - self.signature = append(self.signature, bl) +// Create a signature for the data source in src +func (self *Api) CreateSignature(src io.Reader) (signature []BlockHash, err error) { + if self.expected_input_size_for_signature_generation > 0 { + signature = make([]BlockHash, 0, self.rsync.BlockHashCount(self.expected_input_size_for_signature_generation)) + } else { + signature = make([]BlockHash, 0, 1024) } + err = self.rsync.CreateSignature(src, func(bl BlockHash) error { + signature = append(signature, bl) + return nil + }) return } -func NewFromSignature(signature string) (ans *Api, err error) { - ans = &Api{} - err = ans.setup_from_signature(signature) - return -} - -func New(src io.Reader) (ans *Api, err error) { - bs := DefaultBlockSize - var sz int64 - if v, ok := src.(io.ReadSeeker); ok { - if pos, err := v.Seek(0, os.SEEK_CUR); err != nil { - return nil, err - } else { - if sz, err = v.Seek(0, os.SEEK_END); err != nil { - sz -= pos - bs = int(math.Round(math.Sqrt(float64(sz)))) - if _, err = v.Seek(pos, os.SEEK_SET); err != nil { - return nil, err - } +// Add more external signature data +func (self *Api) AddSignatureData(data []byte) (err error) { + if len(self.unconsumed_signature_data) > 0 { + data = append(self.unconsumed_signature_data, data...) + self.unconsumed_signature_data = nil + } + if self.rsync.UniqueHasher == nil { + consumed, err := self.read_signature_header(data) + if err != nil { + if consumed < 0 { + self.unconsumed_signature_data = data + return nil } + return err } + data = data[consumed:] + } + consumed := self.read_signature_blocks(data) + data = data[consumed:] + if len(data) > 0 { + self.unconsumed_signature_data = data + } + return nil +} + +// Finish adding external signature data +func (self *Api) FinishSignatureData() (err error) { + if len(self.unconsumed_signature_data) > 0 { + return fmt.Errorf("There were %d leftover bytes in the signature data", len(self.unconsumed_signature_data)) + } + self.unconsumed_signature_data = nil + if self.rsync.UniqueHasher == nil { + return fmt.Errorf("No header was found in the signature data") + } + return +} + +// Use to calculate a delta based on a supplied signature, via AddSignatureData +func NewToCreateDelta() *Api { + return &Api{} +} + +// Use to create a signature and possibly apply a delta +func NewToCreateSignature(expected_input_size int64) (ans *Api, err error) { + bs := DefaultBlockSize + sz := utils.Max(0, expected_input_size) + if sz > 0 { + bs = int(math.Round(math.Sqrt(float64(sz)))) } ans = &Api{Weak_hash_name: "beta", Strong_hash_name: "xxh3"} ans.rsync.BlockSize = utils.Min(bs, MaxBlockSize) @@ -179,12 +227,6 @@ func New(src io.Reader) (ans *Api, err error) { } ans.rsync.MaxDataOp = ans.rsync.BlockSize * 10 - if sz > 0 { - ans.signature = make([]BlockHash, 0, ans.rsync.BlockHashCount(sz)) - } - err = ans.rsync.CreateSignature(src, func(bl BlockHash) error { - ans.signature = append(ans.signature, bl) - return nil - }) + ans.expected_input_size_for_signature_generation = sz return }