mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-08 14:18:26 +02:00
more typing work
This commit is contained in:
@@ -12,7 +12,10 @@ from base64 import standard_b64encode
|
||||
from functools import lru_cache
|
||||
from math import ceil
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING, Dict, Generator, List, NamedTuple, Optional, Pattern, Tuple,
|
||||
Union
|
||||
)
|
||||
|
||||
from kitty.cli import parse_args
|
||||
from kitty.cli_stub import IcatCLIOptions
|
||||
@@ -22,7 +25,8 @@ from kitty.utils import (
|
||||
)
|
||||
|
||||
from ..tui.images import (
|
||||
ConvertFailed, NoImageMagick, OpenFailed, convert, fsenc, identify, GraphicsCommand
|
||||
ConvertFailed, GraphicsCommand, NoImageMagick, OpenFailed, convert, fsenc,
|
||||
identify
|
||||
)
|
||||
from ..tui.operations import clear_images_on_screen
|
||||
|
||||
@@ -136,7 +140,7 @@ def write_gr_cmd(cmd: GraphicsCommand, payload: Optional[bytes] = None) -> None:
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def calculate_in_cell_x_offset(width, cell_width, align):
|
||||
def calculate_in_cell_x_offset(width: int, cell_width: int, align: str) -> int:
|
||||
if align == 'left':
|
||||
return 0
|
||||
extra_pixels = width % cell_width
|
||||
@@ -147,7 +151,7 @@ def calculate_in_cell_x_offset(width, cell_width, align):
|
||||
return (cell_width - extra_pixels) // 2
|
||||
|
||||
|
||||
def set_cursor(cmd: GraphicsCommand, width, height, align):
|
||||
def set_cursor(cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
|
||||
ss = get_screen_size()
|
||||
cw = int(ss.width / ss.cols)
|
||||
num_of_cells_needed = int(ceil(width / cw))
|
||||
@@ -167,7 +171,7 @@ def set_cursor(cmd: GraphicsCommand, width, height, align):
|
||||
sys.stdout.buffer.write(b' ' * extra_cells)
|
||||
|
||||
|
||||
def set_cursor_for_place(place, cmd: GraphicsCommand, width, height, align):
|
||||
def set_cursor_for_place(place: 'Place', cmd: GraphicsCommand, width: int, height: int, align: str) -> None:
|
||||
x = place.left + 1
|
||||
ss = get_screen_size()
|
||||
cw = int(ss.width / ss.cols)
|
||||
@@ -193,7 +197,14 @@ def write_chunked(cmd: GraphicsCommand, data: bytes) -> None:
|
||||
cmd.clear()
|
||||
|
||||
|
||||
def show(outfile, width: int, height: int, zindex: int, fmt: 'GRT_f', transmit_mode: 'GRT_t' = 't', align: str = 'center', place=None):
|
||||
def show(
|
||||
outfile: str,
|
||||
width: int, height: int, zindex: int,
|
||||
fmt: 'GRT_f',
|
||||
transmit_mode: 'GRT_t' = 't',
|
||||
align: str = 'center',
|
||||
place: Optional['Place'] = None
|
||||
) -> None:
|
||||
cmd = GraphicsCommand()
|
||||
cmd.a = 'T'
|
||||
cmd.f = fmt
|
||||
@@ -217,7 +228,7 @@ def show(outfile, width: int, height: int, zindex: int, fmt: 'GRT_f', transmit_m
|
||||
write_chunked(cmd, data)
|
||||
|
||||
|
||||
def parse_z_index(val):
|
||||
def parse_z_index(val: str) -> int:
|
||||
origin = 0
|
||||
if val.startswith('--'):
|
||||
val = val[1:]
|
||||
@@ -225,11 +236,17 @@ def parse_z_index(val):
|
||||
return origin + int(val)
|
||||
|
||||
|
||||
def process(path, args, is_tempfile):
|
||||
class ParsedOpts:
|
||||
|
||||
place: Optional['Place'] = None
|
||||
z_index: int = 0
|
||||
|
||||
|
||||
def process(path: str, args: IcatCLIOptions, parsed_opts: ParsedOpts, is_tempfile: bool) -> bool:
|
||||
m = identify(path)
|
||||
ss = get_screen_size()
|
||||
available_width = args.place.width * (ss.width / ss.cols) if args.place else ss.width
|
||||
available_height = args.place.height * (ss.height / ss.rows) if args.place else 10 * m.height
|
||||
available_width = parsed_opts.place.width * (ss.width // ss.cols) if parsed_opts.place else ss.width
|
||||
available_height = parsed_opts.place.height * (ss.height // ss.rows) if parsed_opts.place else 10 * m.height
|
||||
needs_scaling = m.width > available_width or m.height > available_height
|
||||
needs_scaling = needs_scaling or args.scale_up
|
||||
file_removed = False
|
||||
@@ -243,13 +260,13 @@ def process(path, args, is_tempfile):
|
||||
fmt = 24 if m.mode == 'rgb' else 32
|
||||
transmit_mode = 't'
|
||||
outfile, width, height = convert(path, m, available_width, available_height, args.scale_up)
|
||||
show(outfile, width, height, args.z_index, fmt, transmit_mode, align=args.align, place=args.place)
|
||||
show(outfile, width, height, parsed_opts.z_index, fmt, transmit_mode, align=args.align, place=parsed_opts.place)
|
||||
if not args.place:
|
||||
print() # ensure cursor is on a new line
|
||||
return file_removed
|
||||
|
||||
|
||||
def scan(d):
|
||||
def scan(d: str) -> Generator[Tuple[str, str], None, None]:
|
||||
for dirpath, dirnames, filenames in os.walk(d):
|
||||
for f in filenames:
|
||||
mt = mimetypes.guess_type(f)[0]
|
||||
@@ -257,7 +274,7 @@ def scan(d):
|
||||
yield os.path.join(dirpath, f), mt
|
||||
|
||||
|
||||
def detect_support(wait_for: int = 10, silent: bool = False) -> bool:
|
||||
def detect_support(wait_for: float = 10, silent: bool = False) -> bool:
|
||||
global can_transfer_with_files
|
||||
if not silent:
|
||||
print('Checking for graphics ({}s max. wait)...'.format(wait_for), end='\r')
|
||||
@@ -266,7 +283,7 @@ def detect_support(wait_for: int = 10, silent: bool = False) -> bool:
|
||||
received = b''
|
||||
responses: Dict[int, bool] = {}
|
||||
|
||||
def parse_responses():
|
||||
def parse_responses() -> None:
|
||||
for m in re.finditer(b'\033_Gi=([1|2]);(.+?)\033\\\\', received):
|
||||
iid = m.group(1)
|
||||
if iid in (b'1', b'2'):
|
||||
@@ -274,7 +291,7 @@ def detect_support(wait_for: int = 10, silent: bool = False) -> bool:
|
||||
if iid_ not in responses:
|
||||
responses[iid_] = m.group(2) == b'OK'
|
||||
|
||||
def more_needed(data):
|
||||
def more_needed(data: bytes) -> bool:
|
||||
nonlocal received
|
||||
received += data
|
||||
parse_responses()
|
||||
@@ -290,7 +307,7 @@ def detect_support(wait_for: int = 10, silent: bool = False) -> bool:
|
||||
gc.i = 2
|
||||
write_gr_cmd(gc, standard_b64encode(f.name.encode(fsenc)))
|
||||
with TTYIO() as io:
|
||||
io.recv(more_needed, timeout=float(wait_for))
|
||||
io.recv(more_needed, timeout=wait_for)
|
||||
finally:
|
||||
if not silent:
|
||||
sys.stdout.buffer.write(b'\033[J'), sys.stdout.flush()
|
||||
@@ -325,7 +342,13 @@ help_text = (
|
||||
usage = 'image-file-or-url-or-directory ...'
|
||||
|
||||
|
||||
def process_single_item(item, args, url_pat=None, maybe_dir=True):
|
||||
def process_single_item(
|
||||
item: Union[bytes, str],
|
||||
args: IcatCLIOptions,
|
||||
parsed_opts: ParsedOpts,
|
||||
url_pat: Optional[Pattern] = None,
|
||||
maybe_dir: bool = True
|
||||
) -> None:
|
||||
is_tempfile = False
|
||||
file_removed = False
|
||||
try:
|
||||
@@ -343,34 +366,34 @@ def process_single_item(item, args, url_pat=None, maybe_dir=True):
|
||||
raise SystemExit('Failed to download image at URL: {} with error: {}'.format(item, e))
|
||||
item = tf.name
|
||||
is_tempfile = True
|
||||
file_removed = process(item, args, is_tempfile)
|
||||
file_removed = process(item, args, parsed_opts, is_tempfile)
|
||||
elif item.lower().startswith('file://'):
|
||||
from urllib.parse import urlparse
|
||||
from urllib.request import url2pathname
|
||||
item = urlparse(item)
|
||||
pitem = urlparse(item)
|
||||
if os.sep == '\\':
|
||||
item = item.netloc + item.path
|
||||
item = pitem.netloc + pitem.path
|
||||
else:
|
||||
item = item.path
|
||||
item = pitem.path
|
||||
item = url2pathname(item)
|
||||
file_removed = process(item, args, is_tempfile)
|
||||
file_removed = process(item, args, parsed_opts, is_tempfile)
|
||||
else:
|
||||
if maybe_dir and os.path.isdir(item):
|
||||
for (x, mt) in scan(item):
|
||||
process_single_item(x, args, url_pat=None, maybe_dir=False)
|
||||
process_single_item(x, args, parsed_opts, url_pat=None, maybe_dir=False)
|
||||
else:
|
||||
file_removed = process(item, args, is_tempfile)
|
||||
file_removed = process(item, args, parsed_opts, is_tempfile)
|
||||
finally:
|
||||
if is_tempfile and not file_removed:
|
||||
os.remove(item)
|
||||
|
||||
|
||||
def main(args=sys.argv):
|
||||
def main(args: List[str] = sys.argv) -> None:
|
||||
global can_transfer_with_files
|
||||
args, items_ = parse_args(args[1:], options_spec, usage, help_text, '{} +kitten icat'.format(appname), result_class=IcatCLIOptions)
|
||||
cli_opts, items_ = parse_args(args[1:], options_spec, usage, help_text, '{} +kitten icat'.format(appname), result_class=IcatCLIOptions)
|
||||
items: List[Union[str, bytes]] = list(items_)
|
||||
|
||||
if args.print_window_size:
|
||||
if cli_opts.print_window_size:
|
||||
screen_size_function.cache_clear()
|
||||
with open(os.ctermid()) as tty:
|
||||
ss = screen_size_function(tty)()
|
||||
@@ -380,7 +403,7 @@ def main(args=sys.argv):
|
||||
if not sys.stdout.isatty():
|
||||
sys.stdout = open(os.ctermid(), 'w')
|
||||
stdin_data = None
|
||||
if args.stdin == 'yes' or (not sys.stdin.isatty() and args.stdin == 'detect'):
|
||||
if cli_opts.stdin == 'yes' or (not sys.stdin.isatty() and cli_opts.stdin == 'detect'):
|
||||
stdin_data = sys.stdin.buffer.read()
|
||||
if stdin_data:
|
||||
items.insert(0, stdin_data)
|
||||
@@ -390,53 +413,55 @@ def main(args=sys.argv):
|
||||
screen_size = get_screen_size_function()
|
||||
signal.signal(signal.SIGWINCH, lambda signum, frame: setattr(screen_size, 'changed', True))
|
||||
if screen_size().width == 0:
|
||||
if args.detect_support:
|
||||
if cli_opts.detect_support:
|
||||
raise SystemExit(1)
|
||||
raise SystemExit(
|
||||
'Terminal does not support reporting screen sizes via the TIOCGWINSZ ioctl'
|
||||
)
|
||||
try:
|
||||
args.place = parse_place(args.place)
|
||||
except Exception:
|
||||
raise SystemExit('Not a valid place specification: {}'.format(args.place))
|
||||
parsed_opts = ParsedOpts()
|
||||
if cli_opts.place:
|
||||
try:
|
||||
parsed_opts.place = parse_place(cli_opts.place)
|
||||
except Exception:
|
||||
raise SystemExit('Not a valid place specification: {}'.format(cli_opts.place))
|
||||
|
||||
try:
|
||||
args.z_index = parse_z_index(args.z_index)
|
||||
parsed_opts.z_index = parse_z_index(cli_opts.z_index)
|
||||
except Exception:
|
||||
raise SystemExit('Not a valid z-index specification: {}'.format(args.z_index))
|
||||
raise SystemExit('Not a valid z-index specification: {}'.format(cli_opts.z_index))
|
||||
|
||||
if args.detect_support:
|
||||
if not detect_support(wait_for=args.detection_timeout, silent=True):
|
||||
if cli_opts.detect_support:
|
||||
if not detect_support(wait_for=cli_opts.detection_timeout, silent=True):
|
||||
raise SystemExit(1)
|
||||
print('file' if can_transfer_with_files else 'stream', end='', file=sys.stderr)
|
||||
return
|
||||
if args.transfer_mode == 'detect':
|
||||
if not detect_support(wait_for=args.detection_timeout, silent=args.silent):
|
||||
if cli_opts.transfer_mode == 'detect':
|
||||
if not detect_support(wait_for=cli_opts.detection_timeout, silent=cli_opts.silent):
|
||||
raise SystemExit('This terminal emulator does not support the graphics protocol, use a terminal emulator such as kitty that does support it')
|
||||
else:
|
||||
can_transfer_with_files = args.transfer_mode == 'file'
|
||||
can_transfer_with_files = cli_opts.transfer_mode == 'file'
|
||||
errors = []
|
||||
if args.clear:
|
||||
if cli_opts.clear:
|
||||
sys.stdout.write(clear_images_on_screen(delete_data=True))
|
||||
if not items:
|
||||
return
|
||||
if not items:
|
||||
raise SystemExit('You must specify at least one file to cat')
|
||||
if args.place:
|
||||
if parsed_opts.place:
|
||||
if len(items) > 1 or (isinstance(items[0], str) and os.path.isdir(items[0])):
|
||||
raise SystemExit(f'The --place option can only be used with a single image, not {items}')
|
||||
sys.stdout.buffer.write(b'\0337') # save cursor
|
||||
url_pat = re.compile(r'(?:https?|ftp)://', flags=re.I)
|
||||
for item in items:
|
||||
try:
|
||||
process_single_item(item, args, url_pat)
|
||||
process_single_item(item, cli_opts, parsed_opts, url_pat)
|
||||
except NoImageMagick as e:
|
||||
raise SystemExit(str(e))
|
||||
except ConvertFailed as e:
|
||||
raise SystemExit(str(e))
|
||||
except OpenFailed as e:
|
||||
errors.append(e)
|
||||
if args.place:
|
||||
if parsed_opts.place:
|
||||
sys.stdout.buffer.write(b'\0338') # restore cursor
|
||||
if not errors:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user