diff --git a/docs/changelog.rst b/docs/changelog.rst index e6d7d95cb..6ae2dc015 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -170,6 +170,8 @@ Detailed list of changes - A new option :opt:`palette_generate` to automatically generate the 256 color palette from the first 16 colors (:pull:`9426`) +- Add a testing framework for the :doc:`drag-and-drop protocol ` and tests covering the full drop flow, error handling, data integrity, MIME type negotiation, chunked transfer, and malformed-command handling + - For builtin key mappings automatically :ref:`fallback ` to matching the US-PC layout key when the pressed key has no matches and is a non-English character (:pull:`9671`) - Allow drag and drop of windows to re-arrange them, move them to another diff --git a/kitty/dnd.c b/kitty/dnd.c index 0d459db16..5907ae6a6 100644 --- a/kitty/dnd.c +++ b/kitty/dnd.c @@ -10,6 +10,16 @@ #include "control-codes.h" #include "iqsort.h" +// In test mode, this callable is invoked instead of schedule_write_to_child_if_possible. +// It receives (window_id: int, data: bytes) and its return value is ignored. +static PyObject *g_dnd_test_write_func = NULL; + +void +dnd_set_test_write_func(PyObject *func) { + Py_XDECREF(g_dnd_test_write_func); + g_dnd_test_write_func = Py_XNewRef(func); +} + static void drop_free_offered_mimes(Window *w) { if (w->drop.offerred_mimes) { @@ -100,6 +110,16 @@ string_arrays_cmp(const char **a, size_t an, const char **b, size_t bn) { return 0; } +static bool +test_write_chunk(id_type id, const char *buf, size_t sz) { + // In test mode, deliver the chunk to the registered Python callable. + // Returns true when the test interceptor consumed the data (no real write needed). + if (!g_dnd_test_write_func) return false; + RAII_PyObject(ret, PyObject_CallFunction(g_dnd_test_write_func, "Ky#", (unsigned long long)id, buf, (Py_ssize_t)sz)); + if (!ret) PyErr_Print(); + return true; +} + static size_t send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t header_sz, const char *data, const size_t data_sz, bool as_base64) { size_t offset = 0; @@ -108,9 +128,11 @@ send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t if (client_id) header_sz += snprintf(buf + header_sz, sizeof(buf) - header_sz, ":i=%u", (unsigned)client_id); if (!data_sz) { buf[header_sz++] = 0x1b; buf[header_sz++] = '\\'; - bool found, too_much_data; - schedule_write_to_child_if_possible(id, buf, header_sz, &found, &too_much_data); - if (too_much_data) return 0; + if (!test_write_chunk(id, buf, header_sz)) { + bool found, too_much_data; + schedule_write_to_child_if_possible(id, buf, header_sz, &found, &too_much_data); + if (too_much_data) return 0; + } return 1; } buf[header_sz++] = ':'; buf[header_sz++] = 'm'; buf[header_sz++] = '='; @@ -130,10 +152,12 @@ send_payload_to_child(id_type id, uint32_t client_id, const char *header, size_t p += chunk; } buf[p++] = 0x1b; buf[p++] = '\\'; - bool found, too_much_data; - schedule_write_to_child_if_possible(id, buf, p, &found, &too_much_data); - if (too_much_data) break; - if (!found) return data_sz; + if (!test_write_chunk(id, buf, p)) { + bool found, too_much_data; + schedule_write_to_child_if_possible(id, buf, p, &found, &too_much_data); + if (too_much_data) break; + if (!found) return data_sz; + } offset += chunk; } return offset; diff --git a/kitty/dnd.h b/kitty/dnd.h index 53355e0e1..e79852748 100644 --- a/kitty/dnd.h +++ b/kitty/dnd.h @@ -17,3 +17,4 @@ void drop_set_status(Window *w, int operation, const char *payload, size_t paylo size_t drop_update_mimes(Window *w, const char **allowed_mimes, size_t allowed_mimes_count); void drop_dispatch_data(Window *w, const char *mime_type, const char *data, ssize_t sz); void drop_finish(Window *w); +void dnd_set_test_write_func(PyObject *func); diff --git a/kitty/glfw.c b/kitty/glfw.c index 9b3062e03..ce9b35f9f 100644 --- a/kitty/glfw.c +++ b/kitty/glfw.c @@ -876,6 +876,162 @@ request_drop_status_update(OSWindow *osw) { } } +// DnD testing infrastructure {{{ + +static PyObject * +py_dnd_set_test_write_func(PyObject *self UNUSED, PyObject *func) { + // Pass None to clear the interceptor and restore normal operation. + dnd_set_test_write_func(func == Py_None ? NULL : func); + Py_RETURN_NONE; +} + +static void +destroy_fake_window_contents(Window *w) { + // Free window resources without touching GPU objects (none allocated for fake windows). + drop_free_data(w); + free(w->pending_clicks.clicks); zero_at_ptr(&w->pending_clicks); + free(w->buffered_keys.key_data); zero_at_ptr(&w->buffered_keys); + Py_CLEAR(w->render_data.screen); + Py_CLEAR(w->title); + Py_CLEAR(w->title_bar_data.last_drawn_title_object_id); + free(w->title_bar_data.buf); w->title_bar_data.buf = NULL; + Py_CLEAR(w->url_target_bar_data.last_drawn_title_object_id); + free(w->url_target_bar_data.buf); w->url_target_bar_data.buf = NULL; + // render_data.vao_idx is -1 so release_gpu_resources_for_window is safe, but we skip it + // since we never allocated those resources. +} + +static PyObject * +dnd_test_create_fake_window(PyObject *self UNUSED, PyObject *args UNUSED) { + // Create a minimal OS window + tab + window without any OpenGL/GPU resources. + // Returns (os_window_id, window_id). + ensure_space_for(&global_state, os_windows, OSWindow, global_state.num_os_windows + 1, capacity, 1, true); + OSWindow *osw = global_state.os_windows + global_state.num_os_windows++; + zero_at_ptr(osw); + osw->id = ++global_state.os_window_id_counter; + osw->tab_bar_render_data.vao_idx = -1; + osw->background_opacity.alpha = OPT(background_opacity); + osw->created_at = monotonic(); + // osw->handle intentionally left NULL - no real GLFW window + + ensure_space_for(osw, tabs, Tab, 1, capacity, 1, true); + Tab *tab = &osw->tabs[0]; + zero_at_ptr(tab); + tab->id = ++global_state.tab_id_counter; + tab->border_rects.vao_idx = -1; + osw->num_tabs = 1; + osw->active_tab = 0; + + ensure_space_for(tab, windows, Window, 1, capacity, 1, true); + Window *w = &tab->windows[0]; + zero_at_ptr(w); + w->id = ++global_state.window_id_counter; + w->visible = true; + w->render_data.vao_idx = -1; + w->window_title_render_data.vao_idx = -1; + w->drop.wanted = true; + tab->num_windows = 1; + tab->active_window = 0; + + global_state.mouse_hover_in_window = w->id; + return Py_BuildValue("KK", (unsigned long long)osw->id, (unsigned long long)w->id); +} + +static PyObject * +dnd_test_cleanup_fake_window(PyObject *self UNUSED, PyObject *args) { + unsigned long long os_window_id; + if (!PyArg_ParseTuple(args, "K", &os_window_id)) return NULL; + for (size_t i = 0; i < global_state.num_os_windows; i++) { + if (global_state.os_windows[i].id == (id_type)os_window_id) { + OSWindow *osw = global_state.os_windows + i; + for (size_t t = 0; t < osw->num_tabs; t++) { + Tab *tab = osw->tabs + t; + for (size_t j = 0; j < tab->num_windows; j++) { + Window *win = tab->windows + j; + if (global_state.mouse_hover_in_window == win->id) + global_state.mouse_hover_in_window = 0; + destroy_fake_window_contents(win); + } + free(tab->border_rects.rect_buf); tab->border_rects.rect_buf = NULL; + free(tab->windows); tab->windows = NULL; + } + Py_CLEAR(osw->window_title); + Py_CLEAR(osw->tab_bar_render_data.screen); + free(osw->tabs); osw->tabs = NULL; + remove_i_from_array(global_state.os_windows, i, global_state.num_os_windows); + break; + } + } + Py_RETURN_NONE; +} + +static PyObject * +dnd_test_set_mouse_pos(PyObject *self UNUSED, PyObject *args) { + unsigned long long window_id; + int cell_x, cell_y, pixel_x, pixel_y; + if (!PyArg_ParseTuple(args, "Kiiii", &window_id, &cell_x, &cell_y, &pixel_x, &pixel_y)) return NULL; + Window *w = window_for_window_id((id_type)window_id); + if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; } + w->mouse_pos.cell_x = (unsigned int)cell_x; + w->mouse_pos.cell_y = (unsigned int)cell_y; + w->mouse_pos.global_x = pixel_x; + w->mouse_pos.global_y = pixel_y; + Py_RETURN_NONE; +} + +static PyObject * +dnd_test_fake_drop_event(PyObject *self UNUSED, PyObject *args) { + // Simulate a drop enter/move/drop event. mimes_seq must be a sequence of str, or + // None to simulate a leave event. + unsigned long long window_id; + int is_drop; + PyObject *mimes_seq; + if (!PyArg_ParseTuple(args, "KpO", &window_id, &is_drop, &mimes_seq)) return NULL; + Window *w = window_for_window_id((id_type)window_id); + if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; } + if (mimes_seq == Py_None) { + drop_left_child(w); + Py_RETURN_NONE; + } + RAII_PyObject(fast_seq, PySequence_Fast(mimes_seq, "mimes must be a sequence")); + if (!fast_seq) return NULL; + Py_ssize_t num_mimes = PySequence_Fast_GET_SIZE(fast_seq); + RAII_ALLOC(const char*, mimes, malloc(sizeof(const char*) * (num_mimes ? num_mimes : 1))); + if (!mimes) return PyErr_NoMemory(); + for (Py_ssize_t i = 0; i < num_mimes; i++) { + mimes[i] = PyUnicode_AsUTF8(PySequence_Fast_GET_ITEM(fast_seq, i)); + if (!mimes[i]) return NULL; + } + drop_move_on_child(w, mimes, (size_t)num_mimes, is_drop ? true : false); + Py_RETURN_NONE; +} + +static PyObject * +dnd_test_fake_drop_data(PyObject *self UNUSED, PyObject *args) { + // Simulate OS delivering drop data for the given MIME type. + // If error_code > 0, simulate an error (e.g. ENOENT=2, EIO=5, EPERM=1). + // Otherwise deliver data and the mandatory end-of-data signal. + unsigned long long window_id; + const char *mime; + RAII_PY_BUFFER(data); + int error_code = 0; + if (!PyArg_ParseTuple(args, "Ksy*|i", &window_id, &mime, &data, &error_code)) return NULL; + Window *w = window_for_window_id((id_type)window_id); + if (!w) { PyErr_SetString(PyExc_ValueError, "Window not found"); return NULL; } + if (error_code > 0) { + drop_dispatch_data(w, mime, NULL, -(ssize_t)error_code); + } else if (data.len > 0) { + drop_dispatch_data(w, mime, (const char*)data.buf, (ssize_t)data.len); + drop_dispatch_data(w, mime, NULL, 0); // mandatory end-of-data signal + } else { + // Empty data: just the end-of-data signal (sz=0 is the sentinel for "no more data"). + drop_dispatch_data(w, mime, NULL, 0); + } + Py_RETURN_NONE; +} + +// }}} + static void application_close_requested_callback(int flags) { if (flags) { @@ -3065,6 +3221,12 @@ static PyMethodDef module_methods[] = { {"glfw_get_monitor_workarea", (PyCFunction)get_monitor_workarea, METH_NOARGS, ""}, {"glfw_get_monitor_names", (PyCFunction)get_monitor_names, METH_NOARGS, ""}, {"glfw_primary_monitor_content_scale", (PyCFunction)primary_monitor_content_scale, METH_NOARGS, ""}, + {"dnd_set_test_write_func", (PyCFunction)py_dnd_set_test_write_func, METH_O, ""}, + METHODB(dnd_test_create_fake_window, METH_NOARGS), + METHODB(dnd_test_cleanup_fake_window, METH_VARARGS), + METHODB(dnd_test_set_mouse_pos, METH_VARARGS), + METHODB(dnd_test_fake_drop_event, METH_VARARGS), + METHODB(dnd_test_fake_drop_data, METH_VARARGS), {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/kitty_tests/dnd.py b/kitty_tests/dnd.py new file mode 100644 index 000000000..ffa7a57d1 --- /dev/null +++ b/kitty_tests/dnd.py @@ -0,0 +1,479 @@ +#!/usr/bin/env python +# License: GPL v3 Copyright: 2026, Kovid Goyal + +import errno +import re +from base64 import standard_b64decode, standard_b64encode +from contextlib import contextmanager + +from kitty.fast_data_types import ( + DND_CODE, + Screen, + dnd_set_test_write_func, + dnd_test_cleanup_fake_window, + dnd_test_create_fake_window, + dnd_test_fake_drop_data, + dnd_test_fake_drop_event, + dnd_test_set_mouse_pos, +) + +from . import BaseTest, parse_bytes + + +# ---- helpers ---------------------------------------------------------------- + +def _osc(payload: str) -> bytes: + """Wrap *payload* in an OSC escape sequence (OSC payload ST).""" + return f'\x1b]{payload}\x1b\\'.encode() + + +def client_register(mimes: str = '', client_id: int = 0) -> bytes: + """Escape code a client sends to start accepting drops (t=a).""" + meta = f'{DND_CODE};t=a' + if client_id: + meta += f':i={client_id}' + return _osc(f'{meta};{mimes}') + + +def client_unregister(client_id: int = 0) -> bytes: + """Escape code a client sends to stop accepting drops (t=A).""" + meta = f'{DND_CODE};t=A' + if client_id: + meta += f':i={client_id}' + return _osc(meta) + + +def client_accept(operation: int, mimes: str = '', client_id: int = 0) -> bytes: + """Escape code a client sends to signal acceptance of the current drop (t=m:o=…).""" + meta = f'{DND_CODE};t=m:o={operation}' + if client_id: + meta += f':i={client_id}' + return _osc(f'{meta};{mimes}') + + +def client_request_data(mime: str = '', client_id: int = 0) -> bytes: + """Escape code a client sends to request data (t=r) or finish the drop (t=r with no MIME).""" + meta = f'{DND_CODE};t=r' + if client_id: + meta += f':i={client_id}' + return _osc(f'{meta};{mime}') + + +# ---- escape-code decoder used by assertions --------------------------------- + +_OSC_RE = re.compile( + rb'\x1b\]' + re.escape(str(DND_CODE).encode()) + rb';([^;\x1b]*?)(?:;([^\x1b]*))?\x1b\\', +) + + +def _decode_meta(raw: bytes) -> dict: + """Parse the colon-separated metadata portion of a DnD escape code.""" + ans: dict = {} + for kv in raw.split(b':'): + if b'=' in kv: + k, _, v = kv.partition(b'=') + ans[k.decode()] = v.decode() + elif kv: + ans[kv.decode()] = '' + return ans + + +def parse_escape_codes(data: bytes) -> list[dict]: + """Decode all DnD escape codes present in *data*. + + Each returned dict has keys: + * ``type`` – the 't' value (single character string) + * ``meta`` – full parsed metadata dict (from the first chunk) + * ``payload`` – concatenated raw payload bytes from all chunks + * ``chunks`` – list of individual raw chunk payloads (bytes) + Chunked sequences (m=1 … m=0) are assembled into a single entry. + """ + results: list[dict] = [] + pending: dict | None = None + + for m in _OSC_RE.finditer(data): + meta_raw = m.group(1) + payload_raw: bytes = m.group(2) if m.group(2) is not None else b'' + meta = _decode_meta(meta_raw) + more = meta.get('m', '0') == '1' + t = meta.get('t', 'a') + + if pending is None: + pending = {'type': t, 'meta': meta, 'chunks': [], 'payload': b''} + + pending['chunks'].append(payload_raw) + pending['payload'] += payload_raw + + if not more: + results.append(pending) + pending = None + + if pending is not None: + results.append(pending) + return results + + +def parse_escape_codes_b64(data: bytes) -> list[dict]: + """Like *parse_escape_codes* but base64-decodes each chunk's payload.""" + result = parse_escape_codes(data) + for entry in result: + decoded_chunks = [] + full = b'' + for chunk in entry['chunks']: + dec = standard_b64decode(chunk + b'==') if chunk else b'' + decoded_chunks.append(dec) + full += dec + entry['chunks'] = decoded_chunks + entry['payload'] = full + return result + + +# ---- test context manager --------------------------------------------------- + +class _WriteCapture: + """Accumulates bytes delivered by the DnD write interceptor.""" + + def __init__(self) -> None: + self._buf: dict[int, bytearray] = {} + + def __call__(self, window_id: int, data: bytes) -> None: + self._buf.setdefault(window_id, bytearray()) + self._buf[window_id] += data + + def consume(self, window_id: int) -> bytes: + """Return and clear all buffered data for *window_id*.""" + buf = self._buf.pop(window_id, bytearray()) + return bytes(buf) + + def peek(self, window_id: int) -> bytes: + return bytes(self._buf.get(window_id, bytearray())) + + +@contextmanager +def dnd_test_window(): + """Context manager that creates a fake window + write-capture harness. + + Yields (os_window_id, window_id, screen, capture) where: + * ``os_window_id`` – OS-level window ID + * ``window_id`` – kitty window ID (pass to the fake-event helpers) + * ``screen`` – Screen object whose window_id matches the fake window + * ``capture`` – _WriteCapture accumulating bytes sent to the child + """ + from kitty.fast_data_types import get_options + from kitty.options.types import defaults + capture = _WriteCapture() + dnd_set_test_write_func(capture) + os_window_id, window_id = dnd_test_create_fake_window() + try: + screen = Screen(None, 24, 80, 0, 0, 0, window_id) + yield os_window_id, window_id, screen, capture + finally: + dnd_set_test_write_func(None) + dnd_test_cleanup_fake_window(os_window_id) + + +# ---- test class ------------------------------------------------------------- + +class TestDnDProtocol(BaseTest): + + def _assert_no_output(self, capture: _WriteCapture, window_id: int) -> None: + self.ae(capture.peek(window_id), b'', 'unexpected output to child') + + def _get_events(self, capture: _WriteCapture, window_id: int) -> list[dict]: + return parse_escape_codes(capture.consume(window_id)) + + def test_register_and_unregister(self) -> None: + """Client can register and unregister for drops.""" + with dnd_test_window() as (osw, wid, screen, cap): + # Client registers – state is already wanted=True from fake-window creation, + # but calling the escape code should not break things. + parse_bytes(screen, client_register('text/plain text/uri-list')) + # No output expected at this point (no drop in progress). + self._assert_no_output(cap, wid) + + # Client unregisters. + parse_bytes(screen, client_unregister()) + self._assert_no_output(cap, wid) + + def test_drop_move_sends_move_event(self) -> None: + """A drop entering and moving over the window generates t=m events.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 5, 3, 100, 60) + dnd_test_fake_drop_event(wid, False, ['text/plain', 'text/uri-list']) + + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + ev = events[0] + self.ae(ev['type'], 'm') + self.ae(ev['meta'].get('x'), '5') + self.ae(ev['meta'].get('y'), '3') + self.ae(ev['meta'].get('X'), '100') + self.ae(ev['meta'].get('Y'), '60') + # MIME list should be present in the payload + self.assertIn(b'text/plain', ev['payload']) + self.assertIn(b'text/uri-list', ev['payload']) + + def test_drop_move_mime_always_sent(self) -> None: + """The current implementation always includes the MIME list in move events.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + mimes = ['text/plain'] + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, False, mimes) + cap.consume(wid) # discard first event + + # Second move with same mimes – list is still included. + dnd_test_set_mouse_pos(wid, 1, 0, 8, 0) + dnd_test_fake_drop_event(wid, False, mimes) + raw = cap.consume(wid) + events = parse_escape_codes(raw) + self.assertEqual(len(events), 1, raw) + self.ae(events[0]['type'], 'm') + self.assertIn(b'text/plain', events[0]['payload']) + + def test_drop_leave_sends_leave_event(self) -> None: + """Drop leaving sends t=m with x=-1,y=-1.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + cap.consume(wid) + + dnd_test_fake_drop_event(wid, False, None) # None → leave + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + ev = events[0] + self.ae(ev['type'], 'm') + self.ae(ev['meta'].get('x'), '-1') + self.ae(ev['meta'].get('y'), '-1') + + def test_client_accepts_drop(self) -> None: + """Client sending t=m:o=1 is recorded and does not trigger extra output.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + cap.consume(wid) + + # Client accepts with copy operation. + parse_bytes(screen, client_accept(1, 'text/plain')) + # No immediate output expected. + self._assert_no_output(cap, wid) + + def test_full_drop_flow(self) -> None: + """Complete happy-path: move → accept → drop → request → data → finish.""" + payload_data = b'hello world' + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + + # Move + dnd_test_set_mouse_pos(wid, 2, 3, 16, 24) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + cap.consume(wid) + + # Client accepts + parse_bytes(screen, client_accept(1, 'text/plain')) + + # OS drops + dnd_test_set_mouse_pos(wid, 2, 3, 16, 24) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'M') + self.assertIn(b'text/plain', events[0]['payload']) + + # Client requests data + parse_bytes(screen, client_request_data('text/plain')) + + # OS delivers data + dnd_test_fake_drop_data(wid, 'text/plain', payload_data) + raw = cap.consume(wid) + data_events = parse_escape_codes_b64(raw) + # Should have data chunks plus an empty terminator + self.assertTrue(len(data_events) >= 1, data_events) + combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r') + self.ae(combined, payload_data) + + # Client finishes + parse_bytes(screen, client_request_data('')) + self._assert_no_output(cap, wid) + + def test_request_unknown_mime(self) -> None: + """Requesting a MIME type not in the offered set yields an error.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Client requests a MIME that was not offered. + parse_bytes(screen, client_request_data('image/png')) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['payload'].strip(), b'ENOENT') + + def test_data_error_propagation(self) -> None: + """When data retrieval fails the client receives a t=R error code.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain')) + + # Simulate I/O error (EIO = 5 on Linux) + dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EIO) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['payload'].strip(), b'EIO') + + def test_data_eperm_error(self) -> None: + """EPERM error is correctly forwarded to the client.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EPERM) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['payload'].strip(), b'EPERM') + + def test_large_data_chunking(self) -> None: + """Data larger than the chunk limit is sent in multiple base64 chunks.""" + # Each chunk is ≤ 3072 bytes of raw data (base64-encoded to ≤ 4096 bytes). + chunk_limit = 3072 + big_payload = b'X' * (chunk_limit * 3) # 3 chunks expected + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', big_payload) + raw = cap.consume(wid) + data_events = parse_escape_codes_b64(raw) + combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r') + self.ae(combined, big_payload) + # Verify that we got more than one escape code (chunking happened) + self.assertGreater(len(data_events), 1, 'expected multiple chunks') + + def test_client_id_propagated(self) -> None: + """The client_id (i=…) set during registration is echoed in all replies.""" + client_id = 42 + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain', client_id=client_id)) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + raw = cap.consume(wid) + events = parse_escape_codes(raw) + self.assertEqual(len(events), 1, raw) + self.ae(events[0]['meta'].get('i'), str(client_id)) + + def test_multiple_mimes_priority(self) -> None: + """The client can specify a preferred MIME ordering.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain text/uri-list')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + # OS offers both types. + dnd_test_fake_drop_event(wid, True, ['text/plain', 'text/uri-list']) + cap.consume(wid) + + # Request text/uri-list first (different from registration order). + parse_bytes(screen, client_request_data('text/uri-list')) + dnd_test_fake_drop_data(wid, 'text/uri-list', b'file:///tmp/test\n') + raw = cap.consume(wid) + data_events = parse_escape_codes_b64(raw) + combined = b''.join(e['payload'] for e in data_events if e['type'] == 'r') + self.ae(combined, b'file:///tmp/test\n') + + def test_drop_without_register_no_output(self) -> None: + """If the client has not registered, no escape codes are sent on drop.""" + with dnd_test_window() as (osw, wid, screen, cap): + # Explicitly unregister (clears the wanted flag). + parse_bytes(screen, client_unregister()) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + # Fake window is created with wanted=True; after unregister it should be False. + # drop_move_on_child only sends if w->drop.wanted is true, which is handled + # by the caller (on_drop in glfw.c checks w->drop.wanted before calling). + # Here we call drop_left_child which checks w->drop.wanted. + dnd_test_fake_drop_event(wid, False, None) + self._assert_no_output(cap, wid) + + def test_malformed_dnd_command_invalid_type(self) -> None: + """A DnD command with an unknown type character is silently ignored.""" + with dnd_test_window() as (osw, wid, screen, cap): + # 'z' is not a valid type; the parser should emit an error and return + # without calling any handler – no crash, no output. + bad_cmd = _osc(f'{DND_CODE};t=z;') + parse_bytes(screen, bad_cmd) + self._assert_no_output(cap, wid) + + def test_move_event_after_mime_change(self) -> None: + """When offered MIME list changes, the new list is included in the move event.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, False, ['text/plain']) + cap.consume(wid) + + # Second move with a different MIME list – list must be re-sent. + dnd_test_set_mouse_pos(wid, 1, 0, 8, 0) + dnd_test_fake_drop_event(wid, False, ['text/html', 'text/plain']) + raw = cap.consume(wid) + events = parse_escape_codes(raw) + self.assertEqual(len(events), 1, raw) + self.assertIn(b'text/html', events[0]['payload']) + + def test_drop_event_has_uppercase_M(self) -> None: + """A drop (not just a move) sends t=M (uppercase).""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'M') + + def test_data_end_signal(self) -> None: + """The end-of-data signal is an empty payload escape code.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', b'hello') + raw = cap.consume(wid) + events = parse_escape_codes(raw) + # Last event must be an empty (end-of-stream) t=r. + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events found') + last = r_events[-1] + self.ae(last['payload'], b'') + + def test_empty_data(self) -> None: + """Zero-byte payload is handled gracefully – only end signal is sent.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', b'') + raw = cap.consume(wid) + events = parse_escape_codes(raw) + r_events = [e for e in events if e['type'] == 'r'] + # Only the end signal should be present. + self.assertEqual(len(r_events), 1, raw) + self.ae(r_events[0]['payload'], b'')