More typing work

This commit is contained in:
Kovid Goyal
2020-03-09 08:56:02 +05:30
parent 9beae321d7
commit 5bdb405635
5 changed files with 176 additions and 97 deletions

View File

@@ -6,7 +6,7 @@
import importlib
import os
import sys
from functools import partial
from functools import lru_cache, partial
aliases = {'url_hints': 'hints'}
@@ -42,7 +42,7 @@ def import_kitten_main_module(config_dir, kitten):
kitten = resolved_kitten(kitten)
m = importlib.import_module('kittens.{}.main'.format(kitten))
return {'start': m.main, 'end': getattr(m, 'handle_result', lambda *a, **k: None)}
return {'start': getattr(m, 'main'), 'end': getattr(m, 'handle_result', lambda *a, **k: None)}
def create_kitten_handler(kitten, orig_args):
@@ -50,15 +50,15 @@ def create_kitten_handler(kitten, orig_args):
kitten = resolved_kitten(kitten)
m = import_kitten_main_module(config_dir, kitten)
ans = partial(m['end'], [kitten] + orig_args)
ans.type_of_input = getattr(m['end'], 'type_of_input', None)
ans.no_ui = getattr(m['end'], 'no_ui', False)
setattr(ans, 'type_of_input', getattr(m['end'], 'type_of_input', None))
setattr(ans, 'no_ui', getattr(m['end'], 'no_ui', False))
return ans
def set_debug(kitten):
from kittens.tui.loop import debug
import builtins
builtins.debug = debug
setattr(builtins, 'debug', debug)
def launch(args):
@@ -117,18 +117,16 @@ def run_kitten(kitten, run_name='__main__'):
m['main'](sys.argv)
@lru_cache(maxsize=2)
def all_kitten_names():
ans = getattr(all_kitten_names, 'ans', None)
if ans is None:
n = []
import glob
base = os.path.dirname(os.path.abspath(__file__))
for x in glob.glob(os.path.join(base, '*', '__init__.py')):
q = os.path.basename(os.path.dirname(x))
if q != 'tui':
n.append(q)
all_kitten_names.ans = ans = frozenset(n)
return ans
n = []
import glob
base = os.path.dirname(os.path.abspath(__file__))
for x in glob.glob(os.path.join(base, '*', '__init__.py')):
q = os.path.basename(os.path.dirname(x))
if q != 'tui':
n.append(q)
return frozenset(n)
def list_kittens():
@@ -140,19 +138,19 @@ def list_kittens():
def get_kitten_cli_docs(kitten):
sys.cli_docs = {}
setattr(sys, 'cli_docs', {})
run_kitten(kitten, run_name='__doc__')
ans = sys.cli_docs
del sys.cli_docs
ans = getattr(sys, 'cli_docs')
delattr(sys, 'cli_docs')
if 'help_text' in ans and 'usage' in ans and 'options' in ans:
return ans
def get_kitten_conf_docs(kitten):
sys.all_options = None
setattr(sys, 'all_options', None)
run_kitten(kitten, run_name='__conf__')
ans = sys.all_options
del sys.all_options
ans = getattr(sys, 'all_options')
delattr(sys, 'all_options')
return ans

View File

@@ -9,6 +9,7 @@ import sys
from contextlib import suppress
from functools import lru_cache
from gettext import gettext as _
from typing import List, Optional, Sequence, Tuple, Union
from kitty.cli import parse_args
from kitty.cli_stub import UnicodeCLIOptions
@@ -121,15 +122,16 @@ def serialize_favorites(favorites):
return '\n'.join(ans)
def load_favorites(refresh=False):
ans = getattr(load_favorites, 'ans', None)
def load_favorites(refresh: bool = False) -> List[int]:
ans: Optional[List[int]] = getattr(load_favorites, 'ans', None)
if ans is None or refresh:
try:
with open(favorites_path, 'rb') as f:
raw = f.read().decode('utf-8')
ans = load_favorites.ans = list(parse_favorites(raw)) or list(DEFAULT_SET)
ans = list(parse_favorites(raw)) or list(DEFAULT_SET)
except FileNotFoundError:
ans = load_favorites.ans = list(DEFAULT_SET)
ans = list(DEFAULT_SET)
setattr(load_favorites, 'ans', ans)
return ans
@@ -232,7 +234,7 @@ class Table:
col_width = min(col_width, 40)
space_for_desc = col_width - 2 - idx_size - 4
num_cols = self.num_cols = max(cols // col_width, 1)
buf = []
buf: List[str] = []
a = buf.append
rows_left = rows
@@ -241,7 +243,7 @@ class Table:
rows_left -= 1
if rows_left == 0:
break
buf.append('\r\n')
a('\r\n')
buf.extend(cell(i, idx, c, desc))
a(' ')
self.text = ''.join(buf)
@@ -298,7 +300,7 @@ class UnicodeInput(Handler):
def update_codepoints(self):
codepoints = None
if self.mode is HEX:
q = self.mode, None
q: Tuple[str, Optional[Union[str, Sequence[int]]]] = self.mode, None
codepoints = self.recent
elif self.mode is EMOTICONS:
q = self.mode, None
@@ -317,9 +319,9 @@ class UnicodeInput(Handler):
words = words[:index_words[0]]
codepoints = codepoints_matching_search(tuple(words))
if index_words:
index_word = int(index_word.lstrip(INDEX_CHAR), 16)
if index_word < len(codepoints):
codepoints = [codepoints[index_word]]
iindex_word = int(index_word.lstrip(INDEX_CHAR), 16)
if codepoints and iindex_word < len(codepoints):
codepoints = [codepoints[iindex_word]]
if q != self.last_updated_code_point_at:
self.last_updated_code_point_at = q
self.table.set_codepoints(codepoints, self.mode)

View File

@@ -7,11 +7,12 @@ import os
import sys
from collections import defaultdict
from contextlib import contextmanager, suppress
from typing import DefaultDict, List, Optional
from typing import DefaultDict, Dict, Generator, Iterable, List, Optional
import kitty.fast_data_types as fast_data_types
from .constants import is_macos, shell_path, terminfo_dir
from .options_stub import Options
if is_macos:
from kitty.fast_data_types import (
@@ -62,40 +63,44 @@ else:
return ans
def checked_terminfo_dir():
ans = getattr(checked_terminfo_dir, 'ans', None)
if ans is None:
ans = checked_terminfo_dir.ans = terminfo_dir if os.path.isdir(terminfo_dir) else None
def checked_terminfo_dir() -> Optional[str]:
q = getattr(checked_terminfo_dir, 'ans', False)
if q is False:
ans = terminfo_dir if os.path.isdir(terminfo_dir) else None
setattr(checked_terminfo_dir, 'ans', ans)
else:
ans = q
return ans
def processes_in_group(grp):
gmap = getattr(process_group_map, 'cached_map', None)
def processes_in_group(grp: int) -> List[int]:
gmap: Optional[DefaultDict[int, List[int]]] = getattr(process_group_map, 'cached_map', None)
if gmap is None:
try:
gmap = process_group_map()
except Exception:
gmap = {}
gmap = defaultdict(list)
return gmap.get(grp, [])
@contextmanager
def cached_process_data():
def cached_process_data() -> Generator[None, None, None]:
try:
process_group_map.cached_map = process_group_map()
cm = process_group_map()
except Exception:
process_group_map.cached_map = {}
cm = defaultdict(list)
setattr(process_group_map, 'cached_map', cm)
try:
yield
finally:
process_group_map.cached_map = None
delattr(process_group_map, 'cached_map')
def parse_environ_block(data):
def parse_environ_block(data: str) -> Dict[str, str]:
"""Parse a C environ block of environment variables into a dictionary."""
# The block is usually raw data from the target process. It might contain
# trailing garbage and lines that do not look like assignments.
ret = {}
ret: Dict[str, str] = {}
pos = 0
while True:
@@ -114,39 +119,38 @@ def parse_environ_block(data):
return ret
def environ_of_process(pid):
def environ_of_process(pid: int) -> Dict[str, str]:
return parse_environ_block(_environ_of_process(pid))
def remove_cloexec(fd):
def remove_cloexec(fd: int) -> None:
fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) & ~fcntl.FD_CLOEXEC)
def remove_blocking(fd):
def remove_blocking(fd: int) -> None:
os.set_blocking(fd, False)
def process_env():
ans = os.environ
def process_env() -> Dict[str, str]:
ans = dict(os.environ)
ssl_env_var = getattr(sys, 'kitty_ssl_env_var', None)
if ssl_env_var is not None:
ans = ans.copy()
ans.pop(ssl_env_var, None)
return ans
def default_env():
try:
return default_env.env
except AttributeError:
def default_env() -> Dict[str, str]:
ans: Optional[Dict[str, str]] = getattr(default_env, 'env', None)
if ans is None:
return process_env()
return ans
def set_default_env(val=None):
def set_default_env(val: Optional[Dict[str, str]] = None) -> None:
env = process_env().copy()
if val:
env.update(val)
default_env.env = env
setattr(default_env, 'env', env)
def openpty():
@@ -162,7 +166,16 @@ class Child:
pid: Optional[int] = None
forked = False
def __init__(self, argv, cwd, opts, stdin=None, env=None, cwd_from=None, allow_remote_control=False):
def __init__(
self,
argv: Iterable[str],
cwd: str,
opts: Options,
stdin: Optional[bytes] = None,
env: Optional[Dict[str, str]] = None,
cwd_from: Optional[int] = None,
allow_remote_control=False
):
self.allow_remote_control = allow_remote_control
self.argv = argv
if cwd_from is not None:
@@ -179,8 +192,8 @@ class Child:
self.env = env or {}
@property
def final_env(self):
env = getattr(self, '_final_env', None)
def final_env(self) -> Dict[str, str]:
env: Optional[Dict[str, str]] = getattr(self, '_final_env', None)
if env is None:
env = self._final_env = default_env().copy()
env.update(self.env)
@@ -191,13 +204,14 @@ class Child:
# can use it to display the current directory name rather
# than the resolved path
env['PWD'] = self.cwd
if checked_terminfo_dir():
env['TERMINFO'] = checked_terminfo_dir()
tdir = checked_terminfo_dir()
if tdir:
env['TERMINFO'] = tdir
return env
def fork(self):
def fork(self) -> Optional[int]:
if self.forked:
return
return None
self.forked = True
master, slave = openpty()
stdin, self.stdin = self.stdin, None
@@ -208,8 +222,7 @@ class Child:
remove_cloexec(stdin_read_fd)
else:
stdin_read_fd = stdin_write_fd = -1
env = self.final_env
env = tuple('{}={}'.format(k, v) for k, v in env.items())
env = tuple('{}={}'.format(k, v) for k, v in self.final_env.items())
argv = list(self.argv)
exe = argv[0]
if is_macos and exe == shell_path:
@@ -237,7 +250,8 @@ class Child:
fast_data_types.thread_write(stdin_write_fd, stdin)
os.close(ready_read_fd)
self.terminal_ready_fd = ready_write_fd
remove_blocking(self.child_fd)
if self.child_fd is not None:
remove_blocking(self.child_fd)
return pid
def mark_terminal_ready(self):
@@ -245,7 +259,9 @@ class Child:
self.terminal_ready_fd = -1
@property
def foreground_processes(self):
def foreground_processes(self) -> List[int]:
if self.child_fd is None:
return []
try:
pgrp = os.tcgetpgrp(self.child_fd)
foreground_processes = processes_in_group(pgrp) if pgrp >= 0 else []
@@ -263,34 +279,39 @@ class Child:
return []
@property
def cmdline(self):
def cmdline(self) -> List[str]:
try:
assert self.pid is not None
return cmdline_of_process(self.pid) or list(self.argv)
except Exception:
return list(self.argv)
@property
def foreground_cmdline(self):
def foreground_cmdline(self) -> List[str]:
try:
assert self.pid_for_cwd is not None
return cmdline_of_process(self.pid_for_cwd) or self.cmdline
except Exception:
return self.cmdline
@property
def environ(self):
def environ(self) -> Dict[str, str]:
try:
assert self.pid is not None
return environ_of_process(self.pid)
except Exception:
return {}
@property
def current_cwd(self):
def current_cwd(self) -> Optional[str]:
with suppress(Exception):
assert self.pid is not None
return cwd_of_process(self.pid)
@property
def pid_for_cwd(self):
def pid_for_cwd(self) -> Optional[int]:
with suppress(Exception):
assert self.child_fd is not None
pgrp = os.tcgetpgrp(self.child_fd)
foreground_processes = processes_in_group(pgrp) if pgrp >= 0 else []
if len(foreground_processes) == 1:
@@ -298,16 +319,19 @@ class Child:
return self.pid
@property
def foreground_cwd(self):
def foreground_cwd(self) -> Optional[str]:
with suppress(Exception):
assert self.pid_for_cwd is not None
return cwd_of_process(self.pid_for_cwd) or None
@property
def foreground_environ(self):
def foreground_environ(self) -> Dict[str, str]:
try:
assert self.pid_for_cwd is not None
return environ_of_process(self.pid_for_cwd)
except Exception:
try:
assert self.pid is not None
return environ_of_process(self.pid)
except Exception:
pass

View File

@@ -960,6 +960,9 @@ class Screen:
def clear_selection(self) -> None:
pass
def refresh_sprite_positions(self) -> None:
pass
def set_marker(self, marker: Optional[Callable] = None) -> None:
pass
@@ -1019,3 +1022,22 @@ class ChildMonitor:
def set_iutf8_winid(self, win_id: int, on: bool) -> bool:
pass
def set_iutf8_fd(fd: int, on: bool) -> bool:
pass
def spawn(
exe: str,
cwd: str,
argv: Tuple[str, ...],
env: Tuple[str, ...],
master: int,
slave: int,
stdin_read_fd: int,
stdin_write_fd: int,
ready_read_fd: int,
ready_write_fd: int
) -> int:
pass

View File

@@ -3,9 +3,10 @@
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import weakref
from collections import deque, namedtuple
from collections import deque
from contextlib import suppress
from functools import partial
from typing import Deque, NamedTuple, Optional, List, Dict, cast
from .borders import Borders
from .child import Child
@@ -20,14 +21,22 @@ from .tab_bar import TabBar, TabBarData
from .utils import log_error, resolved_shell
from .window import Window
SpecialWindowInstance = namedtuple('SpecialWindow', 'cmd stdin override_title cwd_from cwd overlay_for env')
class SpecialWindowInstance(NamedTuple):
cmd: Optional[List[str]]
stdin: Optional[bytes]
override_title: Optional[str]
cwd_from: Optional[int]
cwd: Optional[str]
overlay_for: Optional[int]
env: Optional[Dict[str, str]]
def SpecialWindow(cmd, stdin=None, override_title=None, cwd_from=None, cwd=None, overlay_for=None, env=None):
return SpecialWindowInstance(cmd, stdin, override_title, cwd_from, cwd, overlay_for, env)
def add_active_id_to_history(items, item_id, maxlen=64):
def add_active_id_to_history(items: Deque[int], item_id: int, maxlen: int = 64) -> None:
with suppress(ValueError):
items.remove(item_id)
items.append(item_id)
@@ -42,7 +51,7 @@ class Tab: # {{{
self.tab_manager_ref = weakref.ref(tab_manager)
self.os_window_id = tab_manager.os_window_id
self.id = add_tab(self.os_window_id)
self.active_window_history = deque()
self.active_window_history: Deque[int] = deque()
if not self.id:
raise Exception('No OS window with id {} found, or tab counter has wrapped'.format(self.os_window_id))
self.opts, self.args = tab_manager.opts, tab_manager.args
@@ -50,7 +59,7 @@ class Tab: # {{{
self.name = getattr(session_tab, 'name', '')
self.enabled_layouts = [x.lower() for x in getattr(session_tab, 'enabled_layouts', None) or self.opts.enabled_layouts]
self.borders = Borders(self.os_window_id, self.id, self.opts)
self.windows = deque()
self.windows: Deque[Window] = deque()
for i, which in enumerate('first second third fourth fifth sixth seventh eighth ninth tenth'.split()):
setattr(self, which + '_window', partial(self.nth_window, num=i))
self._last_used_layout = self._current_layout_name = None
@@ -121,15 +130,16 @@ class Tab: # {{{
@active_window_idx.setter
def active_window_idx(self, val):
try:
old_active_window = self.windows[self._active_window_idx]
old_active_window: Optional[Window] = self.windows[self._active_window_idx]
except Exception:
old_active_window = None
else:
assert old_active_window is not None
wid = old_active_window.id if old_active_window.overlay_for is None else old_active_window.overlay_for
add_active_id_to_history(self.active_window_history, wid)
self._active_window_idx = max(0, min(val, len(self.windows) - 1))
try:
new_active_window = self.windows[self._active_window_idx]
new_active_window: Optional[Window] = self.windows[self._active_window_idx]
except Exception:
new_active_window = None
if old_active_window is not new_active_window:
@@ -143,12 +153,12 @@ class Tab: # {{{
tm.mark_tab_bar_dirty()
@property
def active_window(self):
def active_window(self) -> Optional[Window]:
return self.windows[self.active_window_idx] if self.windows else None
@property
def title(self):
return getattr(self.active_window, 'title', appname)
def title(self) -> str:
return cast(str, getattr(self.active_window, 'title', appname))
def set_title(self, title):
self.name = title or ''
@@ -261,7 +271,7 @@ class Tab: # {{{
cmd = resolved_shell(self.opts)
else:
cmd = self.args.args or resolved_shell(self.opts)
fenv = {}
fenv: Dict[str, str] = {}
if env:
fenv.update(env)
fenv['KITTY_WINDOW_ID'] = str(next_window_id())
@@ -280,10 +290,20 @@ class Tab: # {{{
self.relayout_borders()
def new_window(
self, use_shell=True, cmd=None, stdin=None, override_title=None,
cwd_from=None, cwd=None, overlay_for=None, env=None, location=None,
copy_colors_from=None, allow_remote_control=False, marker=None
):
self,
use_shell: bool = True,
cmd: Optional[List[str]] = None,
stdin: Optional[bytes] = None,
override_title: Optional[str] = None,
cwd_from: Optional[int] = None,
cwd: Optional[str] = None,
overlay_for: Optional[int] = None,
env: Optional[Dict[str, str]] = None,
location: Optional[str] = None,
copy_colors_from: Optional[Window] = None,
allow_remote_control: bool = False,
marker: Optional[str] = None
) -> Window:
child = self.launch_child(
use_shell=use_shell, cmd=cmd, stdin=stdin, cwd_from=cwd_from, cwd=cwd, env=env, allow_remote_control=allow_remote_control)
window = Window(self, child, self.opts, self.args, override_title=override_title, copy_colors_from=copy_colors_from)
@@ -302,8 +322,20 @@ class Tab: # {{{
traceback.print_exc()
return window
def new_special_window(self, special_window, location=None, copy_colors_from=None, allow_remote_control=False):
return self.new_window(False, *special_window, location=location, copy_colors_from=copy_colors_from, allow_remote_control=allow_remote_control)
def new_special_window(
self,
special_window: SpecialWindowInstance,
location: Optional[str] = None,
copy_colors_from: Optional[Window] = None,
allow_remote_control: bool = False
) -> Window:
return self.new_window(
use_shell=False, cmd=special_window.cmd, stdin=special_window.stdin,
override_title=special_window.override_title,
cwd_from=special_window.cwd_from, cwd=special_window.cwd, overlay_for=special_window.overlay_for,
env=special_window.env, location=location, copy_colors_from=copy_colors_from,
allow_remote_control=allow_remote_control
)
def close_window(self):
if self.windows:
@@ -479,8 +511,8 @@ class TabManager: # {{{
self.last_active_tab_id = None
self.opts, self.args = opts, args
self.tab_bar_hidden = self.opts.tab_bar_style == 'hidden'
self.tabs = []
self.active_tab_history = deque()
self.tabs: List[Tab] = []
self.active_tab_history: Deque[int] = deque()
self.tab_bar = TabBar(self.os_window_id, opts)
self._active_tab_idx = 0
@@ -496,14 +528,15 @@ class TabManager: # {{{
@active_tab_idx.setter
def active_tab_idx(self, val):
try:
old_active_tab = self.tabs[self._active_tab_idx]
old_active_tab: Optional[Tab] = self.tabs[self._active_tab_idx]
except Exception:
old_active_tab = None
else:
assert old_active_tab is not None
add_active_id_to_history(self.active_tab_history, old_active_tab.id)
self._active_tab_idx = max(0, min(val, len(self.tabs) - 1))
try:
new_active_tab = self.tabs[self._active_tab_idx]
new_active_tab: Optional[Tab] = self.tabs[self._active_tab_idx]
except Exception:
new_active_tab = None
if old_active_tab is not new_active_tab: