diff --git a/kittens/runner.py b/kittens/runner.py index b7d214473..15a040b66 100644 --- a/kittens/runner.py +++ b/kittens/runner.py @@ -6,7 +6,7 @@ import importlib import os import sys -from functools import partial +from functools import lru_cache, partial aliases = {'url_hints': 'hints'} @@ -42,7 +42,7 @@ def import_kitten_main_module(config_dir, kitten): kitten = resolved_kitten(kitten) m = importlib.import_module('kittens.{}.main'.format(kitten)) - return {'start': m.main, 'end': getattr(m, 'handle_result', lambda *a, **k: None)} + return {'start': getattr(m, 'main'), 'end': getattr(m, 'handle_result', lambda *a, **k: None)} def create_kitten_handler(kitten, orig_args): @@ -50,15 +50,15 @@ def create_kitten_handler(kitten, orig_args): kitten = resolved_kitten(kitten) m = import_kitten_main_module(config_dir, kitten) ans = partial(m['end'], [kitten] + orig_args) - ans.type_of_input = getattr(m['end'], 'type_of_input', None) - ans.no_ui = getattr(m['end'], 'no_ui', False) + setattr(ans, 'type_of_input', getattr(m['end'], 'type_of_input', None)) + setattr(ans, 'no_ui', getattr(m['end'], 'no_ui', False)) return ans def set_debug(kitten): from kittens.tui.loop import debug import builtins - builtins.debug = debug + setattr(builtins, 'debug', debug) def launch(args): @@ -117,18 +117,16 @@ def run_kitten(kitten, run_name='__main__'): m['main'](sys.argv) +@lru_cache(maxsize=2) def all_kitten_names(): - ans = getattr(all_kitten_names, 'ans', None) - if ans is None: - n = [] - import glob - base = os.path.dirname(os.path.abspath(__file__)) - for x in glob.glob(os.path.join(base, '*', '__init__.py')): - q = os.path.basename(os.path.dirname(x)) - if q != 'tui': - n.append(q) - all_kitten_names.ans = ans = frozenset(n) - return ans + n = [] + import glob + base = os.path.dirname(os.path.abspath(__file__)) + for x in glob.glob(os.path.join(base, '*', '__init__.py')): + q = os.path.basename(os.path.dirname(x)) + if q != 'tui': + n.append(q) + return frozenset(n) def list_kittens(): @@ -140,19 +138,19 @@ def list_kittens(): def get_kitten_cli_docs(kitten): - sys.cli_docs = {} + setattr(sys, 'cli_docs', {}) run_kitten(kitten, run_name='__doc__') - ans = sys.cli_docs - del sys.cli_docs + ans = getattr(sys, 'cli_docs') + delattr(sys, 'cli_docs') if 'help_text' in ans and 'usage' in ans and 'options' in ans: return ans def get_kitten_conf_docs(kitten): - sys.all_options = None + setattr(sys, 'all_options', None) run_kitten(kitten, run_name='__conf__') - ans = sys.all_options - del sys.all_options + ans = getattr(sys, 'all_options') + delattr(sys, 'all_options') return ans diff --git a/kittens/unicode_input/main.py b/kittens/unicode_input/main.py index bc9914936..6f92adace 100644 --- a/kittens/unicode_input/main.py +++ b/kittens/unicode_input/main.py @@ -9,6 +9,7 @@ import sys from contextlib import suppress from functools import lru_cache from gettext import gettext as _ +from typing import List, Optional, Sequence, Tuple, Union from kitty.cli import parse_args from kitty.cli_stub import UnicodeCLIOptions @@ -121,15 +122,16 @@ def serialize_favorites(favorites): return '\n'.join(ans) -def load_favorites(refresh=False): - ans = getattr(load_favorites, 'ans', None) +def load_favorites(refresh: bool = False) -> List[int]: + ans: Optional[List[int]] = getattr(load_favorites, 'ans', None) if ans is None or refresh: try: with open(favorites_path, 'rb') as f: raw = f.read().decode('utf-8') - ans = load_favorites.ans = list(parse_favorites(raw)) or list(DEFAULT_SET) + ans = list(parse_favorites(raw)) or list(DEFAULT_SET) except FileNotFoundError: - ans = load_favorites.ans = list(DEFAULT_SET) + ans = list(DEFAULT_SET) + setattr(load_favorites, 'ans', ans) return ans @@ -232,7 +234,7 @@ class Table: col_width = min(col_width, 40) space_for_desc = col_width - 2 - idx_size - 4 num_cols = self.num_cols = max(cols // col_width, 1) - buf = [] + buf: List[str] = [] a = buf.append rows_left = rows @@ -241,7 +243,7 @@ class Table: rows_left -= 1 if rows_left == 0: break - buf.append('\r\n') + a('\r\n') buf.extend(cell(i, idx, c, desc)) a(' ') self.text = ''.join(buf) @@ -298,7 +300,7 @@ class UnicodeInput(Handler): def update_codepoints(self): codepoints = None if self.mode is HEX: - q = self.mode, None + q: Tuple[str, Optional[Union[str, Sequence[int]]]] = self.mode, None codepoints = self.recent elif self.mode is EMOTICONS: q = self.mode, None @@ -317,9 +319,9 @@ class UnicodeInput(Handler): words = words[:index_words[0]] codepoints = codepoints_matching_search(tuple(words)) if index_words: - index_word = int(index_word.lstrip(INDEX_CHAR), 16) - if index_word < len(codepoints): - codepoints = [codepoints[index_word]] + iindex_word = int(index_word.lstrip(INDEX_CHAR), 16) + if codepoints and iindex_word < len(codepoints): + codepoints = [codepoints[iindex_word]] if q != self.last_updated_code_point_at: self.last_updated_code_point_at = q self.table.set_codepoints(codepoints, self.mode) diff --git a/kitty/child.py b/kitty/child.py index 55074b10b..87ac8ad5a 100644 --- a/kitty/child.py +++ b/kitty/child.py @@ -7,11 +7,12 @@ import os import sys from collections import defaultdict from contextlib import contextmanager, suppress -from typing import DefaultDict, List, Optional +from typing import DefaultDict, Dict, Generator, Iterable, List, Optional import kitty.fast_data_types as fast_data_types from .constants import is_macos, shell_path, terminfo_dir +from .options_stub import Options if is_macos: from kitty.fast_data_types import ( @@ -62,40 +63,44 @@ else: return ans -def checked_terminfo_dir(): - ans = getattr(checked_terminfo_dir, 'ans', None) - if ans is None: - ans = checked_terminfo_dir.ans = terminfo_dir if os.path.isdir(terminfo_dir) else None +def checked_terminfo_dir() -> Optional[str]: + q = getattr(checked_terminfo_dir, 'ans', False) + if q is False: + ans = terminfo_dir if os.path.isdir(terminfo_dir) else None + setattr(checked_terminfo_dir, 'ans', ans) + else: + ans = q return ans -def processes_in_group(grp): - gmap = getattr(process_group_map, 'cached_map', None) +def processes_in_group(grp: int) -> List[int]: + gmap: Optional[DefaultDict[int, List[int]]] = getattr(process_group_map, 'cached_map', None) if gmap is None: try: gmap = process_group_map() except Exception: - gmap = {} + gmap = defaultdict(list) return gmap.get(grp, []) @contextmanager -def cached_process_data(): +def cached_process_data() -> Generator[None, None, None]: try: - process_group_map.cached_map = process_group_map() + cm = process_group_map() except Exception: - process_group_map.cached_map = {} + cm = defaultdict(list) + setattr(process_group_map, 'cached_map', cm) try: yield finally: - process_group_map.cached_map = None + delattr(process_group_map, 'cached_map') -def parse_environ_block(data): +def parse_environ_block(data: str) -> Dict[str, str]: """Parse a C environ block of environment variables into a dictionary.""" # The block is usually raw data from the target process. It might contain # trailing garbage and lines that do not look like assignments. - ret = {} + ret: Dict[str, str] = {} pos = 0 while True: @@ -114,39 +119,38 @@ def parse_environ_block(data): return ret -def environ_of_process(pid): +def environ_of_process(pid: int) -> Dict[str, str]: return parse_environ_block(_environ_of_process(pid)) -def remove_cloexec(fd): +def remove_cloexec(fd: int) -> None: fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC) -def remove_blocking(fd): +def remove_blocking(fd: int) -> None: os.set_blocking(fd, False) -def process_env(): - ans = os.environ +def process_env() -> Dict[str, str]: + ans = dict(os.environ) ssl_env_var = getattr(sys, 'kitty_ssl_env_var', None) if ssl_env_var is not None: - ans = ans.copy() ans.pop(ssl_env_var, None) return ans -def default_env(): - try: - return default_env.env - except AttributeError: +def default_env() -> Dict[str, str]: + ans: Optional[Dict[str, str]] = getattr(default_env, 'env', None) + if ans is None: return process_env() + return ans -def set_default_env(val=None): +def set_default_env(val: Optional[Dict[str, str]] = None) -> None: env = process_env().copy() if val: env.update(val) - default_env.env = env + setattr(default_env, 'env', env) def openpty(): @@ -162,7 +166,16 @@ class Child: pid: Optional[int] = None forked = False - def __init__(self, argv, cwd, opts, stdin=None, env=None, cwd_from=None, allow_remote_control=False): + def __init__( + self, + argv: Iterable[str], + cwd: str, + opts: Options, + stdin: Optional[bytes] = None, + env: Optional[Dict[str, str]] = None, + cwd_from: Optional[int] = None, + allow_remote_control=False + ): self.allow_remote_control = allow_remote_control self.argv = argv if cwd_from is not None: @@ -179,8 +192,8 @@ class Child: self.env = env or {} @property - def final_env(self): - env = getattr(self, '_final_env', None) + def final_env(self) -> Dict[str, str]: + env: Optional[Dict[str, str]] = getattr(self, '_final_env', None) if env is None: env = self._final_env = default_env().copy() env.update(self.env) @@ -191,13 +204,14 @@ class Child: # can use it to display the current directory name rather # than the resolved path env['PWD'] = self.cwd - if checked_terminfo_dir(): - env['TERMINFO'] = checked_terminfo_dir() + tdir = checked_terminfo_dir() + if tdir: + env['TERMINFO'] = tdir return env - def fork(self): + def fork(self) -> Optional[int]: if self.forked: - return + return None self.forked = True master, slave = openpty() stdin, self.stdin = self.stdin, None @@ -208,8 +222,7 @@ class Child: remove_cloexec(stdin_read_fd) else: stdin_read_fd = stdin_write_fd = -1 - env = self.final_env - env = tuple('{}={}'.format(k, v) for k, v in env.items()) + env = tuple('{}={}'.format(k, v) for k, v in self.final_env.items()) argv = list(self.argv) exe = argv[0] if is_macos and exe == shell_path: @@ -237,7 +250,8 @@ class Child: fast_data_types.thread_write(stdin_write_fd, stdin) os.close(ready_read_fd) self.terminal_ready_fd = ready_write_fd - remove_blocking(self.child_fd) + if self.child_fd is not None: + remove_blocking(self.child_fd) return pid def mark_terminal_ready(self): @@ -245,7 +259,9 @@ class Child: self.terminal_ready_fd = -1 @property - def foreground_processes(self): + def foreground_processes(self) -> List[int]: + if self.child_fd is None: + return [] try: pgrp = os.tcgetpgrp(self.child_fd) foreground_processes = processes_in_group(pgrp) if pgrp >= 0 else [] @@ -263,34 +279,39 @@ class Child: return [] @property - def cmdline(self): + def cmdline(self) -> List[str]: try: + assert self.pid is not None return cmdline_of_process(self.pid) or list(self.argv) except Exception: return list(self.argv) @property - def foreground_cmdline(self): + def foreground_cmdline(self) -> List[str]: try: + assert self.pid_for_cwd is not None return cmdline_of_process(self.pid_for_cwd) or self.cmdline except Exception: return self.cmdline @property - def environ(self): + def environ(self) -> Dict[str, str]: try: + assert self.pid is not None return environ_of_process(self.pid) except Exception: return {} @property - def current_cwd(self): + def current_cwd(self) -> Optional[str]: with suppress(Exception): + assert self.pid is not None return cwd_of_process(self.pid) @property - def pid_for_cwd(self): + def pid_for_cwd(self) -> Optional[int]: with suppress(Exception): + assert self.child_fd is not None pgrp = os.tcgetpgrp(self.child_fd) foreground_processes = processes_in_group(pgrp) if pgrp >= 0 else [] if len(foreground_processes) == 1: @@ -298,16 +319,19 @@ class Child: return self.pid @property - def foreground_cwd(self): + def foreground_cwd(self) -> Optional[str]: with suppress(Exception): + assert self.pid_for_cwd is not None return cwd_of_process(self.pid_for_cwd) or None @property - def foreground_environ(self): + def foreground_environ(self) -> Dict[str, str]: try: + assert self.pid_for_cwd is not None return environ_of_process(self.pid_for_cwd) except Exception: try: + assert self.pid is not None return environ_of_process(self.pid) except Exception: pass diff --git a/kitty/fast_data_types.pyi b/kitty/fast_data_types.pyi index fd1768c11..579b8767f 100644 --- a/kitty/fast_data_types.pyi +++ b/kitty/fast_data_types.pyi @@ -960,6 +960,9 @@ class Screen: def clear_selection(self) -> None: pass + def refresh_sprite_positions(self) -> None: + pass + def set_marker(self, marker: Optional[Callable] = None) -> None: pass @@ -1019,3 +1022,22 @@ class ChildMonitor: def set_iutf8_winid(self, win_id: int, on: bool) -> bool: pass + + +def set_iutf8_fd(fd: int, on: bool) -> bool: + pass + + +def spawn( + exe: str, + cwd: str, + argv: Tuple[str, ...], + env: Tuple[str, ...], + master: int, + slave: int, + stdin_read_fd: int, + stdin_write_fd: int, + ready_read_fd: int, + ready_write_fd: int +) -> int: + pass diff --git a/kitty/tabs.py b/kitty/tabs.py index 22abdf6bf..be47b9d59 100644 --- a/kitty/tabs.py +++ b/kitty/tabs.py @@ -3,9 +3,10 @@ # License: GPL v3 Copyright: 2016, Kovid Goyal import weakref -from collections import deque, namedtuple +from collections import deque from contextlib import suppress from functools import partial +from typing import Deque, NamedTuple, Optional, List, Dict, cast from .borders import Borders from .child import Child @@ -20,14 +21,22 @@ from .tab_bar import TabBar, TabBarData from .utils import log_error, resolved_shell from .window import Window -SpecialWindowInstance = namedtuple('SpecialWindow', 'cmd stdin override_title cwd_from cwd overlay_for env') + +class SpecialWindowInstance(NamedTuple): + cmd: Optional[List[str]] + stdin: Optional[bytes] + override_title: Optional[str] + cwd_from: Optional[int] + cwd: Optional[str] + overlay_for: Optional[int] + env: Optional[Dict[str, str]] def SpecialWindow(cmd, stdin=None, override_title=None, cwd_from=None, cwd=None, overlay_for=None, env=None): return SpecialWindowInstance(cmd, stdin, override_title, cwd_from, cwd, overlay_for, env) -def add_active_id_to_history(items, item_id, maxlen=64): +def add_active_id_to_history(items: Deque[int], item_id: int, maxlen: int = 64) -> None: with suppress(ValueError): items.remove(item_id) items.append(item_id) @@ -42,7 +51,7 @@ class Tab: # {{{ self.tab_manager_ref = weakref.ref(tab_manager) self.os_window_id = tab_manager.os_window_id self.id = add_tab(self.os_window_id) - self.active_window_history = deque() + self.active_window_history: Deque[int] = deque() if not self.id: raise Exception('No OS window with id {} found, or tab counter has wrapped'.format(self.os_window_id)) self.opts, self.args = tab_manager.opts, tab_manager.args @@ -50,7 +59,7 @@ class Tab: # {{{ self.name = getattr(session_tab, 'name', '') self.enabled_layouts = [x.lower() for x in getattr(session_tab, 'enabled_layouts', None) or self.opts.enabled_layouts] self.borders = Borders(self.os_window_id, self.id, self.opts) - self.windows = deque() + self.windows: Deque[Window] = deque() for i, which in enumerate('first second third fourth fifth sixth seventh eighth ninth tenth'.split()): setattr(self, which + '_window', partial(self.nth_window, num=i)) self._last_used_layout = self._current_layout_name = None @@ -121,15 +130,16 @@ class Tab: # {{{ @active_window_idx.setter def active_window_idx(self, val): try: - old_active_window = self.windows[self._active_window_idx] + old_active_window: Optional[Window] = self.windows[self._active_window_idx] except Exception: old_active_window = None else: + assert old_active_window is not None wid = old_active_window.id if old_active_window.overlay_for is None else old_active_window.overlay_for add_active_id_to_history(self.active_window_history, wid) self._active_window_idx = max(0, min(val, len(self.windows) - 1)) try: - new_active_window = self.windows[self._active_window_idx] + new_active_window: Optional[Window] = self.windows[self._active_window_idx] except Exception: new_active_window = None if old_active_window is not new_active_window: @@ -143,12 +153,12 @@ class Tab: # {{{ tm.mark_tab_bar_dirty() @property - def active_window(self): + def active_window(self) -> Optional[Window]: return self.windows[self.active_window_idx] if self.windows else None @property - def title(self): - return getattr(self.active_window, 'title', appname) + def title(self) -> str: + return cast(str, getattr(self.active_window, 'title', appname)) def set_title(self, title): self.name = title or '' @@ -261,7 +271,7 @@ class Tab: # {{{ cmd = resolved_shell(self.opts) else: cmd = self.args.args or resolved_shell(self.opts) - fenv = {} + fenv: Dict[str, str] = {} if env: fenv.update(env) fenv['KITTY_WINDOW_ID'] = str(next_window_id()) @@ -280,10 +290,20 @@ class Tab: # {{{ self.relayout_borders() def new_window( - self, use_shell=True, cmd=None, stdin=None, override_title=None, - cwd_from=None, cwd=None, overlay_for=None, env=None, location=None, - copy_colors_from=None, allow_remote_control=False, marker=None - ): + self, + use_shell: bool = True, + cmd: Optional[List[str]] = None, + stdin: Optional[bytes] = None, + override_title: Optional[str] = None, + cwd_from: Optional[int] = None, + cwd: Optional[str] = None, + overlay_for: Optional[int] = None, + env: Optional[Dict[str, str]] = None, + location: Optional[str] = None, + copy_colors_from: Optional[Window] = None, + allow_remote_control: bool = False, + marker: Optional[str] = None + ) -> Window: child = self.launch_child( use_shell=use_shell, cmd=cmd, stdin=stdin, cwd_from=cwd_from, cwd=cwd, env=env, allow_remote_control=allow_remote_control) window = Window(self, child, self.opts, self.args, override_title=override_title, copy_colors_from=copy_colors_from) @@ -302,8 +322,20 @@ class Tab: # {{{ traceback.print_exc() return window - def new_special_window(self, special_window, location=None, copy_colors_from=None, allow_remote_control=False): - return self.new_window(False, *special_window, location=location, copy_colors_from=copy_colors_from, allow_remote_control=allow_remote_control) + def new_special_window( + self, + special_window: SpecialWindowInstance, + location: Optional[str] = None, + copy_colors_from: Optional[Window] = None, + allow_remote_control: bool = False + ) -> Window: + return self.new_window( + use_shell=False, cmd=special_window.cmd, stdin=special_window.stdin, + override_title=special_window.override_title, + cwd_from=special_window.cwd_from, cwd=special_window.cwd, overlay_for=special_window.overlay_for, + env=special_window.env, location=location, copy_colors_from=copy_colors_from, + allow_remote_control=allow_remote_control + ) def close_window(self): if self.windows: @@ -479,8 +511,8 @@ class TabManager: # {{{ self.last_active_tab_id = None self.opts, self.args = opts, args self.tab_bar_hidden = self.opts.tab_bar_style == 'hidden' - self.tabs = [] - self.active_tab_history = deque() + self.tabs: List[Tab] = [] + self.active_tab_history: Deque[int] = deque() self.tab_bar = TabBar(self.os_window_id, opts) self._active_tab_idx = 0 @@ -496,14 +528,15 @@ class TabManager: # {{{ @active_tab_idx.setter def active_tab_idx(self, val): try: - old_active_tab = self.tabs[self._active_tab_idx] + old_active_tab: Optional[Tab] = self.tabs[self._active_tab_idx] except Exception: old_active_tab = None else: + assert old_active_tab is not None add_active_id_to_history(self.active_tab_history, old_active_tab.id) self._active_tab_idx = max(0, min(val, len(self.tabs) - 1)) try: - new_active_tab = self.tabs[self._active_tab_idx] + new_active_tab: Optional[Tab] = self.tabs[self._active_tab_idx] except Exception: new_active_tab = None if old_active_tab is not new_active_tab: