diff --git a/kitty_tests/dnd.py b/kitty_tests/dnd.py index 143227f32..3a90d61c4 100644 --- a/kitty_tests/dnd.py +++ b/kitty_tests/dnd.py @@ -50,23 +50,27 @@ def client_accept(operation: int, mimes: str = '', client_id: int = 0) -> bytes: return _osc(f'{meta};{mimes}') -def client_request_data(mime: str = '', client_id: int = 0) -> bytes: +def client_request_data(mime: str = '', client_id: int = 0, request_id: int = 0) -> bytes: """Escape code a client sends to request data (t=r) or finish the drop (t=r with no MIME).""" meta = f'{DND_CODE};t=r' + if request_id: + meta += f':r={request_id}' if client_id: meta += f':i={client_id}' return _osc(f'{meta};{mime}') -def client_request_uri_data(idx: int, client_id: int = 0) -> bytes: +def client_request_uri_data(idx: int, client_id: int = 0, request_id: int = 0) -> bytes: """Escape code a client sends to request a file from the URI list (t=s ; text/uri-list:idx).""" meta = f'{DND_CODE};t=s' + if request_id: + meta += f':r={request_id}' if client_id: meta += f':i={client_id}' return _osc(f'{meta};text/uri-list:{idx}') -def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int = 0) -> bytes: +def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int = 0, request_id: int = 0) -> bytes: """Escape code for a directory request (t=d:x=handle_id[:y=entry_num]). * entry_num=None → close the directory handle. @@ -75,6 +79,8 @@ def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int meta = f'{DND_CODE};t=d:x={handle_id}' if entry_num is not None: meta += f':y={entry_num}' + if request_id: + meta += f':r={request_id}' if client_id: meta += f':i={client_id}' return _osc(meta) @@ -1469,3 +1475,374 @@ class TestDnDProtocol(BaseTest): # drag_start calls expand_rgb_data which checks sz == w*h*3 parse_bytes(screen, client_drag_start()) self.assert_error(cap, wid) + + # ---- Request queue and request_id tests ---------------------------------- + + def test_request_id_echoed_in_data_response(self) -> None: + """request_id is echoed back as r=ID in data responses.""" + payload_data = b'hello request_id' + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain', request_id=42)) + dnd_test_fake_drop_data(wid, 'text/plain', payload_data) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events') + for ev in r_events: + self.ae(ev['meta'].get('r'), '42', f'expected r=42, got {ev["meta"]}') + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, payload_data) + + def test_request_id_echoed_in_error_response(self) -> None: + """request_id is echoed back as r=ID in error responses.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('image/png', request_id=99)) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['meta'].get('r'), '99') + self.ae(events[0]['payload'].strip(), b'ENOENT') + + def test_request_id_zero_not_included(self) -> None: + """When request_id is 0 (default), r= is not included in responses.""" + payload_data = b'no request_id' + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Request without request_id (defaults to 0) + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', payload_data) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events') + for ev in r_events: + self.assertNotIn('r', ev['meta'], f'r= should not be present when request_id=0, got {ev["meta"]}') + + def test_request_id_in_error_for_io_failure(self) -> None: + """request_id is echoed in I/O error responses.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + parse_bytes(screen, client_request_data('text/plain', request_id=77)) + dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EIO) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['meta'].get('r'), '77') + self.ae(events[0]['payload'].strip(), b'EIO') + + def test_multiple_queued_requests_fifo(self) -> None: + """Multiple requests with different request_ids are served in FIFO order.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain text/html')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain', 'text/html']) + cap.consume(wid) + + # Queue two requests + parse_bytes(screen, client_request_data('text/plain', request_id=1)) + parse_bytes(screen, client_request_data('text/html', request_id=2)) + + # First request (text/plain) gets served first + dnd_test_fake_drop_data(wid, 'text/plain', b'plain data') + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '1'] + self.assertTrue(r_events, 'no t=r events for first request') + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, b'plain data') + + # Second request (text/html) gets served next + dnd_test_fake_drop_data(wid, 'text/html', b'data') + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '2'] + self.assertTrue(r_events, 'no t=r events for second request') + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, b'data') + + def test_request_after_error_proceeds(self) -> None: + """After an error response, the next queued request is processed.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Queue: request for unknown MIME (error) followed by valid request + parse_bytes(screen, client_request_data('image/png', request_id=10)) + parse_bytes(screen, client_request_data('text/plain', request_id=11)) + + # The error for request 10 should have been sent immediately + raw = cap.consume(wid) + events = parse_escape_codes(raw) + err_events = [e for e in events if e['type'] == 'R'] + self.assertEqual(len(err_events), 1, events) + self.ae(err_events[0]['meta'].get('r'), '10') + self.ae(err_events[0]['payload'].strip(), b'ENOENT') + + # Now serve request 11 + dnd_test_fake_drop_data(wid, 'text/plain', b'second request data') + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events for second request') + for ev in r_events: + self.ae(ev['meta'].get('r'), '11') + + def test_queue_overflow_returns_emfile(self) -> None: + """Exceeding 128 queued requests returns EMFILE and ends the drop.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # First request starts async processing + parse_bytes(screen, client_request_data('text/plain', request_id=1)) + + # Queue 127 more requests (fill to capacity = 128) + for i in range(2, 129): + parse_bytes(screen, client_request_data('text/plain', request_id=i)) + + # No error yet - queue is at capacity + raw = cap.consume(wid) + err_events = [e for e in parse_escape_codes(raw) if e['type'] == 'R'] + self.assertEqual(len(err_events), 0, f'unexpected errors: {err_events}') + + # 129th request should trigger EMFILE + parse_bytes(screen, client_request_data('text/plain', request_id=999)) + raw = cap.consume(wid) + events = parse_escape_codes(raw) + err_events = [e for e in events if e['type'] == 'R'] + self.assertTrue(err_events, 'expected EMFILE error') + self.ae(err_events[0]['meta'].get('r'), '999') + self.ae(err_events[0]['payload'].strip(), b'EMFILE') + + def test_request_id_in_uri_file_response(self) -> None: + """request_id is echoed in t=s (URI file) data responses.""" + import os + import tempfile + content = b'URI file with request_id\n' + with tempfile.NamedTemporaryFile(delete=False) as f: + f.write(content) + fpath = f.name + try: + uri_list = f'file://{fpath}\r\n'.encode() + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + parse_bytes(screen, client_request_uri_data(0, request_id=55)) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events') + for ev in r_events: + self.ae(ev['meta'].get('r'), '55') + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, content) + finally: + os.unlink(fpath) + + def test_request_id_in_uri_error_response(self) -> None: + """request_id is echoed in t=s error responses.""" + uri_list = b'file:///tmp/no_such_file_dnd_test_xyz\r\n' + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + parse_bytes(screen, client_request_uri_data(0, request_id=66)) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1, events) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['meta'].get('r'), '66') + + def test_request_id_in_dir_listing_response(self) -> None: + """request_id is echoed in directory listing (t=d) responses.""" + import os + import tempfile + with tempfile.TemporaryDirectory() as root: + open(os.path.join(root, 'file.txt'), 'w').close() + uri_list = f'file://{root}\r\n'.encode() + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + parse_bytes(screen, client_request_uri_data(0, request_id=88)) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + d_events = [e for e in events if e['type'] == 'd'] + self.assertTrue(d_events, 'expected t=d listing') + for ev in d_events: + self.ae(ev['meta'].get('r'), '88') + + def test_request_id_in_dir_entry_file_response(self) -> None: + """request_id is echoed when reading a file via directory handle (t=d).""" + import os + import tempfile + content = b'directory file content\n' + with tempfile.TemporaryDirectory() as root: + with open(os.path.join(root, 'f.txt'), 'wb') as f: + f.write(content) + uri_list = f'file://{root}\r\n'.encode() + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + # Get dir listing first (no request_id needed for setup) + parse_bytes(screen, client_request_uri_data(0)) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + d_events = [e for e in events if e['type'] == 'd'] + self.assertTrue(d_events) + handle_id = int(d_events[0]['meta']['x']) + listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk) + entries = [e.decode() for e in listing.split(b'\x00')[1:] if e] + f_idx = entries.index('f.txt') + 1 + + # Read file with request_id + parse_bytes(screen, client_dir_read(handle_id, f_idx, request_id=33)) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events') + for ev in r_events: + self.ae(ev['meta'].get('r'), '33') + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, content) + + def test_request_id_in_dir_entry_error_response(self) -> None: + """request_id is echoed when a directory entry read fails.""" + import os + import tempfile + with tempfile.TemporaryDirectory() as root: + open(os.path.join(root, 'only.txt'), 'w').close() + uri_list = f'file://{root}\r\n'.encode() + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + parse_bytes(screen, client_request_uri_data(0)) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + d_events = [e for e in events if e['type'] == 'd'] + handle_id = int(d_events[0]['meta']['x']) + + # Out-of-range entry with request_id + parse_bytes(screen, client_dir_read(handle_id, 999, request_id=44)) + events = self._get_events(cap, wid) + self.assertEqual(len(events), 1) + self.ae(events[0]['type'], 'R') + self.ae(events[0]['meta'].get('r'), '44') + self.ae(events[0]['payload'].strip(), b'ENOENT') + + def test_mixed_request_types_with_ids(self) -> None: + """Mixed r/s/d request types with request_ids are processed in order.""" + import os + import tempfile + file_content = b'mixed request file\n' + with tempfile.NamedTemporaryFile(delete=False) as f: + f.write(file_content) + fpath = f.name + try: + uri_list = f'file://{fpath}\r\n'.encode() + with dnd_test_window() as (osw, wid, screen, cap): + self._setup_uri_drop(screen, wid, cap, uri_list) + + # Queue: MIME data request, then URI file request + parse_bytes(screen, client_request_data('text/plain', request_id=100)) + parse_bytes(screen, client_request_uri_data(0, request_id=200)) + + # Serve first request (MIME data); the URI file request + # completes synchronously right after so all output is in one batch + dnd_test_fake_drop_data(wid, 'text/plain', b'plain text') + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events_100 = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '100'] + self.assertTrue(r_events_100, 'no events with r=100') + + r_events_200 = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '200'] + self.assertTrue(r_events_200, 'no events with r=200') + combined = b''.join(e['payload'] for e in r_events_200) + self.ae(combined, file_content) + finally: + os.unlink(fpath) + + def test_finish_after_queued_requests(self) -> None: + """A finish (empty t=r) after queued requests processes remaining then finishes.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Queue: data request then finish + parse_bytes(screen, client_request_data('text/plain', request_id=5)) + parse_bytes(screen, client_request_data('')) # finish + + # Serve the data request + dnd_test_fake_drop_data(wid, 'text/plain', b'data before finish') + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events, 'no t=r events') + for ev in r_events: + self.ae(ev['meta'].get('r'), '5') + + def test_multiple_sync_errors_processed_immediately(self) -> None: + """Multiple queued requests that all fail synchronously are processed immediately.""" + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 0, 0, 0, 0) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Queue three requests for unknown MIMEs + parse_bytes(screen, client_request_data('image/png', request_id=1)) + parse_bytes(screen, client_request_data('image/gif', request_id=2)) + parse_bytes(screen, client_request_data('image/jpeg', request_id=3)) + + # All three errors should be available immediately + raw = cap.consume(wid) + events = parse_escape_codes(raw) + err_events = [e for e in events if e['type'] == 'R'] + self.assertEqual(len(err_events), 3, f'expected 3 errors, got {len(err_events)}: {err_events}') + self.ae(err_events[0]['meta'].get('r'), '1') + self.ae(err_events[1]['meta'].get('r'), '2') + self.ae(err_events[2]['meta'].get('r'), '3') + for ev in err_events: + self.ae(ev['payload'].strip(), b'ENOENT') + + def test_request_id_backward_compat_full_flow(self) -> None: + """Full drop flow without request_id (backward compatibility) still works.""" + payload_data = b'backward compat data' + with dnd_test_window() as (osw, wid, screen, cap): + parse_bytes(screen, client_register('text/plain')) + dnd_test_set_mouse_pos(wid, 2, 3, 16, 24) + dnd_test_fake_drop_event(wid, True, ['text/plain']) + cap.consume(wid) + + # Request without request_id + parse_bytes(screen, client_request_data('text/plain')) + dnd_test_fake_drop_data(wid, 'text/plain', payload_data) + raw = cap.consume(wid) + events = parse_escape_codes_b64(raw) + r_events = [e for e in events if e['type'] == 'r'] + self.assertTrue(r_events) + combined = b''.join(e['payload'] for e in r_events) + self.ae(combined, payload_data) + # Verify no r= in metadata + for ev in r_events: + self.assertNotIn('r', ev['meta']) + + # Finish + parse_bytes(screen, client_request_data('')) + self._assert_no_output(cap, wid)