mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-06 09:15:57 +02:00
Remove python send/receive implementations
This commit is contained in:
@@ -9,9 +9,6 @@ from typing import List, Tuple
|
||||
from kitty.cli import parse_args
|
||||
from kitty.cli_stub import TransferCLIOptions
|
||||
|
||||
from .receive import receive_main
|
||||
from .send import send_main
|
||||
|
||||
usage = 'source_files_or_directories destination_path'
|
||||
help_text = 'Transfer files over the TTY device'
|
||||
|
||||
@@ -83,16 +80,7 @@ def read_bypass(loc: str) -> str:
|
||||
|
||||
|
||||
def main(args: List[str]) -> None:
|
||||
cli_opts, items = parse_transfer_args(args)
|
||||
if cli_opts.permissions_bypass:
|
||||
cli_opts.permissions_bypass = read_bypass(cli_opts.permissions_bypass).strip()
|
||||
|
||||
if not items:
|
||||
raise SystemExit('Usage: kitty +kitten transfer file_or_directory ...')
|
||||
if cli_opts.direction == 'send':
|
||||
send_main(cli_opts, items)
|
||||
else:
|
||||
receive_main(cli_opts, items)
|
||||
raise SystemExit('This should be run as kitten transfer')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -1,664 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import os
|
||||
import posixpath
|
||||
from asyncio import TimerHandle
|
||||
from collections import deque
|
||||
from contextlib import suppress
|
||||
from enum import auto
|
||||
from itertools import count
|
||||
from time import monotonic
|
||||
from typing import IO, Deque, Dict, Iterator, List, Optional, Union
|
||||
|
||||
from kitty.cli_stub import TransferCLIOptions
|
||||
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
|
||||
from kitty.file_transmission import (
|
||||
Action,
|
||||
Compression,
|
||||
FileTransmissionCommand,
|
||||
FileType,
|
||||
IdentityDecompressor,
|
||||
NameReprEnum,
|
||||
TransmissionType,
|
||||
ZlibDecompressor,
|
||||
encode_bypass,
|
||||
split_for_transfer,
|
||||
)
|
||||
from kitty.typing import KeyEventType, ScreenSize
|
||||
from kitty.utils import sanitize_control_codes
|
||||
|
||||
from ..tui.handler import Handler
|
||||
from ..tui.loop import Loop, debug
|
||||
from ..tui.operations import styled, without_line_wrap
|
||||
from ..tui.spinners import Spinner
|
||||
from ..tui.utils import human_size
|
||||
from .send import Transfer
|
||||
from .utils import expand_home, print_rsync_stats, random_id, render_progress_in_width, safe_divide, should_be_compressed
|
||||
|
||||
debug
|
||||
file_counter = count(1)
|
||||
|
||||
|
||||
class State(NameReprEnum):
|
||||
waiting_for_permission = auto()
|
||||
waiting_for_file_metadata = auto()
|
||||
transferring = auto()
|
||||
canceled = auto()
|
||||
|
||||
|
||||
class File:
|
||||
|
||||
def __init__(self, ftc: FileTransmissionCommand):
|
||||
self.expected_size = ftc.size
|
||||
self.expect_diff = False
|
||||
self.transmit_started_at = self.done_at = 0.
|
||||
self.written_bytes = 0
|
||||
self.received_bytes = 0
|
||||
self.sent_bytes = 0
|
||||
self.ftype = ftc.ftype
|
||||
self.mtime = ftc.mtime
|
||||
self.spec_id = int(ftc.file_id)
|
||||
self.permissions = ftc.permissions
|
||||
self.remote_path = ftc.name
|
||||
self.display_name = sanitize_control_codes(self.remote_path)
|
||||
self.remote_id = ftc.status
|
||||
self.remote_target = ftc.data.decode('utf-8')
|
||||
self.parent = ftc.parent
|
||||
self.expanded_local_path = ''
|
||||
self.file_id = str(next(file_counter))
|
||||
compression_capable = self.ftype is FileType.regular and self.expected_size > 4096 and should_be_compressed(self.remote_path)
|
||||
self.decompressor: Union[ZlibDecompressor, IdentityDecompressor] = ZlibDecompressor() if compression_capable else IdentityDecompressor()
|
||||
self.remote_symlink_value = b''
|
||||
self.actual_file: Union[None, PatchFile, IO[bytes]] = None
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'File(rpath={self.remote_path!r}, lpath={self.expanded_local_path!r})'
|
||||
|
||||
def close(self) -> None:
|
||||
if self.actual_file is not None:
|
||||
self.actual_file.close()
|
||||
self.actual_file = None
|
||||
|
||||
def write_data(self, data: bytes, is_last: bool) -> int:
|
||||
self.received_bytes += len(data)
|
||||
data = self.decompressor(data, is_last)
|
||||
if self.ftype is FileType.symlink:
|
||||
self.remote_symlink_value += data
|
||||
return 0
|
||||
if self.ftype is FileType.regular:
|
||||
if self.actual_file is None:
|
||||
parent = os.path.dirname(self.expanded_local_path)
|
||||
if parent:
|
||||
os.makedirs(parent, exist_ok=True)
|
||||
self.actual_file = PatchFile(self.expanded_local_path) if self.expect_diff else open(self.expanded_local_path, 'wb')
|
||||
base = self.actual_file.tell()
|
||||
if data:
|
||||
self.actual_file.write(data)
|
||||
ans = self.actual_file.tell() - base
|
||||
if is_last:
|
||||
self.actual_file.close()
|
||||
self.actual_file = None
|
||||
return ans
|
||||
return 0
|
||||
|
||||
def apply_metadata(self) -> None:
|
||||
if self.ftype is FileType.symlink:
|
||||
with suppress(NotImplementedError):
|
||||
os.chmod(self.expanded_local_path, self.permissions, follow_symlinks=False)
|
||||
os.utime(self.expanded_local_path, ns=(self.mtime, self.mtime), follow_symlinks=False)
|
||||
else:
|
||||
os.chmod(self.expanded_local_path, self.permissions)
|
||||
os.utime(self.expanded_local_path, ns=(self.mtime, self.mtime))
|
||||
|
||||
|
||||
class TreeNode:
|
||||
|
||||
def __init__(self, file: File, local_name: str, parent: Optional['TreeNode'] = None):
|
||||
self.entry = file
|
||||
self.entry.expanded_local_path = local_name
|
||||
self.parent = parent
|
||||
self.added_files: Dict[int, TreeNode] = {}
|
||||
|
||||
def add_child(self, file: File) -> 'TreeNode':
|
||||
q = self.added_files.get(id(file))
|
||||
if q is not None:
|
||||
return q
|
||||
c = TreeNode(file, os.path.join(self.entry.expanded_local_path, os.path.basename(file.remote_path)), self)
|
||||
self.added_files[id(file)] = c
|
||||
return c
|
||||
|
||||
def __iter__(self) -> Iterator['TreeNode']:
|
||||
for c in self.added_files.values():
|
||||
yield c
|
||||
yield from c
|
||||
|
||||
|
||||
def make_tree(all_files: List[File], local_base: str) -> TreeNode:
|
||||
fid_map = {f.remote_id: f for f in all_files}
|
||||
node_map: Dict[str, TreeNode] = {}
|
||||
root_node = TreeNode(File(FileTransmissionCommand(file_id='-1')), local_base)
|
||||
|
||||
def ensure_parent(f: File) -> TreeNode:
|
||||
if not f.parent:
|
||||
return root_node
|
||||
parent = node_map.get(f.parent)
|
||||
if parent is None:
|
||||
fp = fid_map[f.parent]
|
||||
gp = ensure_parent(fp)
|
||||
parent = gp.add_child(fp)
|
||||
return parent
|
||||
|
||||
for f in all_files:
|
||||
p = ensure_parent(f)
|
||||
p.add_child(f)
|
||||
return root_node
|
||||
|
||||
|
||||
def files_for_receive(cli_opts: TransferCLIOptions, dest: str, files: List[File], remote_home: str, specs: List[str]) -> Iterator[File]:
|
||||
spec_map: Dict[int, List[File]] = {i: [] for i in range(len(specs))}
|
||||
for f in files:
|
||||
spec_map[f.spec_id].append(f)
|
||||
spec_paths = [spec_map[i][0].remote_path for i in range(len(specs))]
|
||||
if cli_opts.mode == 'mirror':
|
||||
try:
|
||||
common_path = posixpath.commonpath(spec_paths)
|
||||
except ValueError:
|
||||
common_path = ''
|
||||
home = remote_home.rstrip('/')
|
||||
if common_path and common_path.startswith(home + '/'):
|
||||
spec_paths = [posixpath.join('~', posixpath.relpath(x, home)) for x in spec_paths]
|
||||
for spec_id, files_for_spec in spec_map.items():
|
||||
spec = spec_paths[spec_id]
|
||||
tree = make_tree(files_for_spec, os.path.dirname(expand_home(spec)))
|
||||
for x in tree:
|
||||
yield x.entry
|
||||
else:
|
||||
number_of_source_files = sum(map(len, spec_map.values()))
|
||||
dest_is_dir = dest[-1] in (os.sep, os.altsep) or number_of_source_files > 1 or os.path.isdir(dest)
|
||||
for spec_id, files_for_spec in spec_map.items():
|
||||
if dest_is_dir:
|
||||
dest_path = os.path.join(dest, posixpath.basename(files_for_spec[0].remote_path))
|
||||
tree = make_tree(files_for_spec, os.path.dirname(expand_home(dest_path)))
|
||||
for x in tree:
|
||||
yield x.entry
|
||||
else:
|
||||
f = files_for_spec[0]
|
||||
f.expanded_local_path = dest
|
||||
yield f
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.total_size_of_all_files = 0
|
||||
self.total_bytes_to_transfer = 0
|
||||
self.active_file: Optional[File] = None
|
||||
self.total_transferred = 0
|
||||
self.transfers: Deque[Transfer] = deque()
|
||||
self.transfered_stats_amt = 0
|
||||
self.transfered_stats_interval = 0.
|
||||
self.started_at = 0.
|
||||
self.done_files: List[File] = []
|
||||
|
||||
def change_active_file(self, nf: File) -> None:
|
||||
now = monotonic()
|
||||
self.active_file = nf
|
||||
nf.transmit_started_at = now
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
self.transfers.append(Transfer())
|
||||
self.started_at = monotonic()
|
||||
|
||||
def file_written(self, af: File, amt: int, is_done: bool) -> None:
|
||||
if self.active_file is not af:
|
||||
self.change_active_file(af)
|
||||
af.written_bytes += amt
|
||||
self.total_transferred += amt
|
||||
self.transfers.append(Transfer(amt))
|
||||
now = self.transfers[-1].at
|
||||
while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
|
||||
self.transfers.popleft()
|
||||
self.transfered_stats_interval = now - self.transfers[0].at
|
||||
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
|
||||
if is_done:
|
||||
af.done_at = monotonic()
|
||||
self.done_files.append(af)
|
||||
|
||||
|
||||
class Manager:
|
||||
|
||||
def __init__(
|
||||
self, request_id: str, spec: List[str], dest: str,
|
||||
bypass: Optional[str] = None, use_rsync: bool = False
|
||||
):
|
||||
self.request_id = request_id
|
||||
self.spec = spec
|
||||
self.failed_specs: Dict[int, str] = {}
|
||||
self.spec_counts = dict.fromkeys(range(len(self.spec)), 0)
|
||||
self.dest = dest
|
||||
self.remote_home = ''
|
||||
self.bypass = encode_bypass(request_id, bypass) if bypass else ''
|
||||
self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
|
||||
self.suffix = '\x1b\\'
|
||||
self.state = State.waiting_for_permission
|
||||
self.files: List[File] = []
|
||||
self.progress_tracker = ProgressTracker()
|
||||
self.transfer_done = False
|
||||
self.use_rsync = use_rsync
|
||||
|
||||
@property
|
||||
def finish_code(self) -> str:
|
||||
return FileTransmissionCommand(action=Action.finish).serialize()
|
||||
|
||||
def start_transfer(self) -> Iterator[str]:
|
||||
yield FileTransmissionCommand(action=Action.receive, bypass=self.bypass, size=len(self.spec)).serialize()
|
||||
for i, x in enumerate(self.spec):
|
||||
yield FileTransmissionCommand(action=Action.file, file_id=str(i), name=x).serialize()
|
||||
self.progress_tracker.start_transfer()
|
||||
|
||||
def finalize_transfer(self) -> str:
|
||||
self.transfer_done = True
|
||||
rid_map = {f.remote_id: f for f in self.files}
|
||||
for f in self.files:
|
||||
if f.ftype is FileType.directory:
|
||||
try:
|
||||
os.makedirs(f.expanded_local_path, exist_ok=True)
|
||||
except OSError as err:
|
||||
return f'Failed to create directory with error: {err}'
|
||||
elif f.ftype is FileType.link:
|
||||
target = rid_map.get(f.remote_target)
|
||||
if target is None:
|
||||
return f'Hard link with remote id: {f.remote_target} not found'
|
||||
try:
|
||||
os.makedirs(os.path.dirname(f.expanded_local_path), exist_ok=True)
|
||||
with suppress(FileNotFoundError):
|
||||
os.remove(f.expanded_local_path)
|
||||
os.link(target.expanded_local_path, f.expanded_local_path)
|
||||
except OSError as err:
|
||||
return f'Failed to create hardlink with error: {err}'
|
||||
elif f.ftype is FileType.symlink:
|
||||
if f.remote_target:
|
||||
target = rid_map.get(f.remote_target)
|
||||
if target is None:
|
||||
return f'Symbolic link with remote id: {f.remote_target} not found'
|
||||
lt = target.expanded_local_path
|
||||
if not f.remote_symlink_value.startswith(b'/'):
|
||||
lt = os.path.relpath(lt, os.path.dirname(f.expanded_local_path))
|
||||
else:
|
||||
lt = f.remote_symlink_value.decode('utf-8')
|
||||
with suppress(FileNotFoundError):
|
||||
os.remove(f.expanded_local_path)
|
||||
try:
|
||||
os.symlink(lt, f.expanded_local_path)
|
||||
except OSError as err:
|
||||
return f'Failed to create symlink with error: {err}'
|
||||
with suppress(OSError):
|
||||
f.apply_metadata()
|
||||
return ''
|
||||
|
||||
def request_files(self) -> Iterator[str]:
|
||||
for f in self.files:
|
||||
if f.ftype is FileType.directory or (f.ftype is FileType.link and f.remote_target):
|
||||
continue
|
||||
read_signature = self.use_rsync
|
||||
if read_signature and f.ftype is FileType.regular:
|
||||
try:
|
||||
sr = os.stat(f.expanded_local_path, follow_symlinks=False)
|
||||
except OSError:
|
||||
read_signature = False
|
||||
else:
|
||||
read_signature = sr.st_size > 4096
|
||||
yield FileTransmissionCommand(
|
||||
action=Action.file, name=f.remote_path, file_id=f.file_id, ttype=TransmissionType.rsync if read_signature else TransmissionType.simple,
|
||||
compression=Compression.zlib if isinstance(f.decompressor, ZlibDecompressor) else Compression.none
|
||||
).serialize()
|
||||
if read_signature:
|
||||
f.expect_diff = True
|
||||
fs = signature_of_file(f.expanded_local_path)
|
||||
for chunk in fs:
|
||||
f.sent_bytes += len(chunk)
|
||||
for data in split_for_transfer(chunk, file_id=f.file_id):
|
||||
yield data.serialize()
|
||||
yield FileTransmissionCommand(file_id=f.file_id, action=Action.end_data).serialize()
|
||||
|
||||
def collect_files(self, cli_opts: TransferCLIOptions) -> None:
|
||||
self.files = list(files_for_receive(cli_opts, self.dest, self.files, self.remote_home, self.spec))
|
||||
self.files_to_be_transferred = {f.file_id: f for f in self.files if f.ftype not in (FileType.directory, FileType.link)}
|
||||
self.progress_tracker.total_size_of_all_files = sum(max(0, f.expected_size) for f in self.files_to_be_transferred.values())
|
||||
self.progress_tracker.total_bytes_to_transfer = self.progress_tracker.total_size_of_all_files
|
||||
|
||||
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> str:
|
||||
if self.state is State.waiting_for_permission:
|
||||
if ftc.action is Action.status:
|
||||
if ftc.status == 'OK':
|
||||
self.state = State.waiting_for_file_metadata
|
||||
else:
|
||||
return 'Permission for transfer denied'
|
||||
else:
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
elif self.state is State.waiting_for_file_metadata:
|
||||
if ftc.action is Action.status:
|
||||
if ftc.file_id:
|
||||
try:
|
||||
fid = int(ftc.file_id)
|
||||
except Exception:
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
if fid < 0 or fid >= len(self.spec):
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
self.failed_specs[fid] = ftc.status
|
||||
else:
|
||||
if ftc.status == 'OK':
|
||||
self.state = State.transferring
|
||||
self.remote_home = ftc.name
|
||||
return ''
|
||||
else:
|
||||
return ftc.status
|
||||
elif ftc.action is Action.file:
|
||||
try:
|
||||
fid = int(ftc.file_id)
|
||||
except Exception:
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
if fid < 0 or fid >= len(self.spec):
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
self.spec_counts[fid] += 1
|
||||
self.files.append(File(ftc))
|
||||
else:
|
||||
return f'Unexpected response from terminal: {ftc}'
|
||||
elif self.state is State.transferring:
|
||||
if ftc.action in (Action.data, Action.end_data):
|
||||
f = self.files_to_be_transferred.get(ftc.file_id)
|
||||
if f is None:
|
||||
return f'Got data for unknown file id: {ftc.file_id}'
|
||||
is_last = ftc.action is Action.end_data
|
||||
try:
|
||||
amt_written = f.write_data(ftc.data, is_last)
|
||||
except Exception as err:
|
||||
return str(err)
|
||||
self.progress_tracker.file_written(f, amt_written, is_last)
|
||||
if is_last:
|
||||
del self.files_to_be_transferred[ftc.file_id]
|
||||
if not self.files_to_be_transferred:
|
||||
return self.finalize_transfer()
|
||||
return ''
|
||||
|
||||
|
||||
class Receive(Handler):
|
||||
use_alternate_screen = False
|
||||
|
||||
def __init__(self, cli_opts: TransferCLIOptions, spec: List[str], dest: str = ''):
|
||||
self.cli_opts = cli_opts
|
||||
self.manager = Manager(random_id(), spec, dest, bypass=cli_opts.permissions_bypass, use_rsync=cli_opts.transmit_deltas)
|
||||
self.quit_after_write_code: Optional[int] = None
|
||||
self.check_paths_printed = False
|
||||
self.transmit_started = False
|
||||
self.max_name_length = 0
|
||||
self.spinner = Spinner()
|
||||
self.progress_update_call: Optional[TimerHandle] = None
|
||||
self.progress_drawn = False
|
||||
self.transmit_iterator: Optional[Iterator[str]] = None
|
||||
|
||||
def send_payload(self, payload: str) -> None:
|
||||
self.write(self.manager.prefix)
|
||||
self.write(payload)
|
||||
self.write(self.manager.suffix)
|
||||
|
||||
def initialize(self) -> None:
|
||||
self.cmd.set_cursor_visible(False)
|
||||
self.print('Scanning files…')
|
||||
for x in self.manager.start_transfer():
|
||||
self.send_payload(x)
|
||||
|
||||
def finalize(self) -> None:
|
||||
self.cmd.set_cursor_visible(True)
|
||||
|
||||
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
|
||||
if ftc.id != self.manager.request_id:
|
||||
return
|
||||
if ftc.status == 'CANCELED' and ftc.action is Action.status:
|
||||
self.quit_loop(1)
|
||||
return
|
||||
if self.quit_after_write_code is not None or self.manager.state is State.canceled:
|
||||
return
|
||||
transfer_started = self.manager.state is State.transferring
|
||||
err = self.manager.on_file_transfer_response(ftc)
|
||||
if err:
|
||||
self.print_err(err)
|
||||
self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds')
|
||||
self.abort_transfer()
|
||||
return
|
||||
if not transfer_started and self.manager.state is State.transferring:
|
||||
if self.manager.failed_specs:
|
||||
self.print_err('Failed to process some sources')
|
||||
for spec_id, msg in self.manager.failed_specs.items():
|
||||
spec = self.manager.spec[spec_id]
|
||||
self.print(f'{spec}: {msg}')
|
||||
self.quit_loop(1)
|
||||
return
|
||||
if 0 in self.manager.spec_counts.values():
|
||||
self.print_err('No matches found for: ' + ', '.join(self.manager.spec[k] for k, v in self.manager.spec_counts.items() if v == 0))
|
||||
self.quit_loop(1)
|
||||
return
|
||||
self.manager.collect_files(self.cli_opts)
|
||||
if self.cli_opts.confirm_paths:
|
||||
self.confirm_paths()
|
||||
else:
|
||||
self.start_transfer()
|
||||
if self.manager.transfer_done:
|
||||
self.send_payload(self.manager.finish_code)
|
||||
self.quit_after_write_code = 0
|
||||
self.refresh_progress()
|
||||
elif self.transmit_started:
|
||||
self.refresh_progress()
|
||||
|
||||
def confirm_paths(self) -> None:
|
||||
self.print_check_paths()
|
||||
|
||||
def print_check_paths(self) -> None:
|
||||
if self.check_paths_printed:
|
||||
return
|
||||
self.check_paths_printed = True
|
||||
self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.')
|
||||
for df in self.manager.files:
|
||||
self.cmd.styled(df.ftype.short_text, fg=df.ftype.color)
|
||||
self.print(end=' ')
|
||||
self.print(df.display_name, '→', end=' ')
|
||||
self.cmd.styled(df.expanded_local_path, fg='red' if os.path.lexists(df.expanded_local_path) else None)
|
||||
self.print()
|
||||
self.print(f'Transferring {len(self.manager.files)} file(s) of total size: {human_size(self.manager.progress_tracker.total_size_of_all_files)}')
|
||||
self.print()
|
||||
self.print_continue_msg()
|
||||
|
||||
def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if self.check_paths_printed and not self.transmit_started:
|
||||
if text.lower() == 'y':
|
||||
self.start_transfer()
|
||||
return
|
||||
if text.lower() == 'n':
|
||||
self.abort_transfer()
|
||||
self.print('Sending cancel request to terminal')
|
||||
return
|
||||
self.print_continue_msg()
|
||||
|
||||
def on_key(self, key_event: KeyEventType) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if key_event.matches('esc'):
|
||||
if self.check_paths_printed and not self.transmit_started:
|
||||
self.abort_transfer()
|
||||
self.print('Sending cancel request to terminal')
|
||||
else:
|
||||
self.on_interrupt()
|
||||
|
||||
def print_continue_msg(self) -> None:
|
||||
self.print(
|
||||
'Press', styled('y', fg='green', bold=True, fg_intense=True), 'to continue or',
|
||||
styled('n', fg='red', bold=True, fg_intense=True), 'to abort')
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
self.transmit_started = True
|
||||
n = len(self.manager.files)
|
||||
msg = 'Transmitting signature of' if self.manager.use_rsync else 'Queueing transfer of'
|
||||
msg += ' one file' if n == 1 else f'{n} files'
|
||||
self.print(msg)
|
||||
names = (f.display_name for f in self.manager.files)
|
||||
self.max_name_length = max(6, max(map(wcswidth, names)))
|
||||
self.transmit_iterator = self.manager.request_files()
|
||||
self.transmit_one()
|
||||
|
||||
def transmit_one(self) -> None:
|
||||
if self.transmit_iterator is None:
|
||||
return
|
||||
try:
|
||||
data = next(self.transmit_iterator)
|
||||
except StopIteration:
|
||||
self.transmit_iterator = None
|
||||
except Exception as err:
|
||||
self.print_err(str(err))
|
||||
self.print('Waiting to ensure terminal cancels transfer, will quit in a few seconds')
|
||||
self.abort_transfer()
|
||||
else:
|
||||
self.send_payload(data)
|
||||
|
||||
def print_err(self, msg: str) -> None:
|
||||
self.cmd.styled(msg, fg='red')
|
||||
self.print()
|
||||
|
||||
def on_term(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
self.print_err('Terminate requested, cancelling transfer, transferred files are in undefined state')
|
||||
self.abort_transfer(delay=2)
|
||||
|
||||
def on_interrupt(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if self.manager.state is State.canceled:
|
||||
self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received')
|
||||
return
|
||||
self.print_err('Interrupt requested, cancelling transfer, transferred files are in undefined state')
|
||||
self.abort_transfer()
|
||||
|
||||
def abort_transfer(self, delay: float = 5) -> None:
|
||||
self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize())
|
||||
self.manager.state = State.canceled
|
||||
self.asyncio_loop.call_later(delay, self.quit_loop, 1)
|
||||
|
||||
def render_progress(
|
||||
self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0,
|
||||
secs_so_far: float = 0., bytes_per_sec: float = 0., is_complete: bool = False
|
||||
) -> None:
|
||||
if is_complete:
|
||||
bytes_so_far = total_bytes
|
||||
self.write(render_progress_in_width(
|
||||
name, width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char,
|
||||
bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far,
|
||||
bytes_per_sec=bytes_per_sec, is_complete=is_complete
|
||||
))
|
||||
|
||||
def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None:
|
||||
p = self.manager.progress_tracker
|
||||
now = monotonic()
|
||||
self.render_progress(
|
||||
af.display_name, spinner_char=spinner_char, is_complete=is_complete,
|
||||
bytes_so_far=af.written_bytes, total_bytes=af.expected_size,
|
||||
secs_so_far=(af.done_at or now) - af.transmit_started_at,
|
||||
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
|
||||
)
|
||||
|
||||
def erase_progress(self) -> None:
|
||||
if self.progress_drawn:
|
||||
self.cmd.move_cursor_by(2, 'up')
|
||||
self.write('\r')
|
||||
self.cmd.clear_to_end_of_screen()
|
||||
self.progress_drawn = False
|
||||
|
||||
def refresh_progress(self) -> None:
|
||||
self.erase_progress()
|
||||
self.draw_progress()
|
||||
|
||||
def schedule_progress_update(self, delay: float = 0.1) -> None:
|
||||
if self.progress_update_call is None:
|
||||
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
|
||||
elif self.asyncio_loop.time() + delay < self.progress_update_call.when():
|
||||
self.progress_update_call.cancel()
|
||||
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
|
||||
|
||||
@Handler.atomic_update
|
||||
def draw_progress(self) -> None:
|
||||
if self.manager.state is State.canceled:
|
||||
return
|
||||
with without_line_wrap(self.write):
|
||||
for df in self.manager.progress_tracker.done_files:
|
||||
sc = styled('✔', fg='green')
|
||||
if df.ftype is FileType.regular:
|
||||
self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True)
|
||||
else:
|
||||
self.write(f'{sc} {df.display_name} {styled(df.ftype.name, dim=True, italic=True)}')
|
||||
self.print()
|
||||
del self.manager.progress_tracker.done_files[:]
|
||||
is_complete = self.quit_after_write_code is not None
|
||||
if is_complete:
|
||||
sc = styled('✔', fg='green') if self.quit_after_write_code == 0 else styled('✘', fg='red')
|
||||
else:
|
||||
sc = self.spinner()
|
||||
p = self.manager.progress_tracker
|
||||
now = monotonic()
|
||||
if is_complete:
|
||||
self.cmd.repeat('─', self.screen_size.width)
|
||||
else:
|
||||
af = p.active_file
|
||||
if af is not None:
|
||||
self.draw_progress_for_current_file(af, spinner_char=sc)
|
||||
self.print()
|
||||
if p.total_transferred > 0:
|
||||
self.render_progress(
|
||||
'Total', spinner_char=sc,
|
||||
bytes_so_far=p.total_transferred, total_bytes=p.total_bytes_to_transfer,
|
||||
secs_so_far=now - p.started_at, is_complete=is_complete,
|
||||
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
|
||||
)
|
||||
else:
|
||||
self.print('File data transfer has not yet started', end='')
|
||||
self.print()
|
||||
self.schedule_progress_update(self.spinner.interval)
|
||||
self.progress_drawn = True
|
||||
|
||||
def on_writing_finished(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
self.quit_loop(self.quit_after_write_code)
|
||||
elif self.transmit_iterator is not None:
|
||||
self.transmit_one()
|
||||
|
||||
def on_resize(self, screen_size: ScreenSize) -> None:
|
||||
super().on_resize(screen_size)
|
||||
if self.progress_drawn:
|
||||
self.refresh_progress()
|
||||
|
||||
|
||||
def receive_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
|
||||
dest = ''
|
||||
if cli_opts.mode == 'mirror':
|
||||
if len(args) < 1:
|
||||
raise SystemExit('Must specify at least one file to transfer')
|
||||
spec = list(args)
|
||||
else:
|
||||
if len(args) < 2:
|
||||
raise SystemExit('Must specify at least one source and a destination file to transfer')
|
||||
spec, dest = args[:-1], args[-1]
|
||||
|
||||
loop = Loop()
|
||||
handler = Receive(cli_opts, spec, dest)
|
||||
loop.loop(handler)
|
||||
for f in handler.manager.files:
|
||||
f.close()
|
||||
tsf = dsz = ssz = 0
|
||||
for f in handler.manager.files:
|
||||
if f.expect_diff:
|
||||
tsf += f.expected_size
|
||||
dsz += f.received_bytes
|
||||
ssz += f.sent_bytes
|
||||
if tsf and dsz + ssz:
|
||||
print_rsync_stats(tsf, dsz, ssz)
|
||||
@@ -1,751 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
|
||||
import os
|
||||
import stat
|
||||
from asyncio import TimerHandle
|
||||
from collections import deque
|
||||
from enum import auto
|
||||
from itertools import count
|
||||
from time import monotonic
|
||||
from typing import IO, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Union
|
||||
|
||||
from kitty.cli_stub import TransferCLIOptions
|
||||
from kitty.fast_data_types import FILE_TRANSFER_CODE, wcswidth
|
||||
from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, NameReprEnum, TransmissionType, encode_bypass, split_for_transfer
|
||||
from kitty.typing import KeyEventType, ScreenSize
|
||||
from kitty.utils import sanitize_control_codes
|
||||
|
||||
from ..tui.handler import Handler
|
||||
from ..tui.loop import Loop, debug
|
||||
from ..tui.operations import styled, without_line_wrap
|
||||
from ..tui.spinners import Spinner
|
||||
from ..tui.utils import human_size
|
||||
from .utils import (
|
||||
IdentityCompressor,
|
||||
ZlibCompressor,
|
||||
abspath,
|
||||
expand_home,
|
||||
home_path,
|
||||
print_rsync_stats,
|
||||
random_id,
|
||||
render_progress_in_width,
|
||||
safe_divide,
|
||||
should_be_compressed,
|
||||
)
|
||||
|
||||
debug
|
||||
|
||||
|
||||
def get_remote_path(local_path: str, remote_base: str) -> str:
|
||||
if not remote_base:
|
||||
return local_path.replace(os.sep, '/')
|
||||
if remote_base.endswith('/'):
|
||||
return os.path.join(remote_base, os.path.basename(local_path))
|
||||
return remote_base
|
||||
|
||||
|
||||
class FileState(NameReprEnum):
|
||||
waiting_for_start = auto()
|
||||
waiting_for_data = auto()
|
||||
transmitting = auto()
|
||||
finished = auto()
|
||||
acknowledged = auto()
|
||||
|
||||
|
||||
class File:
|
||||
|
||||
def __init__(
|
||||
self, local_path: str, expanded_local_path: str, file_id: int, stat_result: os.stat_result,
|
||||
remote_base: str, file_type: FileType,
|
||||
) -> None:
|
||||
self.state = FileState.waiting_for_start
|
||||
self.local_path = local_path
|
||||
self.display_name = sanitize_control_codes(local_path)
|
||||
self.expanded_local_path = expanded_local_path
|
||||
self.permissions = stat.S_IMODE(stat_result.st_mode)
|
||||
self.mtime = stat_result.st_mtime_ns
|
||||
self.file_size = self.bytes_to_transmit = stat_result.st_size
|
||||
self.file_hash = stat_result.st_dev, stat_result.st_ino
|
||||
self.remote_path = get_remote_path(self.local_path, remote_base)
|
||||
self.remote_path = self.remote_path.replace(os.sep, '/')
|
||||
self.file_id = hex(file_id)[2:]
|
||||
self.hard_link_target = ''
|
||||
self.symbolic_link_target = ''
|
||||
self.stat_result = stat_result
|
||||
self.file_type = file_type
|
||||
self.rsync_capable = self.file_type is FileType.regular and self.file_size > 4096
|
||||
self.compression_capable = self.file_type is FileType.regular and self.file_size > 4096 and should_be_compressed(self.expanded_local_path)
|
||||
self.remote_final_path = ''
|
||||
self.remote_initial_size = -1
|
||||
self.err_msg = ''
|
||||
self.actual_file: Optional[IO[bytes]] = None
|
||||
self.transmitted_bytes = 0
|
||||
self.reported_progress = 0
|
||||
self.transmit_started_at = self.transmit_ended_at = self.done_at = 0.
|
||||
self.signature_loader: Optional[LoadSignature] = None
|
||||
self.delta_loader: Optional[Iterator[memoryview]] = None
|
||||
|
||||
def start_delta_calculation(self) -> None:
|
||||
sl = self.signature_loader
|
||||
assert sl is not None
|
||||
self.state = FileState.transmitting
|
||||
self.delta_loader = delta_for_file(self.expanded_local_path, sl.signature)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'File(name={self.display_name}, ft={self.file_type}, state={self.state})'
|
||||
|
||||
def next_chunk(self, sz: int = 1024 * 1024) -> Tuple[bytes, int]:
|
||||
if self.file_type is FileType.symlink:
|
||||
self.state = FileState.finished
|
||||
ans = self.symbolic_link_target.encode('utf-8')
|
||||
return ans, len(ans)
|
||||
if self.file_type is FileType.link:
|
||||
self.state = FileState.finished
|
||||
ans = self.hard_link_target.encode('utf-8')
|
||||
return ans, len(ans)
|
||||
is_last = False
|
||||
if self.delta_loader is not None:
|
||||
try:
|
||||
chunk: Union[bytes, memoryview] = next(self.delta_loader)
|
||||
except StopIteration:
|
||||
is_last = True
|
||||
self.delta_loader = None
|
||||
chunk = b''
|
||||
else:
|
||||
if self.actual_file is None:
|
||||
self.actual_file = open(self.expanded_local_path, 'rb')
|
||||
chunk = self.actual_file.read(sz)
|
||||
is_last = not chunk or self.actual_file.tell() >= self.file_size
|
||||
uncompressed_sz = len(chunk)
|
||||
cchunk = self.compressor.compress(chunk)
|
||||
if is_last and not isinstance(self.compressor, IdentityCompressor):
|
||||
cchunk += self.compressor.flush()
|
||||
if is_last:
|
||||
self.state = FileState.finished
|
||||
if self.actual_file is not None:
|
||||
self.actual_file.close()
|
||||
self.actual_file = None
|
||||
return cchunk, uncompressed_sz
|
||||
|
||||
def metadata_command(self, use_rsync: bool = False) -> FileTransmissionCommand:
|
||||
self.ttype = TransmissionType.rsync if self.rsync_capable and use_rsync else TransmissionType.simple
|
||||
self.compression = Compression.zlib if self.compression_capable else Compression.none
|
||||
self.compressor: Union[ZlibCompressor, IdentityCompressor] = ZlibCompressor() if self.compression is Compression.zlib else IdentityCompressor()
|
||||
return FileTransmissionCommand(
|
||||
action=Action.file, compression=self.compression, ftype=self.file_type,
|
||||
name=self.remote_path, permissions=self.permissions, mtime=self.mtime,
|
||||
file_id=self.file_id, ttype=self.ttype
|
||||
)
|
||||
|
||||
|
||||
def process(cli_opts: TransferCLIOptions, paths: Iterable[str], remote_base: str, counter: Iterator[int]) -> Iterator[File]:
|
||||
for x in paths:
|
||||
expanded = expand_home(x)
|
||||
try:
|
||||
s = os.stat(expanded, follow_symlinks=False)
|
||||
except OSError as err:
|
||||
raise SystemExit(f'Failed to stat {x} with error: {err}') from err
|
||||
if stat.S_ISDIR(s.st_mode):
|
||||
yield File(x, expanded, next(counter), s, remote_base, FileType.directory)
|
||||
new_remote_base = remote_base
|
||||
if new_remote_base:
|
||||
new_remote_base = new_remote_base.rstrip('/') + '/' + os.path.basename(x) + '/'
|
||||
else:
|
||||
new_remote_base = x.replace(os.sep, '/').rstrip('/') + '/'
|
||||
yield from process(cli_opts, [os.path.join(x, y) for y in os.listdir(expanded)], new_remote_base, counter)
|
||||
elif stat.S_ISLNK(s.st_mode):
|
||||
yield File(x, expanded, next(counter), s, remote_base, FileType.symlink)
|
||||
elif stat.S_ISREG(s.st_mode):
|
||||
yield File(x, expanded, next(counter), s, remote_base, FileType.regular)
|
||||
|
||||
|
||||
def process_mirrored_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
|
||||
paths = [abspath(x) for x in args]
|
||||
try:
|
||||
common_path = os.path.commonpath(paths)
|
||||
except ValueError:
|
||||
common_path = ''
|
||||
home = home_path().rstrip(os.sep)
|
||||
if common_path and common_path.startswith(home + os.sep):
|
||||
paths = [os.path.join('~', os.path.relpath(x, home)) for x in paths]
|
||||
yield from process(cli_opts, paths, '', count(1))
|
||||
|
||||
|
||||
def process_normal_files(cli_opts: TransferCLIOptions, args: Sequence[str]) -> Iterator[File]:
|
||||
if len(args) < 2:
|
||||
raise SystemExit('Must specify at least one local path and one remote path')
|
||||
args = list(args)
|
||||
remote_base = args.pop().replace(os.sep, '/')
|
||||
if len(args) > 1 and not remote_base.endswith('/'):
|
||||
remote_base += '/'
|
||||
paths = [abspath(x) for x in args]
|
||||
yield from process(cli_opts, paths, remote_base, count(1))
|
||||
|
||||
|
||||
def files_for_send(cli_opts: TransferCLIOptions, args: List[str]) -> Tuple[File, ...]:
|
||||
if cli_opts.mode == 'mirror':
|
||||
files = list(process_mirrored_files(cli_opts, args))
|
||||
else:
|
||||
files = list(process_normal_files(cli_opts, args))
|
||||
groups: Dict[Tuple[int, int], List[File]] = {}
|
||||
|
||||
# detect hard links
|
||||
for f in files:
|
||||
groups.setdefault(f.file_hash, []).append(f)
|
||||
for group in groups.values():
|
||||
if len(group) > 1:
|
||||
for lf in group[1:]:
|
||||
lf.file_type = FileType.link
|
||||
lf.hard_link_target = group[0].file_id
|
||||
|
||||
# detect symlinks to other transferred files
|
||||
for f in tuple(files):
|
||||
if f.file_type is FileType.symlink:
|
||||
try:
|
||||
link_dest = os.readlink(f.local_path)
|
||||
except OSError:
|
||||
files.remove(f)
|
||||
continue
|
||||
f.symbolic_link_target = f'path:{link_dest}'
|
||||
is_abs = os.path.isabs(link_dest)
|
||||
q = link_dest if is_abs else os.path.join(os.path.dirname(f.local_path), link_dest)
|
||||
try:
|
||||
st = os.stat(q)
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
fh = st.st_dev, st.st_ino
|
||||
if fh in groups:
|
||||
g = tuple(x for x in groups[fh] if os.path.samestat(st, x.stat_result))
|
||||
if g:
|
||||
t = g[0]
|
||||
prefix = 'fid_abs' if is_abs else 'fid'
|
||||
f.symbolic_link_target = f'{prefix}:{t.file_id}'
|
||||
return tuple(files)
|
||||
|
||||
|
||||
class SendState(NameReprEnum):
|
||||
waiting_for_permission = auto()
|
||||
permission_granted = auto()
|
||||
permission_denied = auto()
|
||||
canceled = auto()
|
||||
|
||||
|
||||
class Transfer:
|
||||
|
||||
def __init__(self, amt: int = 0):
|
||||
self.amt = amt
|
||||
self.at = monotonic()
|
||||
|
||||
def is_too_old(self, now: float) -> bool:
|
||||
return now - self.at > 30
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
|
||||
def __init__(self, total_size_of_all_files: int):
|
||||
self.total_size_of_all_files = total_size_of_all_files
|
||||
self.total_bytes_to_transfer = total_size_of_all_files
|
||||
self.active_file: Optional[File] = None
|
||||
self.total_transferred = 0
|
||||
self.transfers: Deque[Transfer] = deque()
|
||||
self.transfered_stats_amt = 0
|
||||
self.transfered_stats_interval = 0.
|
||||
self.started_at = 0.
|
||||
self.signature_bytes = 0
|
||||
self.total_reported_progress = 0
|
||||
|
||||
def change_active_file(self, nf: File) -> None:
|
||||
now = monotonic()
|
||||
self.active_file = nf
|
||||
nf.transmit_started_at = now
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
self.transfers.append(Transfer())
|
||||
self.started_at = monotonic()
|
||||
|
||||
def on_transmit(self, amt: int) -> None:
|
||||
if self.active_file is not None:
|
||||
self.active_file.transmitted_bytes += amt
|
||||
self.total_transferred += amt
|
||||
self.transfers.append(Transfer(amt))
|
||||
now = self.transfers[-1].at
|
||||
while len(self.transfers) > 2 and self.transfers[0].is_too_old(now):
|
||||
self.transfers.popleft()
|
||||
self.transfered_stats_interval = now - self.transfers[0].at
|
||||
self.transfered_stats_amt = sum(t.amt for t in self.transfers)
|
||||
|
||||
def on_file_progress(self, af: File, delta: int) -> None:
|
||||
if delta > 0:
|
||||
self.total_reported_progress += delta
|
||||
|
||||
def on_file_done(self, af: File) -> None:
|
||||
af.done_at = monotonic()
|
||||
|
||||
|
||||
class SendManager:
|
||||
|
||||
def __init__(
|
||||
self, request_id: str, files: Tuple[File, ...],
|
||||
bypass: Optional[str] = None, use_rsync: bool = False,
|
||||
file_progress: Callable[[File, int], None] = lambda f, i: None,
|
||||
file_done: Callable[[File], None] = lambda f: None,
|
||||
):
|
||||
self.use_rsync = use_rsync
|
||||
self.files = files
|
||||
self.bypass = encode_bypass(request_id, bypass) if bypass else ''
|
||||
self.fid_map = {f.file_id: f for f in self.files}
|
||||
self.request_id = request_id
|
||||
self.state = SendState.waiting_for_permission
|
||||
self.all_acknowledged = self.all_started = self.has_transmitting = self.has_rsync = False
|
||||
self.active_idx: Optional[int] = None
|
||||
self.current_chunk_uncompressed_sz: Optional[int] = None
|
||||
self.prefix = f'\x1b]{FILE_TRANSFER_CODE};id={self.request_id};'
|
||||
self.suffix = '\x1b\\'
|
||||
self.progress = ProgressTracker(sum(df.file_size for df in self.files if df.file_size >= 0))
|
||||
self.file_done = file_done
|
||||
self.file_progress = file_progress
|
||||
self.last_progress_file: Optional[File] = None
|
||||
|
||||
@property
|
||||
def active_file(self) -> Optional[File]:
|
||||
if self.active_idx is not None:
|
||||
ans = self.files[self.active_idx]
|
||||
if ans.state is FileState.transmitting:
|
||||
return ans
|
||||
return None
|
||||
|
||||
def activate_next_ready_file(self) -> Optional[File]:
|
||||
if self.active_idx is not None:
|
||||
paf = self.files[self.active_idx]
|
||||
paf.transmit_ended_at = monotonic()
|
||||
for i, f in enumerate(self.files):
|
||||
if f.state is FileState.transmitting:
|
||||
self.active_idx = i
|
||||
self.update_collective_statuses()
|
||||
self.progress.change_active_file(f)
|
||||
return f
|
||||
self.active_idx = None
|
||||
self.update_collective_statuses()
|
||||
return None
|
||||
|
||||
def update_collective_statuses(self) -> None:
|
||||
found_not_started = found_not_done = False
|
||||
has_rsync = has_transmitting = False
|
||||
for f in self.files:
|
||||
if f.state is not FileState.acknowledged:
|
||||
found_not_done = True
|
||||
if f.state is FileState.waiting_for_start:
|
||||
found_not_started = True
|
||||
elif f.state is FileState.transmitting:
|
||||
has_transmitting = True
|
||||
if f.ttype is TransmissionType.rsync:
|
||||
has_rsync = True
|
||||
|
||||
self.all_acknowledged = not found_not_done
|
||||
self.all_started = not found_not_started
|
||||
self.has_rsync = has_rsync
|
||||
self.has_transmitting = has_transmitting
|
||||
|
||||
def start_transfer(self) -> str:
|
||||
return FileTransmissionCommand(action=Action.send, bypass=self.bypass).serialize()
|
||||
|
||||
def next_chunks(self) -> Iterator[str]:
|
||||
if self.active_file is None:
|
||||
self.activate_next_ready_file()
|
||||
af = self.active_file
|
||||
if af is None:
|
||||
return
|
||||
chunk = b''
|
||||
self.current_chunk_uncompressed_sz = 0
|
||||
while af.state is not FileState.finished and not chunk:
|
||||
chunk, usz = af.next_chunk()
|
||||
self.current_chunk_uncompressed_sz += usz
|
||||
is_last = af.state is FileState.finished
|
||||
if len(chunk):
|
||||
for ftc in split_for_transfer(chunk, file_id=af.file_id, mark_last=is_last):
|
||||
yield ftc.serialize()
|
||||
elif is_last:
|
||||
yield FileTransmissionCommand(action=Action.end_data, file_id=af.file_id, data=b'').serialize()
|
||||
|
||||
def send_file_metadata(self) -> Iterator[str]:
|
||||
for f in self.files:
|
||||
yield f.metadata_command(self.use_rsync).serialize()
|
||||
|
||||
def on_file_status_update(self, ftc: FileTransmissionCommand) -> None:
|
||||
file = self.fid_map.get(ftc.file_id)
|
||||
if file is None:
|
||||
return
|
||||
if ftc.status == 'STARTED':
|
||||
file.remote_final_path = ftc.name
|
||||
file.remote_initial_size = ftc.size
|
||||
if file.file_type is FileType.directory:
|
||||
file.state = FileState.finished
|
||||
else:
|
||||
file.state = FileState.waiting_for_data if ftc.ttype is TransmissionType.rsync else FileState.transmitting
|
||||
if file.state is FileState.waiting_for_data:
|
||||
file.signature_loader = LoadSignature()
|
||||
self.update_collective_statuses()
|
||||
elif ftc.status == 'PROGRESS':
|
||||
self.last_progress_file = file
|
||||
change = ftc.size - file.reported_progress
|
||||
file.reported_progress = ftc.size
|
||||
self.progress.on_file_progress(file, change)
|
||||
self.file_progress(file, change)
|
||||
else:
|
||||
if ftc.name and not file.remote_final_path:
|
||||
file.remote_final_path = ftc.name
|
||||
file.state = FileState.acknowledged
|
||||
if ftc.status == 'OK':
|
||||
if ftc.size > 0:
|
||||
change = ftc.size - file.reported_progress
|
||||
file.reported_progress = ftc.size
|
||||
self.progress.on_file_progress(file, change)
|
||||
self.file_progress(file, change)
|
||||
else:
|
||||
file.err_msg = ftc.status
|
||||
self.progress.on_file_done(file)
|
||||
self.file_done(file)
|
||||
if self.active_idx is not None and file is self.files[self.active_idx]:
|
||||
self.active_idx = None
|
||||
self.update_collective_statuses()
|
||||
|
||||
def on_signature_data_received(self, ftc: FileTransmissionCommand) -> None:
|
||||
file = self.fid_map.get(ftc.file_id)
|
||||
if file is None or file.state is not FileState.waiting_for_data:
|
||||
return
|
||||
sl = file.signature_loader
|
||||
assert sl is not None
|
||||
sl.add_chunk(ftc.data)
|
||||
self.progress.signature_bytes += len(ftc.data)
|
||||
if ftc.action is Action.end_data:
|
||||
sl.commit()
|
||||
file.start_delta_calculation()
|
||||
self.update_collective_statuses()
|
||||
|
||||
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
|
||||
if ftc.action is Action.status:
|
||||
if ftc.file_id:
|
||||
self.on_file_status_update(ftc)
|
||||
else:
|
||||
self.state = SendState.permission_granted if ftc.status == 'OK' else SendState.permission_denied
|
||||
elif ftc.action in (Action.data, Action.end_data):
|
||||
if ftc.file_id:
|
||||
self.on_signature_data_received(ftc)
|
||||
|
||||
|
||||
class Send(Handler):
|
||||
use_alternate_screen = False
|
||||
|
||||
def __init__(self, cli_opts: TransferCLIOptions, files: Tuple[File, ...]):
|
||||
Handler.__init__(self)
|
||||
self.manager = SendManager(
|
||||
random_id(), files, cli_opts.permissions_bypass, cli_opts.transmit_deltas, self.on_file_progress, self.on_file_done)
|
||||
self.cli_opts = cli_opts
|
||||
self.transmit_started = False
|
||||
self.file_metadata_sent = False
|
||||
self.quit_after_write_code: Optional[int] = None
|
||||
self.check_paths_printed = False
|
||||
names = tuple(x.display_name for x in self.manager.files)
|
||||
self.max_name_length = max(6, max(map(wcswidth, names)))
|
||||
self.spinner = Spinner()
|
||||
self.progress_drawn = True
|
||||
self.done_files: List[File] = []
|
||||
self.done_file_ids: Set[str] = set()
|
||||
self.failed_files: List[File] = []
|
||||
self.transmit_ok_checked = False
|
||||
self.progress_update_call: Optional[TimerHandle] = None
|
||||
|
||||
def send_payload(self, payload: str) -> None:
|
||||
self.write(self.manager.prefix)
|
||||
self.write(payload)
|
||||
self.write(self.manager.suffix)
|
||||
|
||||
def on_file_transfer_response(self, ftc: FileTransmissionCommand) -> None:
|
||||
if ftc.id != self.manager.request_id:
|
||||
return
|
||||
if ftc.status == 'CANCELED' and ftc.action is Action.status:
|
||||
self.quit_loop(1)
|
||||
return
|
||||
if self.quit_after_write_code is not None or self.manager.state is SendState.canceled:
|
||||
return
|
||||
before = self.manager.state
|
||||
self.manager.on_file_transfer_response(ftc)
|
||||
if before == SendState.waiting_for_permission:
|
||||
if self.manager.state == SendState.permission_denied:
|
||||
self.cmd.styled('Permission denied for this transfer', fg='red')
|
||||
self.print()
|
||||
self.quit_loop(1)
|
||||
return
|
||||
if self.manager.state == SendState.permission_granted:
|
||||
self.cmd.styled('Permission granted for this transfer', fg='green')
|
||||
self.print()
|
||||
self.send_file_metadata()
|
||||
self.asyncio_loop.call_soon(self.loop_tick)
|
||||
|
||||
def start_transfer(self) -> None:
|
||||
if self.manager.active_file is None:
|
||||
self.manager.activate_next_ready_file()
|
||||
if self.manager.active_file is not None:
|
||||
self.transmit_started = True
|
||||
self.manager.progress.start_transfer()
|
||||
self.transmit_next_chunk()
|
||||
self.draw_progress()
|
||||
|
||||
def print_check_paths(self) -> None:
|
||||
if self.check_paths_printed:
|
||||
return
|
||||
self.check_paths_printed = True
|
||||
self.print('The following file transfers will be performed. A red destination means an existing file will be overwritten.')
|
||||
for df in self.manager.files:
|
||||
self.cmd.styled(df.file_type.short_text, fg=df.file_type.color)
|
||||
self.print(end=' ')
|
||||
self.print(df.display_name, '→', end=' ')
|
||||
self.cmd.styled(df.remote_final_path, fg='red' if df.remote_initial_size > -1 else None)
|
||||
self.print()
|
||||
self.print(f'Transferring {len(self.manager.files)} files of total size: {human_size(self.manager.progress.total_bytes_to_transfer)}')
|
||||
self.print()
|
||||
self.print_continue_msg()
|
||||
|
||||
def print_continue_msg(self) -> None:
|
||||
self.print(
|
||||
'Press', styled('y', fg='green', bold=True, fg_intense=True), 'to continue or',
|
||||
styled('n', fg='red', bold=True, fg_intense=True), 'to abort')
|
||||
|
||||
def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if self.check_paths_printed and not self.transmit_started:
|
||||
if text.lower() == 'y':
|
||||
self.start_transfer()
|
||||
if self.manager.all_acknowledged:
|
||||
self.refresh_progress()
|
||||
self.transfer_finished()
|
||||
return
|
||||
if text.lower() == 'n':
|
||||
del self.failed_files[:]
|
||||
self.abort_transfer()
|
||||
self.print('Sending cancel request to terminal')
|
||||
return
|
||||
self.print_continue_msg()
|
||||
|
||||
def on_key(self, key_event: KeyEventType) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if key_event.matches('esc'):
|
||||
if self.check_paths_printed and not self.transmit_started:
|
||||
del self.failed_files[:]
|
||||
self.abort_transfer()
|
||||
self.print('Sending cancel request to terminal')
|
||||
else:
|
||||
self.on_interrupt()
|
||||
|
||||
def check_for_transmit_ok(self) -> None:
|
||||
if self.transmit_ok_checked:
|
||||
return self.start_transfer()
|
||||
if self.manager.state is not SendState.permission_granted:
|
||||
return
|
||||
if self.cli_opts.confirm_paths:
|
||||
if self.manager.all_started:
|
||||
self.print_check_paths()
|
||||
return
|
||||
self.transmit_ok_checked = True
|
||||
self.start_transfer()
|
||||
|
||||
def transmit_next_chunk(self) -> None:
|
||||
found_chunk = False
|
||||
for chunk in self.manager.next_chunks():
|
||||
self.send_payload(chunk)
|
||||
found_chunk = True
|
||||
if not found_chunk:
|
||||
if self.manager.all_acknowledged:
|
||||
self.transfer_finished()
|
||||
|
||||
def transfer_finished(self) -> None:
|
||||
self.send_payload(FileTransmissionCommand(action=Action.finish).serialize())
|
||||
self.quit_after_write_code = 1 if self.failed_files else 0
|
||||
|
||||
def on_writing_finished(self) -> None:
|
||||
chunk_transmitted = self.manager.current_chunk_uncompressed_sz is not None
|
||||
if chunk_transmitted:
|
||||
self.manager.progress.on_transmit(self.manager.current_chunk_uncompressed_sz or 0)
|
||||
self.manager.current_chunk_uncompressed_sz = None
|
||||
if self.quit_after_write_code is not None:
|
||||
self.quit_loop(self.quit_after_write_code)
|
||||
return
|
||||
if self.manager.state is SendState.permission_granted and (not self.transmit_started or chunk_transmitted):
|
||||
self.asyncio_loop.call_soon(self.loop_tick)
|
||||
|
||||
def loop_tick(self) -> None:
|
||||
if self.manager.state is SendState.waiting_for_permission:
|
||||
return
|
||||
if self.transmit_started:
|
||||
self.transmit_next_chunk()
|
||||
self.refresh_progress()
|
||||
else:
|
||||
self.check_for_transmit_ok()
|
||||
|
||||
def initialize(self) -> None:
|
||||
self.send_payload(self.manager.start_transfer())
|
||||
if self.cli_opts.permissions_bypass:
|
||||
# dont wait for permission, not needed with a bypass and
|
||||
# avoids a roundtrip
|
||||
self.send_file_metadata()
|
||||
self.cmd.set_cursor_visible(False)
|
||||
|
||||
def finalize(self) -> None:
|
||||
self.cmd.set_cursor_visible(True)
|
||||
|
||||
def send_file_metadata(self) -> None:
|
||||
if not self.file_metadata_sent:
|
||||
for payload in self.manager.send_file_metadata():
|
||||
self.send_payload(payload)
|
||||
self.file_metadata_sent = True
|
||||
|
||||
def on_term(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
self.cmd.styled('Terminate requested, cancelling transfer, transferred files are in undefined state', fg='red')
|
||||
self.print()
|
||||
self.abort_transfer(delay=2)
|
||||
|
||||
def on_interrupt(self) -> None:
|
||||
if self.quit_after_write_code is not None:
|
||||
return
|
||||
if self.manager.state is SendState.canceled:
|
||||
self.print('Waiting for canceled acknowledgement from terminal, will abort in a few seconds if no response received')
|
||||
return
|
||||
self.cmd.styled('Interrupt requested, cancelling transfer, transferred files are in undefined state', fg='red')
|
||||
self.print()
|
||||
self.abort_transfer()
|
||||
|
||||
def abort_transfer(self, delay: float = 5) -> None:
|
||||
self.send_payload(FileTransmissionCommand(action=Action.cancel).serialize())
|
||||
self.manager.state = SendState.canceled
|
||||
self.asyncio_loop.call_later(delay, self.quit_loop, 1)
|
||||
|
||||
def render_progress(
|
||||
self, name: str, spinner_char: str = ' ', bytes_so_far: int = 0, total_bytes: int = 0,
|
||||
secs_so_far: float = 0., bytes_per_sec: float = 0., is_complete: bool = False
|
||||
) -> None:
|
||||
if is_complete:
|
||||
bytes_so_far = total_bytes
|
||||
self.write(render_progress_in_width(
|
||||
name, width=self.screen_size.cols, max_path_length=self.max_name_length, spinner_char=spinner_char,
|
||||
bytes_so_far=bytes_so_far, total_bytes=total_bytes, secs_so_far=secs_so_far,
|
||||
bytes_per_sec=bytes_per_sec, is_complete=is_complete
|
||||
))
|
||||
|
||||
def erase_progress(self) -> None:
|
||||
if self.progress_drawn:
|
||||
self.cmd.move_cursor_by(2, 'up')
|
||||
self.write('\r')
|
||||
self.cmd.clear_to_end_of_screen()
|
||||
self.progress_drawn = False
|
||||
|
||||
def schedule_progress_update(self, delay: float = 0.1) -> None:
|
||||
if self.progress_update_call is None:
|
||||
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
|
||||
elif self.asyncio_loop.time() + delay < self.progress_update_call.when():
|
||||
self.progress_update_call.cancel()
|
||||
self.progress_update_call = self.asyncio_loop.call_later(delay, self.refresh_progress)
|
||||
|
||||
def on_file_progress(self, file: File, change: int) -> None:
|
||||
self.schedule_progress_update()
|
||||
|
||||
def on_file_done(self, file: File) -> None:
|
||||
self.done_files.append(file)
|
||||
if file.err_msg:
|
||||
self.failed_files.append(file)
|
||||
self.schedule_progress_update()
|
||||
|
||||
@Handler.atomic_update
|
||||
def draw_progress(self) -> None:
|
||||
with without_line_wrap(self.write):
|
||||
for df in self.done_files:
|
||||
sc = styled('✔', fg='green') if not df.err_msg else styled('✘', fg='red')
|
||||
if df.file_type is FileType.regular:
|
||||
self.draw_progress_for_current_file(df, spinner_char=sc, is_complete=True)
|
||||
else:
|
||||
self.write(f'{sc} {df.display_name} {styled(df.file_type.name, dim=True, italic=True)}')
|
||||
self.print()
|
||||
self.done_file_ids.add(df.file_id)
|
||||
del self.done_files[:]
|
||||
is_complete = self.quit_after_write_code is not None
|
||||
if is_complete:
|
||||
sc = styled('✔', fg='green') if self.quit_after_write_code == 0 else styled('✘', fg='red')
|
||||
else:
|
||||
sc = self.spinner()
|
||||
p = self.manager.progress
|
||||
now = monotonic()
|
||||
if is_complete:
|
||||
self.cmd.repeat('─', self.screen_size.width)
|
||||
else:
|
||||
af = self.manager.last_progress_file
|
||||
if af is None or af.file_id in self.done_file_ids:
|
||||
if self.manager.has_rsync and not self.manager.has_transmitting:
|
||||
self.print(sc, 'Transferring rsync signatures...', end='')
|
||||
else:
|
||||
self.print(sc, 'Transferring metadata...', end='')
|
||||
else:
|
||||
self.draw_progress_for_current_file(af, spinner_char=sc)
|
||||
self.print()
|
||||
if p.total_reported_progress > 0:
|
||||
self.render_progress(
|
||||
'Total', spinner_char=sc,
|
||||
bytes_so_far=p.total_reported_progress, total_bytes=p.total_bytes_to_transfer,
|
||||
secs_so_far=now - p.started_at, is_complete=is_complete,
|
||||
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
|
||||
)
|
||||
else:
|
||||
self.print('File data transfer has not yet started', end='')
|
||||
self.print()
|
||||
self.schedule_progress_update(self.spinner.interval)
|
||||
self.progress_drawn = True
|
||||
|
||||
def on_resize(self, screen_size: ScreenSize) -> None:
|
||||
super().on_resize(screen_size)
|
||||
if self.progress_drawn:
|
||||
self.refresh_progress()
|
||||
|
||||
def refresh_progress(self) -> None:
|
||||
if not self.transmit_started:
|
||||
return
|
||||
self.erase_progress()
|
||||
self.draw_progress()
|
||||
|
||||
def draw_progress_for_current_file(self, af: File, spinner_char: str = ' ', is_complete: bool = False) -> None:
|
||||
p = self.manager.progress
|
||||
now = monotonic()
|
||||
self.render_progress(
|
||||
af.display_name, spinner_char=spinner_char, is_complete=is_complete,
|
||||
bytes_so_far=af.reported_progress, total_bytes=af.bytes_to_transmit,
|
||||
secs_so_far=(af.done_at or now) - af.transmit_started_at,
|
||||
bytes_per_sec=safe_divide(p.transfered_stats_amt, p.transfered_stats_interval)
|
||||
)
|
||||
|
||||
|
||||
def send_main(cli_opts: TransferCLIOptions, args: List[str]) -> None:
|
||||
print('Scanning files…')
|
||||
files = files_for_send(cli_opts, args)
|
||||
print(f'Found {len(files)} files and directories, requesting transfer permission…')
|
||||
loop = Loop()
|
||||
handler = Send(cli_opts, files)
|
||||
loop.loop(handler)
|
||||
p = handler.manager.progress
|
||||
if handler.manager.has_rsync and p.total_transferred + p.signature_bytes:
|
||||
tsf = 0
|
||||
for f in files:
|
||||
if f.ttype is TransmissionType.rsync:
|
||||
tsf += f.file_size
|
||||
if tsf:
|
||||
print_rsync_stats(tsf, p.total_transferred, p.signature_bytes)
|
||||
if handler.failed_files:
|
||||
print(f'Transfer of {len(handler.failed_files)} out of {len(handler.manager.files)} files failed')
|
||||
for ff in handler.failed_files:
|
||||
print(styled(ff.display_name, fg='red'))
|
||||
print(' ', ff.err_msg)
|
||||
|
||||
raise SystemExit(loop.return_code)
|
||||
Reference in New Issue
Block a user