diff --git a/kittens/transfer/receive.py b/kittens/transfer/receive.py index 2b51470a9..eed061c30 100644 --- a/kittens/transfer/receive.py +++ b/kittens/transfer/receive.py @@ -9,13 +9,14 @@ from contextlib import suppress from enum import auto from itertools import count from time import monotonic -from typing import Deque, Dict, Iterator, List, Optional +from typing import IO, Deque, Dict, Iterator, List, Optional, Union from kitty.cli_stub import TransferCLIOptions from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth from kitty.file_transmission import ( - Action, Compression, FileTransmissionCommand, FileType, NameReprEnum, - TransmissionType, encode_bypass, split_for_transfer + Action, Compression, FileTransmissionCommand, FileType, + IdentityDecompressor, NameReprEnum, TransmissionType, ZlibDecompressor, + encode_bypass, split_for_transfer ) from kitty.typing import KeyEventType from kitty.utils import sanitize_control_codes @@ -25,7 +26,7 @@ from ..tui.loop import Loop, debug from ..tui.operations import styled, without_line_wrap from ..tui.spinners import Spinner from ..tui.utils import human_size -from .librsync import signature_of_file +from .librsync import PatchFile, signature_of_file from .send import Transfer from .utils import ( expand_home, random_id, render_progress_in_width, safe_divide, @@ -47,6 +48,7 @@ class File: def __init__(self, ftc: FileTransmissionCommand): self.expected_size = ftc.size + self.expect_diff = False self.transmit_started_at = self.done_at = 0. self.transmitted_bytes = 0 self.ftype = ftc.ftype @@ -61,26 +63,31 @@ class File: self.expanded_local_path = '' self.file_id = str(next(file_counter)) self.compression_capable = self.ftype is FileType.regular and self.expected_size > 4096 and should_be_compressed(self.expanded_local_path) + self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if self.compression_capable else IdentityDecompressor() self.remote_symlink_value = b'' - self.local_write_started = False + self.actual_file: Union[None, PatchFile, IO[bytes]] = None def __repr__(self) -> str: return f'File(rpath={self.remote_path!r}, lpath={self.expanded_local_path!r})' def write_data(self, data: bytes, is_last: bool) -> int: + data = self.decompressor(data, is_last) if self.ftype is FileType.symlink: self.remote_symlink_value += data return 0 if self.ftype is FileType.regular: - if not self.local_write_started: + if self.actual_file is None: parent = os.path.dirname(self.expanded_local_path) if parent: os.makedirs(parent, exist_ok=True) - self.local_write_started = True - with open(self.expanded_local_path, 'ab') as f: - base = f.tell() - f.write(data) - return f.tell() - base + self.actual_file = PatchFile(self.expanded_local_path) if self.expect_diff else open(self.expanded_local_path, 'wb') + base = self.actual_file.tell() + self.actual_file.write(data) + ans = self.actual_file.tell() - base + if is_last: + self.actual_file.close() + self.actual_file = None + return ans return 0 def apply_metadata(self) -> None: @@ -289,6 +296,7 @@ class Manager: compression=Compression.zlib if f.compression_capable else Compression.none ).serialize() if read_signature: + f.expect_diff = True fs = signature_of_file(f.expanded_local_path) for chunk in fs: for data in split_for_transfer(chunk, file_id=f.file_id): diff --git a/kitty/file_transmission.py b/kitty/file_transmission.py index bb64ad98d..454612a6a 100644 --- a/kitty/file_transmission.py +++ b/kitty/file_transmission.py @@ -19,7 +19,9 @@ from typing import ( Optional, Tuple, Union, cast ) -from kittens.transfer.librsync import PatchFile, signature_of_file +from kittens.transfer.librsync import ( + LoadSignature, PatchFile, delta_for_file, signature_of_file +) from kittens.transfer.utils import ( IdentityCompressor, ZlibCompressor, abspath, expand_home, home_path ) @@ -551,6 +553,8 @@ class SourceFile: self.open_file = open(self.path, 'rb') if ftc.compression is Compression.zlib: self.compressor = ZlibCompressor() + self.signature_loader = LoadSignature() if self.waiting_for_signature else None + self.delta_loader: Optional[Iterator[memoryview]] = None @property def ready_to_transmit(self) -> bool: @@ -560,14 +564,23 @@ class SourceFile: if hasattr(self, 'open_file'): self.open_file.close() del self.open_file + self.signature_loader = None + self.delta_loader = None def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]: if hasattr(self, 'target'): self.transmitted = True return self.target.encode('utf-8'), len(self.target) - data = self.open_file.read(sz) - if not data or self.open_file.tell() >= self.stat.st_size: - self.transmitted = True + if self.delta_loader is None: + data = self.open_file.read(sz) + if not data or self.open_file.tell() >= self.stat.st_size: + self.transmitted = True + else: + try: + data = next(self.delta_loader) + except StopIteration: + self.transmitted = True + data = b'' uncompressed_sz = len(data) cchunk = self.compressor.compress(data) if self.transmitted and not isinstance(self.compressor, IdentityCompressor): @@ -592,7 +605,7 @@ class ActiveSend: self.send_errors = quiet < 2 self.last_activity_at = monotonic() self.file_specs: List[Tuple[str, str]] = [] - self.queued_files: List[SourceFile] = [] + self.queued_files_map: Dict[str, SourceFile] = {} self.active_file: Optional[SourceFile] = None self.pending_chunks: Deque[FileTransmissionCommand] = deque() self.metadata_sent = False @@ -609,9 +622,23 @@ class ActiveSend: def add_send_file(self, cmd: FileTransmissionCommand) -> None: self.last_activity_at = monotonic() - if len(self.queued_files) > 32768: + if len(self.queued_files_map) > 32768: raise TransmissionError(ErrorCode.EINVAL, 'Too many queued files') - self.queued_files.append(SourceFile(cmd)) + self.queued_files_map[cmd.file_id] = SourceFile(cmd) + + def add_signature_data(self, cmd: FileTransmissionCommand) -> None: + self.last_activity_at = monotonic() + af = self.queued_files_map.get(cmd.file_id) + if af is None: + raise TransmissionError(ErrorCode.EINVAL, f'Signature data for unknown file_id: {cmd.file_id}') + sl = af.signature_loader + if sl is None: + raise TransmissionError(ErrorCode.EINVAL, f'Signature data for file that is not using rsync: {cmd.file_id}') + sl.add_chunk(cmd.data) + if cmd.action is Action.end_data: + sl.commit() + af.waiting_for_signature = False + af.delta_loader = delta_for_file(af.path, sl.signature) @property def is_expired(self) -> bool: @@ -628,13 +655,13 @@ class ActiveSend: return self.pending_chunks.popleft() af = self.active_file if af is None: - for f in self.queued_files: + for f in self.queued_files_map.values(): if f.ready_to_transmit: self.active_file = af = f break if af is None: return None - self.queued_files.remove(af) + self.queued_files_map.pop(af.file_id, None) chunk, uncompressed_sz = af.next_chunk() if af.transmitted: self.active_file = None @@ -747,7 +774,16 @@ class FileTransmission: if asd.spec_complete and asd.accepted: self.send_metadata_for_send_transfer(asd) return - if cmd.action is Action.status: + if cmd.action in (Action.data, Action.end_data): + try: + asd.add_signature_data(cmd) + except TransmissionError as err: + self.drop_send(asd.id) + if asd.send_errors: + self.send_transmission_error(asd.id, err) + else: + self.pump_send_chunks(asd) + elif cmd.action is Action.status: self.drop_send(asd.id) return if not asd.accepted: