More rsync integration work

This commit is contained in:
Kovid Goyal
2021-11-16 13:06:16 +05:30
parent 63399fe975
commit 0eac514e52
2 changed files with 65 additions and 21 deletions

View File

@@ -9,13 +9,14 @@ from contextlib import suppress
from enum import auto
from itertools import count
from time import monotonic
from typing import Deque, Dict, Iterator, List, Optional
from typing import IO, Deque, Dict, Iterator, List, Optional, Union
from kitty.cli_stub import TransferCLIOptions
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
from kitty.file_transmission import (
Action, Compression, FileTransmissionCommand, FileType, NameReprEnum,
TransmissionType, encode_bypass, split_for_transfer
Action, Compression, FileTransmissionCommand, FileType,
IdentityDecompressor, NameReprEnum, TransmissionType, ZlibDecompressor,
encode_bypass, split_for_transfer
)
from kitty.typing import KeyEventType
from kitty.utils import sanitize_control_codes
@@ -25,7 +26,7 @@ from ..tui.loop import Loop, debug
from ..tui.operations import styled, without_line_wrap
from ..tui.spinners import Spinner
from ..tui.utils import human_size
from .librsync import signature_of_file
from .librsync import PatchFile, signature_of_file
from .send import Transfer
from .utils import (
expand_home, random_id, render_progress_in_width, safe_divide,
@@ -47,6 +48,7 @@ class File:
def __init__(self, ftc: FileTransmissionCommand):
self.expected_size = ftc.size
self.expect_diff = False
self.transmit_started_at = self.done_at = 0.
self.transmitted_bytes = 0
self.ftype = ftc.ftype
@@ -61,26 +63,31 @@ class File:
self.expanded_local_path = ''
self.file_id = str(next(file_counter))
self.compression_capable = self.ftype is FileType.regular and self.expected_size > 4096 and should_be_compressed(self.expanded_local_path)
self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if self.compression_capable else IdentityDecompressor()
self.remote_symlink_value = b''
self.local_write_started = False
self.actual_file: Union[None, PatchFile, IO[bytes]] = None
def __repr__(self) -> str:
return f'File(rpath={self.remote_path!r}, lpath={self.expanded_local_path!r})'
def write_data(self, data: bytes, is_last: bool) -> int:
data = self.decompressor(data, is_last)
if self.ftype is FileType.symlink:
self.remote_symlink_value += data
return 0
if self.ftype is FileType.regular:
if not self.local_write_started:
if self.actual_file is None:
parent = os.path.dirname(self.expanded_local_path)
if parent:
os.makedirs(parent, exist_ok=True)
self.local_write_started = True
with open(self.expanded_local_path, 'ab') as f:
base = f.tell()
f.write(data)
return f.tell() - base
self.actual_file = PatchFile(self.expanded_local_path) if self.expect_diff else open(self.expanded_local_path, 'wb')
base = self.actual_file.tell()
self.actual_file.write(data)
ans = self.actual_file.tell() - base
if is_last:
self.actual_file.close()
self.actual_file = None
return ans
return 0
def apply_metadata(self) -> None:
@@ -289,6 +296,7 @@ class Manager:
compression=Compression.zlib if f.compression_capable else Compression.none
).serialize()
if read_signature:
f.expect_diff = True
fs = signature_of_file(f.expanded_local_path)
for chunk in fs:
for data in split_for_transfer(chunk, file_id=f.file_id):

View File

@@ -19,7 +19,9 @@ from typing import (
Optional, Tuple, Union, cast
)
from kittens.transfer.librsync import PatchFile, signature_of_file
from kittens.transfer.librsync import (
LoadSignature, PatchFile, delta_for_file, signature_of_file
)
from kittens.transfer.utils import (
IdentityCompressor, ZlibCompressor, abspath, expand_home, home_path
)
@@ -551,6 +553,8 @@ class SourceFile:
self.open_file = open(self.path, 'rb')
if ftc.compression is Compression.zlib:
self.compressor = ZlibCompressor()
self.signature_loader = LoadSignature() if self.waiting_for_signature else None
self.delta_loader: Optional[Iterator[memoryview]] = None
@property
def ready_to_transmit(self) -> bool:
@@ -560,14 +564,23 @@ class SourceFile:
if hasattr(self, 'open_file'):
self.open_file.close()
del self.open_file
self.signature_loader = None
self.delta_loader = None
def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
if hasattr(self, 'target'):
self.transmitted = True
return self.target.encode('utf-8'), len(self.target)
data = self.open_file.read(sz)
if not data or self.open_file.tell() >= self.stat.st_size:
self.transmitted = True
if self.delta_loader is None:
data = self.open_file.read(sz)
if not data or self.open_file.tell() >= self.stat.st_size:
self.transmitted = True
else:
try:
data = next(self.delta_loader)
except StopIteration:
self.transmitted = True
data = b''
uncompressed_sz = len(data)
cchunk = self.compressor.compress(data)
if self.transmitted and not isinstance(self.compressor, IdentityCompressor):
@@ -592,7 +605,7 @@ class ActiveSend:
self.send_errors = quiet < 2
self.last_activity_at = monotonic()
self.file_specs: List[Tuple[str, str]] = []
self.queued_files: List[SourceFile] = []
self.queued_files_map: Dict[str, SourceFile] = {}
self.active_file: Optional[SourceFile] = None
self.pending_chunks: Deque[FileTransmissionCommand] = deque()
self.metadata_sent = False
@@ -609,9 +622,23 @@ class ActiveSend:
def add_send_file(self, cmd: FileTransmissionCommand) -> None:
self.last_activity_at = monotonic()
if len(self.queued_files) > 32768:
if len(self.queued_files_map) > 32768:
raise TransmissionError(ErrorCode.EINVAL, 'Too many queued files')
self.queued_files.append(SourceFile(cmd))
self.queued_files_map[cmd.file_id] = SourceFile(cmd)
def add_signature_data(self, cmd: FileTransmissionCommand) -> None:
self.last_activity_at = monotonic()
af = self.queued_files_map.get(cmd.file_id)
if af is None:
raise TransmissionError(ErrorCode.EINVAL, f'Signature data for unknown file_id: {cmd.file_id}')
sl = af.signature_loader
if sl is None:
raise TransmissionError(ErrorCode.EINVAL, f'Signature data for file that is not using rsync: {cmd.file_id}')
sl.add_chunk(cmd.data)
if cmd.action is Action.end_data:
sl.commit()
af.waiting_for_signature = False
af.delta_loader = delta_for_file(af.path, sl.signature)
@property
def is_expired(self) -> bool:
@@ -628,13 +655,13 @@ class ActiveSend:
return self.pending_chunks.popleft()
af = self.active_file
if af is None:
for f in self.queued_files:
for f in self.queued_files_map.values():
if f.ready_to_transmit:
self.active_file = af = f
break
if af is None:
return None
self.queued_files.remove(af)
self.queued_files_map.pop(af.file_id, None)
chunk, uncompressed_sz = af.next_chunk()
if af.transmitted:
self.active_file = None
@@ -747,7 +774,16 @@ class FileTransmission:
if asd.spec_complete and asd.accepted:
self.send_metadata_for_send_transfer(asd)
return
if cmd.action is Action.status:
if cmd.action in (Action.data, Action.end_data):
try:
asd.add_signature_data(cmd)
except TransmissionError as err:
self.drop_send(asd.id)
if asd.send_errors:
self.send_transmission_error(asd.id, err)
else:
self.pump_send_chunks(asd)
elif cmd.action is Action.status:
self.drop_send(asd.id)
return
if not asd.accepted: