mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-12 03:29:10 +02:00
Fixing this involved adding a new mode to kitty where it handles ctrl-c/z/q by sending signals to the tty foreground process group instead of delegating to the kernel to do that. Since the pipe may be full we have no way of knowing when the kernel will get around to reading the signal byte. So send the signal ourselves. Fixes #5271
1648 lines
63 KiB
Python
1648 lines
63 KiB
Python
#!/usr/bin/env python3
|
|
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import weakref
|
|
from collections import deque
|
|
from contextlib import suppress
|
|
from enum import Enum, IntEnum, auto
|
|
from functools import lru_cache, partial
|
|
from gettext import gettext as _
|
|
from itertools import chain
|
|
from time import monotonic
|
|
from typing import (
|
|
TYPE_CHECKING, Any, Callable, Deque, Dict, Iterable, List, NamedTuple,
|
|
Optional, Pattern, Sequence, Tuple, Union
|
|
)
|
|
|
|
from .child import ProcessDesc
|
|
from .cli_stub import CLIOptions
|
|
from .config import build_ansi_color_table
|
|
from .constants import (
|
|
appname, clear_handled_signals, config_dir, is_macos, wakeup_io_loop
|
|
)
|
|
from .fast_data_types import (
|
|
BGIMAGE_PROGRAM, BLIT_PROGRAM, CELL_BG_PROGRAM, CELL_FG_PROGRAM,
|
|
CELL_PROGRAM, CELL_SPECIAL_PROGRAM, CURSOR_BEAM, CURSOR_BLOCK,
|
|
CURSOR_UNDERLINE, DCS, DECORATION, DECORATION_MASK, DIM, GLFW_MOD_CONTROL,
|
|
GRAPHICS_ALPHA_MASK_PROGRAM, GRAPHICS_PREMULT_PROGRAM, GRAPHICS_PROGRAM,
|
|
MARK, MARK_MASK, NO_CURSOR_SHAPE, NUM_UNDERLINE_STYLES, OSC, REVERSE,
|
|
SCROLL_FULL, SCROLL_LINE, SCROLL_PAGE, STRIKETHROUGH, TINT_PROGRAM, Color,
|
|
KeyEvent, Screen, add_timer, add_window, cell_size_for_window,
|
|
click_mouse_cmd_output, click_mouse_url, compile_program,
|
|
current_os_window, encode_key_for_tty, get_boss, get_click_interval,
|
|
get_clipboard_string, get_options, init_cell_program, mark_os_window_dirty,
|
|
mouse_selection, move_cursor_to_mouse_if_in_prompt, pt_to_px,
|
|
set_clipboard_string, set_titlebar_color, set_window_logo,
|
|
set_window_padding, set_window_render_data, update_ime_position_for_window,
|
|
update_window_title, update_window_visibility, wakeup_main_loop
|
|
)
|
|
from .keys import keyboard_mode_name, mod_mask
|
|
from .notify import NotificationCommand, handle_notification_cmd
|
|
from .options.types import Options
|
|
from .rgb import to_color
|
|
from .terminfo import get_capabilities
|
|
from .types import MouseEvent, WindowGeometry, ac, run_once
|
|
from .typing import BossType, ChildType, EdgeLiteral, TabType, TypedDict
|
|
from .utils import (
|
|
get_primary_selection, kitty_ansi_sanitizer_pat, load_shaders, log_error,
|
|
open_cmd, open_url, parse_color_set, path_from_osc7_url,
|
|
resolve_custom_file, resolved_shell, sanitize_title, set_primary_selection
|
|
)
|
|
|
|
MatchPatternType = Union[Pattern[str], Tuple[Pattern[str], Optional[Pattern[str]]]]
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from .file_transmission import FileTransmission
|
|
|
|
|
|
class CwdRequestType(Enum):
|
|
current: int = auto()
|
|
last_reported: int = auto()
|
|
oldest: int = auto()
|
|
|
|
|
|
class CwdRequest:
|
|
|
|
def __init__(self, window: Optional['Window'] = None, request_type: CwdRequestType = CwdRequestType.current) -> None:
|
|
self.window_id = -1 if window is None else window.id
|
|
self.request_type = request_type
|
|
|
|
def __bool__(self) -> bool:
|
|
return self.window_id > -1
|
|
|
|
@property
|
|
def window(self) -> Optional['Window']:
|
|
return get_boss().window_id_map.get(self.window_id)
|
|
|
|
@property
|
|
def cwd_of_child(self) -> str:
|
|
window = self.window
|
|
if not window:
|
|
return ''
|
|
reported_cwd = path_from_osc7_url(window.screen.last_reported_cwd) if window.screen.last_reported_cwd else ''
|
|
if reported_cwd and not window.child_is_remote and (self.request_type is CwdRequestType.last_reported or window.at_prompt):
|
|
return reported_cwd
|
|
return window.get_cwd_of_child(oldest=self.request_type is CwdRequestType.oldest) or ''
|
|
|
|
def modify_argv_for_launch_with_cwd(self, argv: List[str]) -> str:
|
|
window = self.window
|
|
if not window:
|
|
return ''
|
|
reported_cwd = path_from_osc7_url(window.screen.last_reported_cwd) if window.screen.last_reported_cwd else ''
|
|
if reported_cwd:
|
|
# First check if we are running ssh kitten, and trying to open the configured login shell
|
|
if argv[0] == resolved_shell(get_options())[0]:
|
|
ssh_kitten_cmdline = window.ssh_kitten_cmdline()
|
|
if ssh_kitten_cmdline:
|
|
from kittens.ssh.utils import set_cwd_in_cmdline
|
|
argv[:] = ssh_kitten_cmdline
|
|
set_cwd_in_cmdline(reported_cwd, argv)
|
|
return ''
|
|
if not window.child_is_remote and (self.request_type is CwdRequestType.last_reported or window.at_prompt):
|
|
return reported_cwd
|
|
return window.get_cwd_of_child(oldest=self.request_type is CwdRequestType.oldest) or ''
|
|
|
|
|
|
def process_title_from_child(title: str, is_base64: bool) -> str:
|
|
if is_base64:
|
|
from base64 import standard_b64decode
|
|
try:
|
|
title = standard_b64decode(title).decode('utf-8', 'replace')
|
|
except Exception:
|
|
title = 'undecodeable title'
|
|
return sanitize_title(title)
|
|
|
|
|
|
@lru_cache(maxsize=64)
|
|
def compile_match_query(exp: str, is_simple: bool = True) -> MatchPatternType:
|
|
if is_simple:
|
|
pat: MatchPatternType = re.compile(exp)
|
|
else:
|
|
kp, vp = exp.partition('=')[::2]
|
|
if vp:
|
|
pat = re.compile(kp), re.compile(vp)
|
|
else:
|
|
pat = re.compile(kp), None
|
|
return pat
|
|
|
|
|
|
class WindowDict(TypedDict):
|
|
id: int
|
|
is_focused: bool
|
|
title: str
|
|
pid: Optional[int]
|
|
cwd: str
|
|
cmdline: List[str]
|
|
env: Dict[str, str]
|
|
foreground_processes: List[ProcessDesc]
|
|
is_self: bool
|
|
lines: int
|
|
columns: int
|
|
|
|
|
|
class PipeData(TypedDict):
|
|
input_line_number: int
|
|
scrolled_by: int
|
|
cursor_x: int
|
|
cursor_y: int
|
|
lines: int
|
|
columns: int
|
|
text: str
|
|
|
|
|
|
class ClipboardPending(NamedTuple):
|
|
where: str
|
|
data: str
|
|
truncated: bool = False
|
|
|
|
|
|
class DynamicColor(IntEnum):
|
|
default_fg, default_bg, cursor_color, highlight_fg, highlight_bg = range(1, 6)
|
|
|
|
|
|
class CommandOutput(IntEnum):
|
|
last_run, first_on_screen, last_visited, last_non_empty = 0, 1, 2, 3
|
|
|
|
|
|
DYNAMIC_COLOR_CODES = {
|
|
10: DynamicColor.default_fg,
|
|
11: DynamicColor.default_bg,
|
|
12: DynamicColor.cursor_color,
|
|
17: DynamicColor.highlight_bg,
|
|
19: DynamicColor.highlight_fg,
|
|
}
|
|
DYNAMIC_COLOR_CODES.update({k+100: v for k, v in DYNAMIC_COLOR_CODES.items()})
|
|
|
|
|
|
class Watcher:
|
|
|
|
def __call__(self, boss: BossType, window: 'Window', data: Dict[str, Any]) -> None:
|
|
pass
|
|
|
|
|
|
class Watchers:
|
|
|
|
on_resize: List[Watcher]
|
|
on_close: List[Watcher]
|
|
on_focus_change: List[Watcher]
|
|
|
|
def __init__(self) -> None:
|
|
self.on_resize = []
|
|
self.on_close = []
|
|
self.on_focus_change = []
|
|
|
|
def add(self, others: 'Watchers') -> None:
|
|
def merge(base: List[Watcher], other: List[Watcher]) -> None:
|
|
for x in other:
|
|
if x not in base:
|
|
base.append(x)
|
|
merge(self.on_resize, others.on_resize)
|
|
merge(self.on_close, others.on_close)
|
|
merge(self.on_focus_change, others.on_focus_change)
|
|
|
|
def clear(self) -> None:
|
|
del self.on_close[:], self.on_resize[:], self.on_focus_change[:]
|
|
|
|
def copy(self) -> 'Watchers':
|
|
ans = Watchers()
|
|
ans.on_close = self.on_close[:]
|
|
ans.on_resize = self.on_resize[:]
|
|
ans.on_focus_change = self.on_focus_change[:]
|
|
return ans
|
|
|
|
@property
|
|
def has_watchers(self) -> bool:
|
|
return bool(self.on_close or self.on_resize or self.on_focus_change)
|
|
|
|
|
|
def call_watchers(windowref: Callable[[], Optional['Window']], which: str, data: Dict[str, Any]) -> None:
|
|
|
|
def callback(timer_id: Optional[int]) -> None:
|
|
w = windowref()
|
|
if w is not None:
|
|
watchers: List[Watcher] = getattr(w.watchers, which)
|
|
w.call_watchers(watchers, data)
|
|
|
|
add_timer(callback, 0, False)
|
|
|
|
|
|
def pagerhist(screen: Screen, as_ansi: bool = False, add_wrap_markers: bool = True, upto_output_start: bool = False) -> str:
|
|
pht = screen.historybuf.pagerhist_as_text(upto_output_start)
|
|
if pht and (not as_ansi or not add_wrap_markers):
|
|
sanitizer = text_sanitizer(as_ansi, add_wrap_markers)
|
|
pht = sanitizer(pht)
|
|
return pht
|
|
|
|
|
|
def as_text(
|
|
screen: Screen,
|
|
as_ansi: bool = False,
|
|
add_history: bool = False,
|
|
add_wrap_markers: bool = False,
|
|
alternate_screen: bool = False,
|
|
add_cursor: bool = False
|
|
) -> str:
|
|
lines: List[str] = []
|
|
add_history = add_history and not (screen.is_using_alternate_linebuf() ^ alternate_screen)
|
|
if alternate_screen:
|
|
f = screen.as_text_alternate
|
|
else:
|
|
f = screen.as_text_non_visual if add_history else screen.as_text
|
|
f(lines.append, as_ansi, add_wrap_markers)
|
|
ctext = ''
|
|
if add_cursor:
|
|
ctext += '\x1b[?25' + ('h' if screen.cursor_visible else 'l')
|
|
ctext += f'\x1b[{screen.cursor.y + 1};{screen.cursor.x + 1}H'
|
|
shape = screen.cursor.shape
|
|
if shape == NO_CURSOR_SHAPE:
|
|
ctext += '\x1b[?12' + ('h' if screen.cursor.blink else 'l')
|
|
else:
|
|
code = {CURSOR_BLOCK: 1, CURSOR_UNDERLINE: 3, CURSOR_BEAM: 5}[shape]
|
|
if not screen.cursor.blink:
|
|
code += 1
|
|
ctext += f'\x1b[{code} q'
|
|
|
|
if add_history:
|
|
pht = pagerhist(screen, as_ansi, add_wrap_markers)
|
|
h: List[str] = [pht] if pht else []
|
|
screen.historybuf.as_text(h.append, as_ansi, add_wrap_markers)
|
|
if h:
|
|
if not screen.linebuf.is_continued(0):
|
|
h[-1] += '\n'
|
|
if as_ansi:
|
|
h[-1] += '\x1b[m'
|
|
ans = ''.join(chain(h, lines))
|
|
if ctext:
|
|
ans += ctext
|
|
return ans
|
|
ans = ''.join(lines)
|
|
if ctext:
|
|
ans += ctext
|
|
return ans
|
|
|
|
|
|
def multi_replace(src: str, **replacements: Any) -> str:
|
|
r = {k: str(v) for k, v in replacements.items()}
|
|
|
|
def sub(m: 're.Match[str]') -> str:
|
|
return r.get(m.group(1), m.group(1))
|
|
|
|
return re.sub(r'\{([A-Z_]+)\}', sub, src)
|
|
|
|
|
|
class LoadShaderPrograms:
|
|
|
|
def __call__(self, semi_transparent: bool = False) -> None:
|
|
compile_program(BLIT_PROGRAM, *load_shaders('blit'))
|
|
v, f = load_shaders('cell')
|
|
|
|
for which, p in {
|
|
'SIMPLE': CELL_PROGRAM,
|
|
'BACKGROUND': CELL_BG_PROGRAM,
|
|
'SPECIAL': CELL_SPECIAL_PROGRAM,
|
|
'FOREGROUND': CELL_FG_PROGRAM,
|
|
}.items():
|
|
ff = f.replace('{WHICH_PROGRAM}', which)
|
|
vv = multi_replace(
|
|
v,
|
|
WHICH_PROGRAM=which,
|
|
REVERSE_SHIFT=REVERSE,
|
|
STRIKE_SHIFT=STRIKETHROUGH,
|
|
DIM_SHIFT=DIM,
|
|
DECORATION_SHIFT=DECORATION,
|
|
MARK_SHIFT=MARK,
|
|
MARK_MASK=MARK_MASK,
|
|
DECORATION_MASK=DECORATION_MASK,
|
|
STRIKE_SPRITE_INDEX=NUM_UNDERLINE_STYLES + 1,
|
|
)
|
|
if semi_transparent:
|
|
vv = vv.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT')
|
|
ff = ff.replace('#define NOT_TRANSPARENT', '#define TRANSPARENT')
|
|
compile_program(p, vv, ff)
|
|
|
|
v, f = load_shaders('graphics')
|
|
for which, p in {
|
|
'SIMPLE': GRAPHICS_PROGRAM,
|
|
'PREMULT': GRAPHICS_PREMULT_PROGRAM,
|
|
'ALPHA_MASK': GRAPHICS_ALPHA_MASK_PROGRAM,
|
|
}.items():
|
|
ff = f.replace('ALPHA_TYPE', which)
|
|
compile_program(p, v, ff)
|
|
|
|
v, f = load_shaders('bgimage')
|
|
compile_program(BGIMAGE_PROGRAM, v, f)
|
|
v, f = load_shaders('tint')
|
|
compile_program(TINT_PROGRAM, v, f)
|
|
init_cell_program()
|
|
|
|
|
|
load_shader_programs = LoadShaderPrograms()
|
|
|
|
|
|
def setup_colors(screen: Screen, opts: Options) -> None:
|
|
screen.color_profile.update_ansi_color_table(build_ansi_color_table(opts))
|
|
|
|
def s(c: Optional[Color]) -> int:
|
|
return 0 if c is None else (0xff000000 | int(c))
|
|
screen.color_profile.set_configured_colors(
|
|
s(opts.foreground), s(opts.background),
|
|
s(opts.cursor), s(opts.cursor_text_color),
|
|
s(opts.selection_foreground), s(opts.selection_background),
|
|
s(opts.visual_bell_color)
|
|
)
|
|
|
|
|
|
@run_once
|
|
def load_paste_filter() -> Callable[[str], str]:
|
|
import runpy
|
|
import traceback
|
|
try:
|
|
m = runpy.run_path(os.path.join(config_dir, 'paste-actions.py'))
|
|
func: Callable[[str], str] = m['filter_paste']
|
|
except Exception as e:
|
|
if not isinstance(e, FileNotFoundError):
|
|
traceback.print_exc()
|
|
log_error(f'Failed to load paste filter function with error: {e}')
|
|
|
|
def func(text: str) -> str:
|
|
return text
|
|
return func
|
|
|
|
|
|
def text_sanitizer(as_ansi: bool, add_wrap_markers: bool) -> Callable[[str], str]:
|
|
pat = kitty_ansi_sanitizer_pat()
|
|
ansi, wrap_markers = not as_ansi, not add_wrap_markers
|
|
|
|
def remove_wrap_markers(line: str) -> str:
|
|
return line.replace('\r', '')
|
|
|
|
def remove_sgr(line: str) -> str:
|
|
return str(pat.sub('', line))
|
|
|
|
def remove_both(line: str) -> str:
|
|
return str(pat.sub('', line.replace('\r', '')))
|
|
|
|
if ansi:
|
|
return remove_both if wrap_markers else remove_sgr
|
|
return remove_wrap_markers
|
|
|
|
|
|
def cmd_output(screen: Screen, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str:
|
|
lines: List[str] = []
|
|
search_in_pager_hist = screen.cmd_output(which, lines.append, as_ansi, add_wrap_markers)
|
|
if search_in_pager_hist:
|
|
pht = pagerhist(screen, as_ansi, add_wrap_markers, True)
|
|
if pht:
|
|
lines.insert(0, pht)
|
|
for i in range(min(len(lines), 3)):
|
|
x = lines[i]
|
|
if x.startswith('\x1b]133;C'):
|
|
lines[i] = x.partition('\\')[-1]
|
|
return ''.join(lines)
|
|
|
|
|
|
def process_remote_print(msg: str) -> str:
|
|
from base64 import standard_b64decode
|
|
|
|
from .cli import green
|
|
text = standard_b64decode(msg).decode('utf-8', 'replace')
|
|
return text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0'))
|
|
|
|
|
|
class EdgeWidths:
|
|
left: Optional[float]
|
|
top: Optional[float]
|
|
right: Optional[float]
|
|
bottom: Optional[float]
|
|
|
|
def __init__(self, serialized: Optional[Dict[str, Optional[float]]] = None):
|
|
if serialized is not None:
|
|
self.left = serialized['left']
|
|
self.right = serialized['right']
|
|
self.top = serialized['top']
|
|
self.bottom = serialized['bottom']
|
|
else:
|
|
self.left = self.top = self.right = self.bottom = None
|
|
|
|
def serialize(self) -> Dict[str, Optional[float]]:
|
|
return {'left': self.left, 'right': self.right, 'top': self.top, 'bottom': self.bottom}
|
|
|
|
|
|
class GlobalWatchers:
|
|
|
|
def __init__(self) -> None:
|
|
self.options_spec: Optional[Dict[str, str]] = None
|
|
self.ans = Watchers()
|
|
self.extra = ''
|
|
|
|
def __call__(self) -> Watchers:
|
|
spec = get_options().watcher
|
|
if spec == self.options_spec:
|
|
return self.ans
|
|
from .launch import load_watch_modules
|
|
if self.extra:
|
|
spec = spec.copy()
|
|
spec[self.extra] = self.extra
|
|
self.ans = load_watch_modules(spec.keys()) or self.ans
|
|
self.options_spec = spec.copy()
|
|
return self.ans
|
|
|
|
def set_extra(self, extra: str) -> None:
|
|
self.extra = extra
|
|
|
|
|
|
global_watchers = GlobalWatchers()
|
|
|
|
|
|
class Window:
|
|
|
|
def __init__(
|
|
self,
|
|
tab: TabType,
|
|
child: ChildType,
|
|
args: CLIOptions,
|
|
override_title: Optional[str] = None,
|
|
copy_colors_from: Optional['Window'] = None,
|
|
watchers: Optional[Watchers] = None
|
|
):
|
|
if watchers:
|
|
self.watchers = watchers
|
|
self.watchers.add(global_watchers())
|
|
else:
|
|
self.watchers = global_watchers().copy()
|
|
self.last_focused_at = 0.
|
|
self.started_at = monotonic()
|
|
self.current_remote_data: List[str] = []
|
|
self.current_mouse_event_button = 0
|
|
self.current_clipboard_read_ask: Optional[bool] = None
|
|
self.prev_osc99_cmd = NotificationCommand()
|
|
self.actions_on_close: List[Callable[['Window'], None]] = []
|
|
self.actions_on_focus_change: List[Callable[['Window', bool], None]] = []
|
|
self.actions_on_removal: List[Callable[['Window'], None]] = []
|
|
self.current_marker_spec: Optional[Tuple[str, Union[str, Tuple[Tuple[int, str], ...]]]] = None
|
|
self.kitten_result_processors: List[Callable[['Window', Any], None]] = []
|
|
self.pty_resized_once = False
|
|
self.last_reported_pty_size = (-1, -1, -1, -1)
|
|
self.needs_attention = False
|
|
self.override_title = override_title
|
|
self.default_title = os.path.basename(child.argv[0] or appname)
|
|
self.child_title = self.default_title
|
|
self.title_stack: Deque[str] = deque(maxlen=10)
|
|
self.allow_remote_control = child.allow_remote_control
|
|
self.id: int = add_window(tab.os_window_id, tab.id, self.title)
|
|
self.margin = EdgeWidths()
|
|
self.padding = EdgeWidths()
|
|
self.kitten_result: Optional[Dict[str, Any]] = None
|
|
if not self.id:
|
|
raise Exception(f'No tab with id: {tab.id} in OS Window: {tab.os_window_id} was found, or the window counter wrapped')
|
|
self.tab_id = tab.id
|
|
self.os_window_id = tab.os_window_id
|
|
self.tabref: Callable[[], Optional[TabType]] = weakref.ref(tab)
|
|
self.clipboard_pending: Optional[ClipboardPending] = None
|
|
self.destroyed = False
|
|
self.geometry: WindowGeometry = WindowGeometry(0, 0, 0, 0, 0, 0)
|
|
self.needs_layout = True
|
|
self.is_visible_in_layout: bool = True
|
|
self.child = child
|
|
cell_width, cell_height = cell_size_for_window(self.os_window_id)
|
|
opts = get_options()
|
|
self.screen: Screen = Screen(self, 24, 80, opts.scrollback_lines, cell_width, cell_height, self.id)
|
|
if copy_colors_from is not None:
|
|
self.screen.copy_colors_from(copy_colors_from.screen)
|
|
else:
|
|
setup_colors(self.screen, opts)
|
|
|
|
@property
|
|
def file_transmission_control(self) -> 'FileTransmission':
|
|
ans: Optional['FileTransmission'] = getattr(self, '_file_transmission', None)
|
|
if ans is None:
|
|
from .file_transmission import FileTransmission
|
|
ans = self._file_transmission = FileTransmission(self.id)
|
|
return ans
|
|
|
|
def on_dpi_change(self, font_sz: float) -> None:
|
|
self.update_effective_padding()
|
|
|
|
def change_tab(self, tab: TabType) -> None:
|
|
self.tab_id = tab.id
|
|
self.os_window_id = tab.os_window_id
|
|
self.tabref = weakref.ref(tab)
|
|
|
|
def effective_margin(self, edge: EdgeLiteral, is_single_window: bool = False) -> int:
|
|
q = getattr(self.margin, edge)
|
|
if q is not None:
|
|
return pt_to_px(q, self.os_window_id)
|
|
opts = get_options()
|
|
if is_single_window:
|
|
q = getattr(opts.single_window_margin_width, edge)
|
|
if q > -0.1:
|
|
return pt_to_px(q, self.os_window_id)
|
|
q = getattr(opts.window_margin_width, edge)
|
|
return pt_to_px(q, self.os_window_id)
|
|
|
|
def effective_padding(self, edge: EdgeLiteral) -> int:
|
|
q = getattr(self.padding, edge)
|
|
if q is not None:
|
|
return pt_to_px(q, self.os_window_id)
|
|
q = getattr(get_options().window_padding_width, edge)
|
|
return pt_to_px(q, self.os_window_id)
|
|
|
|
def update_effective_padding(self) -> None:
|
|
set_window_padding(
|
|
self.os_window_id, self.tab_id, self.id,
|
|
self.effective_padding('left'), self.effective_padding('top'),
|
|
self.effective_padding('right'), self.effective_padding('bottom'))
|
|
|
|
def patch_edge_width(self, which: str, edge: EdgeLiteral, val: Optional[float]) -> None:
|
|
q = self.padding if which == 'padding' else self.margin
|
|
setattr(q, edge, val)
|
|
if q is self.padding:
|
|
self.update_effective_padding()
|
|
|
|
def effective_border(self) -> int:
|
|
val, unit = get_options().window_border_width
|
|
if unit == 'pt':
|
|
val = max(1 if val > 0 else 0, pt_to_px(val, self.os_window_id))
|
|
else:
|
|
val = round(val)
|
|
return int(val)
|
|
|
|
def apply_options(self) -> None:
|
|
opts = get_options()
|
|
self.update_effective_padding()
|
|
self.change_titlebar_color()
|
|
setup_colors(self.screen, opts)
|
|
|
|
@property
|
|
def title(self) -> str:
|
|
return self.override_title or self.child_title
|
|
|
|
def __repr__(self) -> str:
|
|
return f'Window(title={self.title}, id={self.id})'
|
|
|
|
def as_dict(self, is_focused: bool = False, is_self: bool = False) -> WindowDict:
|
|
return dict(
|
|
id=self.id,
|
|
is_focused=is_focused,
|
|
title=self.title,
|
|
pid=self.child.pid,
|
|
cwd=self.child.current_cwd or self.child.cwd,
|
|
cmdline=self.child.cmdline,
|
|
env=self.child.environ,
|
|
foreground_processes=self.child.foreground_processes,
|
|
is_self=is_self,
|
|
lines=self.screen.lines,
|
|
columns=self.screen.columns,
|
|
)
|
|
|
|
def serialize_state(self) -> Dict[str, Any]:
|
|
return {
|
|
'version': 1,
|
|
'id': self.id,
|
|
'child_title': self.child_title,
|
|
'override_title': self.override_title,
|
|
'default_title': self.default_title,
|
|
'title_stack': list(self.title_stack),
|
|
'allow_remote_control': self.allow_remote_control,
|
|
'cwd': self.child.current_cwd or self.child.cwd,
|
|
'env': self.child.environ,
|
|
'cmdline': self.child.cmdline,
|
|
'margin': self.margin.serialize(),
|
|
'padding': self.padding.serialize(),
|
|
}
|
|
|
|
@property
|
|
def current_colors(self) -> Dict[str, Optional[int]]:
|
|
return self.screen.color_profile.as_dict()
|
|
|
|
@property
|
|
def at_prompt(self) -> bool:
|
|
return self.screen.cursor_at_prompt()
|
|
|
|
@property
|
|
def has_running_program(self) -> bool:
|
|
return not self.at_prompt
|
|
|
|
def matches(self, field: str, pat: MatchPatternType) -> bool:
|
|
if not pat:
|
|
return False
|
|
if field == 'env':
|
|
assert isinstance(pat, tuple)
|
|
key_pat, val_pat = pat
|
|
for key, val in self.child.environ.items():
|
|
if key_pat.search(key) is not None and (
|
|
val_pat is None or val_pat.search(val) is not None):
|
|
return True
|
|
return False
|
|
assert not isinstance(pat, tuple)
|
|
|
|
if field in ('id', 'window_id'):
|
|
return pat.pattern == str(self.id)
|
|
if field == 'pid':
|
|
return pat.pattern == str(self.child.pid)
|
|
if field == 'title':
|
|
return pat.search(self.override_title or self.title) is not None
|
|
if field in 'cwd':
|
|
return pat.search(self.child.current_cwd or self.child.cwd) is not None
|
|
if field == 'cmdline':
|
|
for x in self.child.cmdline:
|
|
if pat.search(x) is not None:
|
|
return True
|
|
return False
|
|
return False
|
|
|
|
def matches_query(self, field: str, query: str, active_tab: Optional[TabType] = None) -> bool:
|
|
if field in ('num', 'recent'):
|
|
if active_tab is not None:
|
|
try:
|
|
q = int(query)
|
|
except Exception:
|
|
return False
|
|
with suppress(Exception):
|
|
if field == 'num':
|
|
return active_tab.get_nth_window(q) is self
|
|
return active_tab.nth_active_window_id(q) == self.id
|
|
return False
|
|
if field == 'state':
|
|
if query == 'active':
|
|
return active_tab is not None and self is active_tab.active_window
|
|
if query == 'focused':
|
|
return active_tab is not None and self is active_tab.active_window and current_os_window() == self.os_window_id
|
|
if query == 'needs_attention':
|
|
return self.needs_attention
|
|
if query == 'parent_active':
|
|
return active_tab is not None and self.tabref() is active_tab
|
|
if query == 'parent_focused':
|
|
return active_tab is not None and self.tabref() is active_tab and current_os_window() == self.os_window_id
|
|
return False
|
|
pat = compile_match_query(query, field != 'env')
|
|
return self.matches(field, pat)
|
|
|
|
def set_visible_in_layout(self, val: bool) -> None:
|
|
val = bool(val)
|
|
if val is not self.is_visible_in_layout:
|
|
self.is_visible_in_layout = val
|
|
update_window_visibility(self.os_window_id, self.tab_id, self.id, val)
|
|
if val:
|
|
self.refresh()
|
|
|
|
def refresh(self) -> None:
|
|
self.screen.mark_as_dirty()
|
|
wakeup_io_loop()
|
|
wakeup_main_loop()
|
|
|
|
def set_geometry(self, new_geometry: WindowGeometry) -> None:
|
|
if self.destroyed:
|
|
return
|
|
if self.needs_layout or new_geometry.xnum != self.screen.columns or new_geometry.ynum != self.screen.lines:
|
|
self.screen.resize(new_geometry.ynum, new_geometry.xnum)
|
|
self.needs_layout = False
|
|
call_watchers(weakref.ref(self), 'on_resize', {'old_geometry': self.geometry, 'new_geometry': new_geometry})
|
|
current_pty_size = (
|
|
self.screen.lines, self.screen.columns,
|
|
max(0, new_geometry.right - new_geometry.left), max(0, new_geometry.bottom - new_geometry.top))
|
|
update_ime_position = False
|
|
if current_pty_size != self.last_reported_pty_size:
|
|
get_boss().child_monitor.resize_pty(self.id, *current_pty_size)
|
|
if not self.pty_resized_once:
|
|
self.pty_resized_once = True
|
|
self.child.mark_terminal_ready()
|
|
update_ime_position = True
|
|
self.last_reported_pty_size = current_pty_size
|
|
else:
|
|
mark_os_window_dirty(self.os_window_id)
|
|
|
|
self.geometry = g = new_geometry
|
|
set_window_render_data(self.os_window_id, self.tab_id, self.id, self.screen, *g[:4])
|
|
self.update_effective_padding()
|
|
if update_ime_position:
|
|
update_ime_position_for_window(self.id, True)
|
|
|
|
def contains(self, x: int, y: int) -> bool:
|
|
g = self.geometry
|
|
return g.left <= x <= g.right and g.top <= y <= g.bottom
|
|
|
|
def close(self) -> None:
|
|
get_boss().mark_window_for_close(self)
|
|
|
|
@ac('misc', '''
|
|
Send the specified text to the active window
|
|
|
|
See :sc:`send_text <send_text>` for details.
|
|
''')
|
|
def send_text(self, *args: str) -> bool:
|
|
mode = keyboard_mode_name(self.screen)
|
|
required_mode_, text = args[-2:]
|
|
required_mode = frozenset(required_mode_.split(','))
|
|
if not required_mode & {mode, 'all'}:
|
|
return True
|
|
if not text:
|
|
return True
|
|
self.write_to_child(text)
|
|
return False
|
|
|
|
@ac('debug', 'Show a dump of the current lines in the scrollback + screen with their line attributes')
|
|
def dump_lines_with_attrs(self) -> None:
|
|
strings: List[str] = []
|
|
self.screen.dump_lines_with_attrs(strings.append)
|
|
text = ''.join(strings)
|
|
get_boss().display_scrollback(self, text, title='Dump of lines', report_cursor=False)
|
|
|
|
def write_to_child(self, data: Union[str, bytes]) -> None:
|
|
if data:
|
|
if isinstance(data, str):
|
|
data = data.encode('utf-8')
|
|
if get_boss().child_monitor.needs_write(self.id, data) is not True:
|
|
log_error(f'Failed to write to child {self.id} as it does not exist')
|
|
|
|
def title_updated(self) -> None:
|
|
update_window_title(self.os_window_id, self.tab_id, self.id, self.title)
|
|
t = self.tabref()
|
|
if t is not None:
|
|
t.title_changed(self)
|
|
|
|
def set_title(self, title: Optional[str]) -> None:
|
|
if title:
|
|
title = sanitize_title(title)
|
|
self.override_title = title or None
|
|
self.title_updated()
|
|
|
|
def desktop_notify(self, osc_code: int, raw_data: str) -> None:
|
|
if osc_code == 777:
|
|
if not raw_data.startswith('notify;'):
|
|
log_error(f'Ignoring unknown OSC 777: {raw_data}')
|
|
return # unknown OSC 777
|
|
raw_data = raw_data[len('notify;'):]
|
|
cmd = handle_notification_cmd(osc_code, raw_data, self.id, self.prev_osc99_cmd)
|
|
if cmd is not None and osc_code == 99:
|
|
self.prev_osc99_cmd = cmd
|
|
|
|
# screen callbacks {{{
|
|
def use_utf8(self, on: bool) -> None:
|
|
get_boss().child_monitor.set_iutf8_winid(self.id, on)
|
|
|
|
def on_mouse_event(self, event: Dict[str, Any]) -> bool:
|
|
event['mods'] = event.get('mods', 0) & mod_mask
|
|
ev = MouseEvent(**event)
|
|
self.current_mouse_event_button = ev.button
|
|
action = get_options().mousemap.get(ev)
|
|
if action is None:
|
|
return False
|
|
return get_boss().combine(action, window_for_dispatch=self, dispatch_type='MouseEvent')
|
|
|
|
def open_url(self, url: str, hyperlink_id: int, cwd: Optional[str] = None) -> None:
|
|
opts = get_options()
|
|
if hyperlink_id:
|
|
if not opts.allow_hyperlinks:
|
|
return
|
|
from urllib.parse import unquote, urlparse, urlunparse
|
|
try:
|
|
purl = urlparse(url)
|
|
except Exception:
|
|
return
|
|
if (not purl.scheme or purl.scheme == 'file'):
|
|
if purl.netloc:
|
|
from socket import gethostname
|
|
try:
|
|
hostname = gethostname()
|
|
except Exception:
|
|
hostname = ''
|
|
remote_hostname = purl.netloc.partition(':')[0]
|
|
if remote_hostname and remote_hostname != hostname and remote_hostname != 'localhost':
|
|
self.handle_remote_file(purl.netloc, unquote(purl.path))
|
|
return
|
|
url = urlunparse(purl._replace(netloc=''))
|
|
if opts.allow_hyperlinks & 0b10:
|
|
from kittens.tui.operations import styled
|
|
get_boss().choose(
|
|
'What would you like to do with this URL:\n' + styled(unquote(url), fg='yellow'),
|
|
partial(self.hyperlink_open_confirmed, url, cwd),
|
|
'o:Open', 'c:Copy to clipboard', 'n;red:Nothing', default='o',
|
|
window=self,
|
|
)
|
|
return
|
|
get_boss().open_url(url, cwd=cwd)
|
|
|
|
def hyperlink_open_confirmed(self, url: str, cwd: Optional[str], q: str) -> None:
|
|
if q == 'o':
|
|
get_boss().open_url(url, cwd=cwd)
|
|
elif q == 'c':
|
|
set_clipboard_string(url)
|
|
|
|
def handle_remote_file(self, netloc: str, remote_path: str) -> None:
|
|
from kittens.remote_file.main import is_ssh_kitten_sentinel
|
|
from kittens.ssh.main import get_connection_data
|
|
|
|
from .utils import SSHConnectionData
|
|
args = self.ssh_kitten_cmdline()
|
|
conn_data: Union[None, List[str], SSHConnectionData] = None
|
|
if args:
|
|
ssh_cmdline = sorted(self.child.foreground_processes, key=lambda p: p['pid'])[-1]['cmdline'] or ['']
|
|
if 'ControlPath=' in ' '.join(ssh_cmdline):
|
|
idx = ssh_cmdline.index('--')
|
|
conn_data = [is_ssh_kitten_sentinel] + list(ssh_cmdline[:idx + 2])
|
|
if conn_data is None:
|
|
args = self.child.foreground_cmdline
|
|
conn_data = get_connection_data(args, self.child.foreground_cwd or self.child.current_cwd or '')
|
|
if conn_data is None:
|
|
get_boss().show_error('Could not handle remote file', f'No SSH connection data found in: {args}')
|
|
return
|
|
get_boss().run_kitten(
|
|
'remote_file', '--hostname', netloc.partition(':')[0], '--path', remote_path,
|
|
'--ssh-connection-data', json.dumps(conn_data)
|
|
)
|
|
|
|
def send_signal_for_key(self, key_num: int) -> bool:
|
|
try:
|
|
return self.child.send_signal_for_key(key_num)
|
|
except OSError as err:
|
|
log_error(f'Failed to send signal for key to child with err: {err}')
|
|
return False
|
|
|
|
def focus_changed(self, focused: bool) -> None:
|
|
if self.destroyed:
|
|
return
|
|
call_watchers(weakref.ref(self), 'on_focus_change', {'focused': focused})
|
|
for c in self.actions_on_focus_change:
|
|
try:
|
|
c(self, focused)
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
self.screen.focus_changed(focused)
|
|
if focused:
|
|
self.last_focused_at = monotonic()
|
|
update_ime_position_for_window(self.id, False, 1)
|
|
changed = self.needs_attention
|
|
self.needs_attention = False
|
|
if changed:
|
|
tab = self.tabref()
|
|
if tab is not None:
|
|
tab.relayout_borders()
|
|
elif self.os_window_id == current_os_window():
|
|
# Cancel IME composition after loses focus
|
|
update_ime_position_for_window(self.id, False, -1)
|
|
|
|
def title_changed(self, new_title: Optional[str], is_base64: bool = False) -> None:
|
|
self.child_title = process_title_from_child(new_title or self.default_title, is_base64)
|
|
if self.override_title is None:
|
|
self.title_updated()
|
|
|
|
def icon_changed(self, new_icon: object) -> None:
|
|
pass # TODO: Implement this
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
return get_boss().active_window is self
|
|
|
|
@property
|
|
def has_activity_since_last_focus(self) -> bool:
|
|
return self.screen.has_activity_since_last_focus()
|
|
|
|
def on_activity_since_last_focus(self) -> None:
|
|
if get_options().tab_activity_symbol:
|
|
get_boss().on_activity_since_last_focus(self)
|
|
|
|
def on_bell(self) -> None:
|
|
cb = get_options().command_on_bell
|
|
if cb and cb != ['none']:
|
|
import shlex
|
|
import subprocess
|
|
env = self.child.foreground_environ
|
|
env['KITTY_CHILD_CMDLINE'] = ' '.join(map(shlex.quote, self.child.cmdline))
|
|
subprocess.Popen(cb, env=env, cwd=self.child.foreground_cwd, preexec_fn=clear_handled_signals)
|
|
if not self.is_active:
|
|
changed = not self.needs_attention
|
|
self.needs_attention = True
|
|
tab = self.tabref()
|
|
if tab is not None:
|
|
if changed:
|
|
tab.relayout_borders()
|
|
tab.on_bell(self)
|
|
|
|
def change_titlebar_color(self) -> None:
|
|
opts = get_options()
|
|
val = opts.macos_titlebar_color if is_macos else opts.wayland_titlebar_color
|
|
if val > 0:
|
|
if (val & 0xff) == 1:
|
|
val = self.screen.color_profile.default_bg
|
|
else:
|
|
val = val >> 8
|
|
set_titlebar_color(self.os_window_id, val)
|
|
else:
|
|
set_titlebar_color(self.os_window_id, 0, True, -val)
|
|
|
|
def change_colors(self, changes: Dict[DynamicColor, Optional[str]]) -> None:
|
|
dirtied = default_bg_changed = False
|
|
|
|
def item(raw: Optional[str]) -> int:
|
|
if raw is None:
|
|
return 0
|
|
v = to_color(raw)
|
|
if v is None:
|
|
return 0
|
|
return 0xff000000 | int(v)
|
|
|
|
for which, val_ in changes.items():
|
|
val = item(val_)
|
|
dirtied = True
|
|
setattr(self.screen.color_profile, which.name, val)
|
|
if which.name == 'default_bg':
|
|
default_bg_changed = True
|
|
if dirtied:
|
|
self.screen.mark_as_dirty()
|
|
if default_bg_changed:
|
|
get_boss().default_bg_changed_for(self.id)
|
|
|
|
def color_profile_popped(self, bg_changed: bool) -> None:
|
|
if bg_changed:
|
|
get_boss().default_bg_changed_for(self.id)
|
|
|
|
def report_color(self, code: str, r: int, g: int, b: int) -> None:
|
|
r |= r << 8
|
|
g |= g << 8
|
|
b |= b << 8
|
|
self.screen.send_escape_code_to_child(OSC, f'{code};rgb:{r:04x}/{g:04x}/{b:04x}')
|
|
|
|
def report_notification_activated(self, identifier: str) -> None:
|
|
self.screen.send_escape_code_to_child(OSC, f'99;i={identifier};')
|
|
|
|
def set_dynamic_color(self, code: int, value: Union[str, bytes]) -> None:
|
|
if isinstance(value, bytes):
|
|
value = value.decode('utf-8')
|
|
color_changes: Dict[DynamicColor, Optional[str]] = {}
|
|
for val in value.split(';'):
|
|
w = DYNAMIC_COLOR_CODES.get(code)
|
|
if w is not None:
|
|
if val == '?':
|
|
col = getattr(self.screen.color_profile, w.name)
|
|
self.report_color(str(code), col >> 16, (col >> 8) & 0xff, col & 0xff)
|
|
else:
|
|
q = None if code >= 100 else val
|
|
color_changes[w] = q
|
|
code += 1
|
|
if color_changes:
|
|
self.change_colors(color_changes)
|
|
|
|
def set_color_table_color(self, code: int, value: str) -> None:
|
|
cp = self.screen.color_profile
|
|
if code == 4:
|
|
changed = False
|
|
for c, val in parse_color_set(value):
|
|
if val is None: # color query
|
|
qc = self.screen.color_profile.as_color((c << 8) | 1)
|
|
assert qc is not None
|
|
self.report_color(f'4;{c}', qc.red, qc.green, qc.blue)
|
|
else:
|
|
changed = True
|
|
cp.set_color(c, val)
|
|
if changed:
|
|
self.refresh()
|
|
elif code == 104:
|
|
if not value.strip():
|
|
cp.reset_color_table()
|
|
else:
|
|
for x in value.split(';'):
|
|
try:
|
|
y = int(x)
|
|
except Exception:
|
|
continue
|
|
if 0 <= y <= 255:
|
|
cp.reset_color(y)
|
|
self.refresh()
|
|
|
|
def request_capabilities(self, q: str) -> None:
|
|
for result in get_capabilities(q, get_options()):
|
|
self.screen.send_escape_code_to_child(DCS, result)
|
|
|
|
def handle_remote_cmd(self, cmd: str) -> None:
|
|
get_boss().handle_remote_cmd(cmd, self)
|
|
|
|
def handle_remote_echo(self, msg: str) -> None:
|
|
from base64 import standard_b64decode
|
|
data = standard_b64decode(msg)
|
|
self.write_to_child(data)
|
|
|
|
def handle_remote_ssh(self, msg: str) -> None:
|
|
from kittens.ssh.main import get_ssh_data
|
|
for line in get_ssh_data(msg, f'{os.getpid()}-{self.id}'):
|
|
self.write_to_child(line)
|
|
|
|
def handle_kitten_result(self, msg: str) -> None:
|
|
import base64
|
|
self.kitten_result = json.loads(base64.b85decode(msg))
|
|
for processor in self.kitten_result_processors:
|
|
try:
|
|
processor(self, self.kitten_result)
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def add_kitten_result_processor(self, callback: Callable[['Window', Any], None]) -> None:
|
|
self.kitten_result_processors.append(callback)
|
|
|
|
def handle_overlay_ready(self, msg: str) -> None:
|
|
boss = get_boss()
|
|
tab = boss.tab_for_window(self)
|
|
if tab is not None:
|
|
tab.move_window_to_top_of_group(self)
|
|
|
|
def append_remote_data(self, msg: str) -> str:
|
|
if not msg:
|
|
cdata = ''.join(self.current_remote_data)
|
|
self.current_remote_data = []
|
|
return cdata
|
|
num, rest = msg.split(':', 1)
|
|
max_size = get_options().clipboard_max_size * 1024 * 1024
|
|
if num == '0' or sum(map(len, self.current_remote_data)) > max_size:
|
|
self.current_remote_data = []
|
|
self.current_remote_data.append(rest)
|
|
return ''
|
|
|
|
def handle_remote_edit(self, msg: str) -> None:
|
|
cdata = self.append_remote_data(msg)
|
|
if cdata:
|
|
from .launch import remote_edit
|
|
remote_edit(cdata, self)
|
|
|
|
def handle_remote_clone(self, msg: str) -> None:
|
|
cdata = self.append_remote_data(msg)
|
|
if cdata:
|
|
ac = get_options().allow_cloning
|
|
if ac == 'ask':
|
|
get_boss().confirm(_(
|
|
'A program running in this window wants to clone it into another window.'
|
|
' Allow it do so, once?'),
|
|
partial(self.handle_remote_clone_confirmation, cdata), window=self,
|
|
)
|
|
elif ac in ('yes', 'y', 'true'):
|
|
self.handle_remote_clone_confirmation(cdata, True)
|
|
|
|
def handle_remote_clone_confirmation(self, cdata: str, confirmed: bool) -> None:
|
|
if confirmed:
|
|
from .launch import clone_and_launch
|
|
clone_and_launch(cdata, self)
|
|
|
|
def handle_remote_askpass(self, msg: str) -> None:
|
|
from .shm import SharedMemory
|
|
with SharedMemory(name=msg, readonly=True) as shm:
|
|
shm.seek(1)
|
|
data = json.loads(shm.read_data_with_size())
|
|
|
|
def callback(ans: Any) -> None:
|
|
data = json.dumps(ans)
|
|
with SharedMemory(name=msg) as shm:
|
|
shm.seek(1)
|
|
shm.write_data_with_size(data)
|
|
shm.flush()
|
|
shm.seek(0)
|
|
shm.write(b'\x01')
|
|
|
|
message: str = data['message']
|
|
if data['type'] == 'confirm':
|
|
get_boss().confirm(
|
|
message, callback, window=self, confirm_on_cancel=bool(data.get('confirm_on_cancel')),
|
|
confirm_on_accept=bool(data.get('confirm_on_accept', True)))
|
|
elif data['type'] == 'choose':
|
|
get_boss().choose(
|
|
message, callback, *data['choices'], window=self, default=data.get('default', ''))
|
|
elif data['type'] == 'get_line':
|
|
get_boss().get_line(
|
|
message, callback, window=self, is_password=bool(data.get('is_password')), prompt=data.get('prompt', '> '))
|
|
else:
|
|
log_error(f'Ignoring ask request with unknown type: {data["type"]}')
|
|
|
|
def handle_remote_print(self, msg: str) -> None:
|
|
text = process_remote_print(msg)
|
|
print(text, end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
def send_cmd_response(self, response: Any) -> None:
|
|
self.screen.send_escape_code_to_child(DCS, '@kitty-cmd' + json.dumps(response))
|
|
|
|
def file_transmission(self, data: str) -> None:
|
|
self.file_transmission_control.handle_serialized_command(data)
|
|
|
|
def clipboard_control(self, data: str, is_partial: bool = False) -> None:
|
|
where, text = data.partition(';')[::2]
|
|
if is_partial:
|
|
if self.clipboard_pending is None:
|
|
self.clipboard_pending = ClipboardPending(where, text)
|
|
else:
|
|
self.clipboard_pending = self.clipboard_pending._replace(data=self.clipboard_pending[1] + text)
|
|
limit = get_options().clipboard_max_size
|
|
if limit and len(self.clipboard_pending.data) > limit * 1024 * 1024:
|
|
log_error('Discarding part of too large OSC 52 paste request')
|
|
self.clipboard_pending = self.clipboard_pending._replace(data='', truncated=True)
|
|
return
|
|
|
|
if not where:
|
|
if self.clipboard_pending is not None:
|
|
text = self.clipboard_pending.data + text
|
|
where = self.clipboard_pending.where
|
|
try:
|
|
if self.clipboard_pending.truncated:
|
|
return
|
|
finally:
|
|
self.clipboard_pending = None
|
|
else:
|
|
where = 's0'
|
|
cc = get_options().clipboard_control
|
|
if text == '?':
|
|
response = None
|
|
if 's' in where or 'c' in where:
|
|
if 'read-clipboard-ask' in cc:
|
|
return self.ask_to_read_clipboard(False)
|
|
response = get_clipboard_string() if 'read-clipboard' in cc else ''
|
|
loc = 'c'
|
|
elif 'p' in where:
|
|
if 'read-primary-ask' in cc:
|
|
return self.ask_to_read_clipboard(True)
|
|
response = get_primary_selection() if 'read-primary' in cc else ''
|
|
loc = 'p'
|
|
response = response or ''
|
|
self.send_osc52(loc, response or '')
|
|
|
|
else:
|
|
from base64 import standard_b64decode
|
|
try:
|
|
text = standard_b64decode(text).decode('utf-8')
|
|
except Exception:
|
|
text = ''
|
|
|
|
if 's' in where or 'c' in where:
|
|
if 'write-clipboard' in cc:
|
|
set_clipboard_string(text)
|
|
if 'p' in where:
|
|
if 'write-primary' in cc:
|
|
set_primary_selection(text)
|
|
self.clipboard_pending = None
|
|
|
|
def send_osc52(self, loc: str, response: str) -> None:
|
|
from base64 import standard_b64encode
|
|
self.screen.send_escape_code_to_child(OSC, '52;{};{}'.format(
|
|
loc, standard_b64encode(response.encode('utf-8')).decode('ascii')))
|
|
|
|
def ask_to_read_clipboard(self, primary: bool = False) -> None:
|
|
if self.current_clipboard_read_ask is not None:
|
|
self.current_clipboard_read_ask = primary
|
|
return
|
|
self.current_clipboard_read_ask = primary
|
|
get_boss().confirm(_(
|
|
'A program running in this window wants to read from the system clipboard.'
|
|
' Allow it do so, once?'),
|
|
self.handle_clipboard_confirmation, window=self,
|
|
)
|
|
|
|
def handle_clipboard_confirmation(self, confirmed: bool) -> None:
|
|
try:
|
|
loc = 'p' if self.current_clipboard_read_ask else 'c'
|
|
response = ''
|
|
if confirmed:
|
|
response = get_primary_selection() if self.current_clipboard_read_ask else get_clipboard_string()
|
|
self.send_osc52(loc, response)
|
|
finally:
|
|
self.current_clipboard_read_ask = None
|
|
|
|
def manipulate_title_stack(self, pop: bool, title: str, icon: Any) -> None:
|
|
if title:
|
|
if pop:
|
|
if self.title_stack:
|
|
self.child_title = self.title_stack.pop()
|
|
self.title_updated()
|
|
else:
|
|
if self.child_title:
|
|
self.title_stack.append(self.child_title)
|
|
# }}}
|
|
|
|
# mouse actions {{{
|
|
@ac('mouse', '''
|
|
Handle a mouse click
|
|
|
|
Try to perform the specified actions one after the other till one of them is successful.
|
|
Supported actions are::
|
|
|
|
selection - check for a selection and if one exists abort processing
|
|
link - if a link exists under the mouse, click it
|
|
prompt - if the mouse click happens at a shell prompt move the cursor to the mouse location
|
|
|
|
For examples, see :ref:`conf-kitty-mouse.mousemap`
|
|
''')
|
|
def mouse_handle_click(self, *actions: str) -> None:
|
|
for a in actions:
|
|
if a == 'selection':
|
|
if self.screen.has_selection():
|
|
break
|
|
if a == 'link':
|
|
if click_mouse_url(self.os_window_id, self.tab_id, self.id):
|
|
break
|
|
if a == 'prompt':
|
|
# Do not send move cursor events too soon after the window is
|
|
# focused, this is because there are people that click on
|
|
# windows and start typing immediately and the cursor event
|
|
# can interfere with that. See https://github.com/kovidgoyal/kitty/issues/4128
|
|
if monotonic() - self.last_focused_at < 1.5 * get_click_interval():
|
|
return
|
|
if move_cursor_to_mouse_if_in_prompt(self.os_window_id, self.tab_id, self.id):
|
|
self.screen.ignore_bells_for(1)
|
|
break
|
|
|
|
@ac('mouse', 'Click the URL under the mouse')
|
|
def mouse_click_url(self) -> None:
|
|
self.mouse_handle_click('link')
|
|
|
|
@ac('mouse', 'Click the URL under the mouse only if the screen has no selection')
|
|
def mouse_click_url_or_select(self) -> None:
|
|
self.mouse_handle_click('selection', 'link')
|
|
|
|
@ac('mouse', '''
|
|
Manipulate the selection based on the current mouse position
|
|
|
|
For examples, see :ref:`conf-kitty-mouse.mousemap`
|
|
''')
|
|
def mouse_selection(self, code: int) -> None:
|
|
mouse_selection(self.os_window_id, self.tab_id, self.id, code, self.current_mouse_event_button)
|
|
|
|
@ac('mouse', 'Paste the current primary selection')
|
|
def paste_selection(self) -> None:
|
|
txt = get_boss().current_primary_selection()
|
|
if txt:
|
|
self.paste_with_actions(txt)
|
|
|
|
@ac('mouse', 'Paste the current primary selection or the clipboard if no selection is present')
|
|
def paste_selection_or_clipboard(self) -> None:
|
|
txt = get_boss().current_primary_selection_or_clipboard()
|
|
if txt:
|
|
self.paste_with_actions(txt)
|
|
|
|
@ac('mouse', '''
|
|
Select clicked command output
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def mouse_select_command_output(self) -> None:
|
|
click_mouse_cmd_output(self.os_window_id, self.tab_id, self.id, True)
|
|
|
|
@ac('mouse', '''
|
|
Show clicked command output in a pager like less
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def mouse_show_command_output(self) -> None:
|
|
if click_mouse_cmd_output(self.os_window_id, self.tab_id, self.id, False):
|
|
self.show_cmd_output(CommandOutput.last_visited, 'Clicked command output')
|
|
# }}}
|
|
|
|
def text_for_selection(self, as_ansi: bool = False) -> str:
|
|
sts = get_options().strip_trailing_spaces
|
|
strip_trailing_spaces = sts == 'always' or (sts == 'smart' and not self.screen.is_rectangle_select())
|
|
lines = self.screen.text_for_selection(as_ansi, strip_trailing_spaces)
|
|
return ''.join(lines)
|
|
|
|
def call_watchers(self, which: Iterable[Watcher], data: Dict[str, Any]) -> None:
|
|
boss = get_boss()
|
|
for w in which:
|
|
try:
|
|
w(boss, self, data)
|
|
except Exception:
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
def destroy(self) -> None:
|
|
self.call_watchers(self.watchers.on_close, {})
|
|
self.destroyed = True
|
|
del self.kitten_result_processors
|
|
if hasattr(self, 'screen'):
|
|
if self.is_active and self.os_window_id == current_os_window():
|
|
# Cancel IME composition when window is destroyed
|
|
update_ime_position_for_window(self.id, False, -1)
|
|
# Remove cycles so that screen is de-allocated immediately
|
|
self.screen.reset_callbacks()
|
|
del self.screen
|
|
|
|
def as_text(
|
|
self,
|
|
as_ansi: bool = False,
|
|
add_history: bool = False,
|
|
add_wrap_markers: bool = False,
|
|
alternate_screen: bool = False,
|
|
add_cursor: bool = False
|
|
) -> str:
|
|
return as_text(self.screen, as_ansi, add_history, add_wrap_markers, alternate_screen, add_cursor)
|
|
|
|
def cmd_output(self, which: CommandOutput = CommandOutput.last_run, as_ansi: bool = False, add_wrap_markers: bool = False) -> str:
|
|
return cmd_output(self.screen, which, as_ansi, add_wrap_markers)
|
|
|
|
def get_cwd_of_child(self, oldest: bool = False) -> Optional[str]:
|
|
return self.child.get_foreground_cwd(oldest) or self.child.current_cwd
|
|
|
|
@property
|
|
def cwd_of_child(self) -> Optional[str]:
|
|
return self.get_cwd_of_child()
|
|
|
|
@property
|
|
def child_is_remote(self) -> bool:
|
|
for p in self.child.foreground_processes:
|
|
q = list(p['cmdline'] or ())
|
|
if q and q[0].lower() == 'ssh':
|
|
return True
|
|
return False
|
|
|
|
def ssh_kitten_cmdline(self) -> List[str]:
|
|
from kittens.ssh.utils import is_kitten_cmdline
|
|
for p in self.child.foreground_processes:
|
|
q = list(p['cmdline'] or ())
|
|
if is_kitten_cmdline(q):
|
|
return q
|
|
return []
|
|
|
|
def pipe_data(self, text: str, has_wrap_markers: bool = False) -> PipeData:
|
|
text = text or ''
|
|
if has_wrap_markers:
|
|
text = text.replace('\r\n', '\n').replace('\r', '\n')
|
|
lines = text.count('\n')
|
|
input_line_number = (lines - (self.screen.lines - 1) - self.screen.scrolled_by)
|
|
return {
|
|
'input_line_number': input_line_number,
|
|
'scrolled_by': self.screen.scrolled_by,
|
|
'cursor_x': self.screen.cursor.x + 1,
|
|
'cursor_y': self.screen.cursor.y + 1,
|
|
'lines': self.screen.lines,
|
|
'columns': self.screen.columns,
|
|
'text': text
|
|
}
|
|
|
|
def set_logo(self, path: str, position: str = '', alpha: float = -1) -> None:
|
|
path = resolve_custom_file(path) if path else ''
|
|
set_window_logo(self.os_window_id, self.tab_id, self.id, path, position or '', alpha)
|
|
|
|
def paste_with_actions(self, text: str) -> None:
|
|
if self.destroyed or not text:
|
|
return
|
|
opts = get_options()
|
|
if 'filter' in opts.paste_actions:
|
|
text = load_paste_filter()(text)
|
|
if not text:
|
|
return
|
|
if 'quote-urls-at-prompt' in opts.paste_actions and self.at_prompt:
|
|
prefixes = '|'.join(opts.url_prefixes)
|
|
m = re.match(f'({prefixes}):(.+)', text)
|
|
if m is not None:
|
|
scheme, rest = m.group(1), m.group(2)
|
|
if rest.startswith('//') or scheme in ('mailto', 'irc'):
|
|
import shlex
|
|
text = shlex.quote(text)
|
|
btext = text.encode('utf-8')
|
|
if 'confirm' in opts.paste_actions:
|
|
msg = ''
|
|
limit = 16 * 1024
|
|
if not self.screen.in_bracketed_paste_mode:
|
|
msg = _('Pasting text into shells that do not support bracketed paste can be dangerous.')
|
|
elif len(btext) > limit:
|
|
msg = _('Pasting very large amounts of text ({} bytes) can be slow.').format(len(btext))
|
|
if msg:
|
|
get_boss().confirm(msg + _(' Are you sure?'), partial(self.handle_paste_confirmation, btext), window=self)
|
|
return
|
|
self.paste_text(btext)
|
|
|
|
def handle_paste_confirmation(self, btext: bytes, confirmed: bool) -> None:
|
|
if confirmed:
|
|
self.paste_text(btext)
|
|
|
|
def paste_bytes(self, text: Union[str, bytes]) -> None:
|
|
# paste raw bytes without any processing
|
|
if isinstance(text, str):
|
|
text = text.encode('utf-8')
|
|
self.screen.paste_bytes(text)
|
|
|
|
def paste_text(self, text: Union[str, bytes]) -> None:
|
|
if text and not self.destroyed:
|
|
if isinstance(text, str):
|
|
text = text.encode('utf-8')
|
|
if self.screen.in_bracketed_paste_mode:
|
|
while True:
|
|
new_text = text.replace(b'\033[201~', b'').replace(b'\x9b201~', b'')
|
|
if len(text) == len(new_text):
|
|
break
|
|
text = new_text
|
|
else:
|
|
# Workaround for broken editors like nano that cannot handle
|
|
# newlines in pasted text see https://github.com/kovidgoyal/kitty/issues/994
|
|
text = text.replace(b'\r\n', b'\n').replace(b'\n', b'\r')
|
|
self.screen.paste(text)
|
|
|
|
def clear_screen(self, reset: bool = False, scrollback: bool = False) -> None:
|
|
self.screen.cursor.x = self.screen.cursor.y = 0
|
|
if reset:
|
|
self.screen.reset()
|
|
else:
|
|
self.screen.erase_in_display(3 if scrollback else 2, False)
|
|
|
|
# actions {{{
|
|
|
|
@ac('cp', 'Show scrollback in a pager like less')
|
|
def show_scrollback(self) -> None:
|
|
text = self.as_text(as_ansi=True, add_history=True, add_wrap_markers=True)
|
|
data = self.pipe_data(text, has_wrap_markers=True)
|
|
cursor_on_screen = self.screen.scrolled_by < self.screen.lines - self.screen.cursor.y
|
|
get_boss().display_scrollback(self, data['text'], data['input_line_number'], report_cursor=cursor_on_screen)
|
|
|
|
def show_cmd_output(self, which: CommandOutput, title: str = 'Command output', as_ansi: bool = True, add_wrap_markers: bool = True) -> None:
|
|
text = self.cmd_output(which, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)
|
|
text = text.replace('\r\n', '\n').replace('\r', '\n')
|
|
get_boss().display_scrollback(self, text, title=title, report_cursor=False)
|
|
|
|
@ac('cp', '''
|
|
Show output from the first shell command on screen in a pager like less
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def show_first_command_output_on_screen(self) -> None:
|
|
self.show_cmd_output(CommandOutput.first_on_screen, 'First command output on screen')
|
|
|
|
@ac('cp', '''
|
|
Show output from the last shell command in a pager like less
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def show_last_command_output(self) -> None:
|
|
self.show_cmd_output(CommandOutput.last_run, 'Last command output')
|
|
|
|
@ac('cp', '''
|
|
Show the first command output below the last scrolled position via scroll_to_prompt
|
|
or the last mouse clicked command output in a pager like less
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def show_last_visited_command_output(self) -> None:
|
|
self.show_cmd_output(CommandOutput.last_visited, 'Last visited command output')
|
|
|
|
@ac('cp', '''
|
|
Show the last non-empty output from a shell command in a pager like less
|
|
|
|
Requires :ref:`shell_integration` to work
|
|
''')
|
|
def show_last_non_empty_command_output(self) -> None:
|
|
self.show_cmd_output(CommandOutput.last_non_empty, 'Last non-empty command output')
|
|
|
|
@ac('cp', 'Paste the specified text into the current window')
|
|
def paste(self, text: str) -> None:
|
|
self.paste_with_actions(text)
|
|
|
|
@ac('cp', 'Copy the selected text from the active window to the clipboard')
|
|
def copy_to_clipboard(self) -> None:
|
|
text = self.text_for_selection()
|
|
if text:
|
|
set_clipboard_string(text)
|
|
|
|
@ac('cp', 'Copy the selected text from the active window to the clipboard with ANSI formatting codes')
|
|
def copy_ansi_to_clipboard(self) -> None:
|
|
text = self.text_for_selection(as_ansi=True)
|
|
if text:
|
|
set_clipboard_string(text)
|
|
|
|
def encoded_key(self, key_event: KeyEvent) -> bytes:
|
|
return encode_key_for_tty(
|
|
key=key_event.key, shifted_key=key_event.shifted_key, alternate_key=key_event.alternate_key,
|
|
mods=key_event.mods, action=key_event.action, text=key_event.text,
|
|
key_encoding_flags=self.screen.current_key_encoding_flags(),
|
|
cursor_key_mode=self.screen.cursor_key_mode,
|
|
).encode('ascii')
|
|
|
|
@ac('cp', 'Copy the selected text from the active window to the clipboard, if no selection, send SIGINT (aka :kbd:`ctrl+c`)')
|
|
def copy_or_interrupt(self) -> None:
|
|
text = self.text_for_selection()
|
|
if text:
|
|
set_clipboard_string(text)
|
|
else:
|
|
self.scroll_end()
|
|
self.write_to_child(self.encoded_key(KeyEvent(key=ord('c'), mods=GLFW_MOD_CONTROL)))
|
|
|
|
@ac('cp', 'Copy the selected text from the active window to the clipboard and clear selection, if no selection, send SIGINT (aka :kbd:`ctrl+c`)')
|
|
def copy_and_clear_or_interrupt(self) -> None:
|
|
self.copy_or_interrupt()
|
|
self.screen.clear_selection()
|
|
|
|
@ac('cp', 'Pass the selected text from the active window to the specified program')
|
|
def pass_selection_to_program(self, *args: str) -> None:
|
|
cwd = self.cwd_of_child
|
|
text = self.text_for_selection()
|
|
if text:
|
|
if args:
|
|
open_cmd(args, text, cwd=cwd)
|
|
else:
|
|
open_url(text, cwd=cwd)
|
|
|
|
@ac('cp', 'Clear the current selection')
|
|
def clear_selection(self) -> None:
|
|
self.screen.clear_selection()
|
|
|
|
@ac('sc', 'Scroll up by one line')
|
|
def scroll_line_up(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_LINE, True)
|
|
|
|
@ac('sc', 'Scroll down by one line')
|
|
def scroll_line_down(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_LINE, False)
|
|
|
|
@ac('sc', 'Scroll up by one page')
|
|
def scroll_page_up(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_PAGE, True)
|
|
|
|
@ac('sc', 'Scroll down by one page')
|
|
def scroll_page_down(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_PAGE, False)
|
|
|
|
@ac('sc', 'Scroll to the top of the scrollback buffer')
|
|
def scroll_home(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_FULL, True)
|
|
|
|
@ac('sc', 'Scroll to the bottom of the scrollback buffer')
|
|
def scroll_end(self) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll(SCROLL_FULL, False)
|
|
|
|
@ac('sc', '''
|
|
Scroll to the previous/next shell command prompt
|
|
Allows easy jumping from one command to the next. Requires working
|
|
:ref:`shell_integration`. Takes a single, optional, number as argument which is
|
|
the number of prompts to jump, negative values jump up and positive values jump down.
|
|
A value of zero will jump to the last prompt visited by this action.
|
|
For example::
|
|
|
|
map ctrl+p scroll_to_prompt -1 # jump to previous
|
|
map ctrl+n scroll_to_prompt 1 # jump to next
|
|
map ctrl+o scroll_to_prompt 0 # jump to last visited
|
|
''')
|
|
def scroll_to_prompt(self, num_of_prompts: int = -1) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll_to_prompt(num_of_prompts)
|
|
|
|
@ac('sc', 'Scroll prompt to the top of the screen, filling screen with empty lines')
|
|
def scroll_prompt_to_top(self, clear_scrollback: bool = False) -> None:
|
|
if self.screen.is_main_linebuf():
|
|
self.screen.scroll_until_cursor_prompt()
|
|
if clear_scrollback:
|
|
self.screen.clear_scrollback()
|
|
elif self.screen.scrolled_by > 0:
|
|
self.screen.scroll(SCROLL_FULL, False)
|
|
|
|
@ac('sc', 'Scroll prompt to the bottom of the screen, filling in extra lines from the scrollback buffer')
|
|
def scroll_prompt_to_bottom(self) -> None:
|
|
self.screen.scroll_prompt_to_bottom()
|
|
|
|
@ac('mk', 'Toggle the current marker on/off')
|
|
def toggle_marker(self, ftype: str, spec: Union[str, Tuple[Tuple[int, str], ...]], flags: int) -> None:
|
|
from .marks import marker_from_spec
|
|
key = ftype, spec
|
|
if key == self.current_marker_spec:
|
|
self.remove_marker()
|
|
return
|
|
self.screen.set_marker(marker_from_spec(ftype, spec, flags))
|
|
self.current_marker_spec = key
|
|
|
|
def set_marker(self, spec: Union[str, Sequence[str]]) -> None:
|
|
from .marks import marker_from_spec
|
|
from .options.utils import parse_marker_spec, toggle_marker
|
|
if isinstance(spec, str):
|
|
func, (ftype, spec_, flags) = toggle_marker('toggle_marker', spec)
|
|
else:
|
|
ftype, spec_, flags = parse_marker_spec(spec[0], spec[1:])
|
|
key = ftype, spec_
|
|
self.screen.set_marker(marker_from_spec(ftype, spec_, flags))
|
|
self.current_marker_spec = key
|
|
|
|
@ac('mk', 'Remove a previously created marker')
|
|
def remove_marker(self) -> None:
|
|
if self.current_marker_spec is not None:
|
|
self.screen.set_marker()
|
|
self.current_marker_spec = None
|
|
|
|
@ac('mk', 'Scroll to the next or previous mark of the specified type')
|
|
def scroll_to_mark(self, prev: bool = True, mark: int = 0) -> None:
|
|
self.screen.scroll_to_next_mark(mark, prev)
|
|
|
|
@ac('misc', '''
|
|
Send the specified SIGNAL to the foreground process in the active window
|
|
|
|
For example::
|
|
|
|
map F1 signal_child SIGTERM
|
|
''')
|
|
def signal_child(self, *signals: int) -> None:
|
|
pid = self.child.pid_for_cwd
|
|
if pid is not None:
|
|
for sig in signals:
|
|
os.kill(pid, sig)
|
|
# }}}
|