mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-06 01:05:48 +02:00
DRYer
This commit is contained in:
@@ -74,6 +74,29 @@ class TermManager:
|
||||
del self.tty_fd, self.original_termios
|
||||
|
||||
|
||||
class SignalManager:
|
||||
|
||||
def __init__(self, on_sigwinch, on_sigterm, on_sigint):
|
||||
self.on_sigwinch = on_sigwinch
|
||||
self.on_sigterm = on_sigterm
|
||||
self.on_sigint = on_sigint
|
||||
|
||||
def __enter__(self):
|
||||
for x in ('winch', 'term', 'int'):
|
||||
attr = 'on_sig' + x
|
||||
handler = getattr(self, attr)
|
||||
old_handler = signal.signal(getattr(signal, 'SIG' + x.upper()), handler)
|
||||
setattr(self, attr, old_handler)
|
||||
|
||||
def __exit__(self, *a):
|
||||
for x in ('winch', 'term', 'int'):
|
||||
attr = 'on_sig' + x
|
||||
val = getattr(self, attr)
|
||||
if val is None:
|
||||
val = signal.SIG_DFL
|
||||
signal.signal(getattr(signal, 'SIG' + x.upper()), val)
|
||||
|
||||
|
||||
LEFT, MIDDLE, RIGHT, FOURTH, FIFTH = 1, 2, 4, 8, 16
|
||||
DRAG = REPEAT
|
||||
MouseEvent = namedtuple('MouseEvent', 'x y type buttons mods')
|
||||
@@ -344,63 +367,10 @@ class Loop:
|
||||
events |= selectors.EVENT_WRITE
|
||||
self.sel.modify(tty_fd, events)
|
||||
|
||||
def loop(self, handler):
|
||||
select = self.sel.select
|
||||
tb = None
|
||||
waiting_for_write = True
|
||||
with closing(self.sel), TermManager() as term_manager:
|
||||
self.sel.register(term_manager.tty_fd, selectors.EVENT_READ | selectors.EVENT_WRITE)
|
||||
self._get_screen_size = screen_size_function(term_manager.tty_fd)
|
||||
signal.signal(signal.SIGWINCH, self._on_sigwinch)
|
||||
signal.signal(signal.SIGTERM, self._on_sigterm)
|
||||
signal.signal(signal.SIGINT, self._on_sigint)
|
||||
handler.write_buf = []
|
||||
handler._term_manager = term_manager
|
||||
image_manager = None
|
||||
if handler.image_manager_class is not None:
|
||||
image_manager = handler.image_manager_class(handler)
|
||||
keep_going = True
|
||||
read_ready, write_ready, wakeup_ready = self._read_ready, self._write_ready, self._wakeup_ready
|
||||
tty_fd = term_manager.tty_fd
|
||||
try:
|
||||
handler._initialize(self._get_screen_size(), self.quit, self.wakeup, self.start_job, debug, image_manager)
|
||||
with handler:
|
||||
while keep_going:
|
||||
has_data_to_write = bool(handler.write_buf)
|
||||
if not has_data_to_write and not self.read_allowed:
|
||||
break
|
||||
if has_data_to_write != waiting_for_write:
|
||||
waiting_for_write = has_data_to_write
|
||||
self._modify_output_selector(tty_fd, waiting_for_write)
|
||||
for key, mask in select():
|
||||
fd = key.fd
|
||||
if fd == tty_fd:
|
||||
if mask & selectors.EVENT_READ:
|
||||
read_ready(handler, fd)
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
write_ready(handler, fd)
|
||||
else:
|
||||
wakeup_ready(handler, fd)
|
||||
except Exception:
|
||||
import traceback
|
||||
tb = traceback.format_exc()
|
||||
self.return_code = 1
|
||||
keep_going = False
|
||||
|
||||
term_manager.extra_finalize = b''.join(handler.write_buf).decode('utf-8')
|
||||
|
||||
if tb is not None:
|
||||
self._report_error_loop(tb, term_manager)
|
||||
|
||||
def _report_error_loop(self, tb, term_manager):
|
||||
select = self.sel.select
|
||||
waiting_for_write = False
|
||||
handler = UnhandledException(tb)
|
||||
handler.write_buf = []
|
||||
handler._term_manager = term_manager
|
||||
handler._initialize(self._get_screen_size(), self.quit, self.wakeup, self.start_job, debug)
|
||||
tty_fd = term_manager.tty_fd
|
||||
def loop_impl(self, handler, tty_fd, image_manager=None, waiting_for_write=True):
|
||||
read_ready, write_ready, wakeup_ready = self._read_ready, self._write_ready, self._wakeup_ready
|
||||
select = self.sel.select
|
||||
handler._initialize(self._get_screen_size(), self.quit, self.wakeup, self.start_job, debug, image_manager)
|
||||
with handler:
|
||||
while True:
|
||||
has_data_to_write = bool(handler.write_buf)
|
||||
@@ -408,7 +378,7 @@ class Loop:
|
||||
break
|
||||
if has_data_to_write != waiting_for_write:
|
||||
waiting_for_write = has_data_to_write
|
||||
self._modify_output_selector(term_manager.tty_fd, waiting_for_write)
|
||||
self._modify_output_selector(tty_fd, waiting_for_write)
|
||||
for key, mask in select():
|
||||
fd = key.fd
|
||||
if fd == tty_fd:
|
||||
@@ -418,3 +388,31 @@ class Loop:
|
||||
write_ready(handler, fd)
|
||||
else:
|
||||
wakeup_ready(handler, fd)
|
||||
|
||||
def loop(self, handler):
|
||||
tb = None
|
||||
signal_manager = SignalManager(self._on_sigwinch, self._on_sigterm, self._on_sigint)
|
||||
with closing(self.sel), TermManager() as term_manager, signal_manager:
|
||||
self.sel.register(term_manager.tty_fd, selectors.EVENT_READ | selectors.EVENT_WRITE)
|
||||
self._get_screen_size = screen_size_function(term_manager.tty_fd)
|
||||
handler.write_buf = []
|
||||
handler._term_manager = term_manager
|
||||
image_manager = None
|
||||
if handler.image_manager_class is not None:
|
||||
image_manager = handler.image_manager_class(handler)
|
||||
try:
|
||||
self.loop_impl(handler, term_manager.tty_fd, image_manager)
|
||||
except Exception:
|
||||
import traceback
|
||||
tb = traceback.format_exc()
|
||||
self.return_code = 1
|
||||
|
||||
term_manager.extra_finalize = b''.join(handler.write_buf).decode('utf-8')
|
||||
if tb is not None:
|
||||
self._report_error_loop(tb, term_manager)
|
||||
|
||||
def _report_error_loop(self, tb, term_manager):
|
||||
handler = UnhandledException(tb)
|
||||
handler.write_buf = []
|
||||
handler._term_manager = term_manager
|
||||
self.loop_impl(handler, term_manager.tty_fd, waiting_for_write=False)
|
||||
|
||||
Reference in New Issue
Block a user