diff --git a/kittens/dnd/main.go b/kittens/dnd/main.go index 824e8ba2d..85b02d377 100644 --- a/kittens/dnd/main.go +++ b/kittens/dnd/main.go @@ -55,10 +55,15 @@ type drop_dest struct { func run_loop(opts *Options, drop_dests map[string]drop_dest, drag_sources map[string]drag_source, uri_list_buffer *bytes.Buffer) (err error) { allow_drops, allow_drags, drop_accepted := len(drop_dests) > 0, len(drag_sources) > 0, false drop_copy_allowed, drop_move_allowed, drag_started := false, false, false + in_test_mode := false lp, err := loop.New() if err != nil { return err } + send_test_response := func(payload string) { + in_test_mode = true + lp.DebugPrintln(payload) + } render_screen := func() error { lp.StartAtomicUpdate() defer lp.EndAtomicUpdate() @@ -84,7 +89,7 @@ func run_loop(opts *Options, drop_dests map[string]drop_dest, drag_sources map[s // corresponding operation type. The button should consist of the // triple sized text and a frame with rounded corners around the // text drawn using unicode box drawing symbols. - _, _ = drop_copy_allowed, drop_move_allowed + _, _, _ = drop_copy_allowed, drop_move_allowed, in_test_mode } return nil @@ -97,6 +102,7 @@ func run_loop(opts *Options, drop_dests map[string]drop_dest, drag_sources map[s if allow_drags { lp.StartOfferingDrags() } + lp.SetWindowTitle("Drag and drop") return "", render_screen() } lp.OnFinalize = func() string { @@ -146,6 +152,15 @@ func run_loop(opts *Options, drop_dests map[string]drop_dest, drag_sources map[s // corresponding to the drag sources, including the files in the // uri-list and exit. + switch cmd.Type { + case 'T': + switch string(cmd.Payload) { + case "PING": + send_test_response("PONG") + default: + send_test_response("UNKNOWN TEST COMMAND: " + string(cmd.Payload)) + } + } return nil } lp.OnKeyEvent = func(e *loop.KeyEvent) (err error) { diff --git a/kitty/dnd.c b/kitty/dnd.c index d85bb39b1..cf042ab44 100644 --- a/kitty/dnd.c +++ b/kitty/dnd.c @@ -421,6 +421,7 @@ reset_drop(Window *w) { w->drop.is_remote_client = is_remote_client; } } + void drop_register_window(Window *w, const uint8_t *payload, size_t payload_sz, bool on, uint32_t client_id, bool more) { w->drop.wanted = on; @@ -2177,6 +2178,18 @@ dnd_test_drag_notify(PyObject *self UNUSED, PyObject *args) { Py_RETURN_NONE; } +static PyObject* +dnd_test_probe_state(PyObject *self UNUSED, PyObject *args) { + const char *q; unsigned long long window_id; + if (!PyArg_ParseTuple(args, "Ks", &window_id, &q)) return NULL; + Window *w = window_for_window_id((id_type)window_id); + if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; } + if (strcmp(q, "drop_is_remote_client") == 0) { + return Py_NewRef(w->drop.is_remote_client ? Py_True : Py_False); + } + Py_RETURN_NONE; +} + static PyMethodDef dnd_methods[] = { {"dnd_set_test_write_func", (PyCFunction)py_dnd_set_test_write_func, METH_VARARGS, ""}, METHODB(dnd_test_create_fake_window, METH_NOARGS), @@ -2187,6 +2200,7 @@ static PyMethodDef dnd_methods[] = { METHODB(dnd_test_force_drag_dropped, METH_VARARGS), METHODB(dnd_test_request_drag_data, METH_VARARGS), METHODB(dnd_test_drag_notify, METH_VARARGS), + METHODB(dnd_test_probe_state, METH_VARARGS), {NULL, NULL, 0, NULL} }; diff --git a/kitty/state.c b/kitty/state.c index 478173ca6..a610834b6 100644 --- a/kitty/state.c +++ b/kitty/state.c @@ -749,7 +749,7 @@ owners_for_window_id(id_type window_id, OSWindow **os_window, Tab **tab) { bool make_window_context_current(id_type window_id) { OSWindow *os_window; - if (owners_for_window_id(window_id, &os_window, NULL)) { + if (owners_for_window_id(window_id, &os_window, NULL) && os_window->handle) { make_os_window_context_current(os_window); return true; } diff --git a/kitty_tests/__init__.py b/kitty_tests/__init__.py index f93cca340..e3b0daeac 100644 --- a/kitty_tests/__init__.py +++ b/kitty_tests/__init__.py @@ -336,7 +336,7 @@ class PTY: def __init__( self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20, - cwd=None, env=None, stdin_fd=None, stdout_fd=None, needs_da1=True, + cwd=None, env=None, stdin_fd=None, stdout_fd=None, needs_da1=True, window_id=0, ): self.is_child = False if isinstance(argv, str): @@ -377,7 +377,7 @@ class PTY: self.set_window_size(rows=rows, columns=columns) self.needs_da1 = needs_da1 self.callbacks = Callbacks(self) - self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks) + self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, window_id, self.callbacks) self.received_bytes = b'' def reset_termios_state(self): @@ -393,6 +393,12 @@ class PTY: s = termios.tcgetattr(self.master_fd) return True if s[3] & termios.ECHO else False + def __enter__(self): + return self + + def __exit__(self, *a): + self.__del__() + def __del__(self): if not self.is_child: if hasattr(self, 'master_fd'): diff --git a/kitty_tests/dnd_kitten.py b/kitty_tests/dnd_kitten.py new file mode 100644 index 000000000..186dde7ff --- /dev/null +++ b/kitty_tests/dnd_kitten.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# License: GPLv3 Copyright: 2026, Kovid Goyal + +import tempfile +from base64 import standard_b64encode + +from kitty.constants import kitten_exe +from kitty.fast_data_types import DND_CODE, dnd_set_test_write_func, dnd_test_cleanup_fake_window, dnd_test_create_fake_window, dnd_test_probe_state + +from . import PTY, BaseTest +from .dnd import WriteCapture + + +class Capture(WriteCapture): + + def __call__(self, window_id: int, data: bytes) -> None: + self.pty.write(data) + + +class TestDnDKitten(BaseTest): + + def setUp(self): + capture = Capture() + dnd_set_test_write_func(capture) + os_window_id, window_id = dnd_test_create_fake_window() + capture.window_id = window_id + capture.os_window_id = os_window_id + self.capture = capture + self.test_dir = self.enterContext(tempfile.TemporaryDirectory()) + self.messages_from_kitten = '' + + def send_dnd_command_to_kitten(self, payload=b'', as_base64=False, flush=False, **metadata): + header = f'\x1b]{DND_CODE};' + for k, v in metadata.items(): + header = header + f'{k}={v}:' + self.pty.write_to_child(header.encode()) + if not payload: + self.pty.write_to_child(b'\x1b\\', flush=flush) + return + if isinstance(payload, str): + payload = payload.encode() + payload = memoryview(standard_b64encode(payload) if as_base64 else payload) + for i in range(0, len(payload), 4096): + end = i + 4096 + is_last = end >= len(payload) + chunk = payload[i:min(i+4096, len(payload))] + if i == 0: + self.pty.write_to_child(f'm={0 if is_last else 1};'.encode()) + else: + self.pty.write_to_child(f'\x1b]{DND_CODE};m={0 if is_last else 1};'.encode()) + self.pty.write_to_child(chunk) + self.pty.write_to_child(b'\x1b\\', flush=is_last and flush) + + def finish_setup(self, cmd=None): + cmd = cmd or [kitten_exe(), 'dnd'] + self.pty = self.enterContext(PTY(argv=cmd, rows=25, columns=80, window_id=self.capture.window_id)) + self.pty.callbacks.printbuf = self + self.screen = self.pty.screen + self.pty.wait_till(lambda: bool(self.pty.callbacks.titlebuf)) + self.assertFalse(self.probe_state('drop_is_remote_client')) + + def append(self, text): + self.messages_from_kitten += text + + def wait_for_responses(self, *responses, timeout=10): + q = '\n'.join(responses) + def wait_till(): + return q == self.messages_from_kitten.strip() + self.pty.wait_till(wait_till, timeout, lambda: f'Responses so far: {self.messages_from_kitten!r}') + self.messages_from_kitten = '' + + def probe_state(self, which: str): + return dnd_test_probe_state(self.capture.window_id, which) + + def tearDown(self): + dnd_set_test_write_func(None) + dnd_test_cleanup_fake_window(self.capture.os_window_id) + del self.capture + del self.screen + del self.pty + + def test_dnd_kitten_drop(self): + self.finish_setup()