Add comprehensive tests for request queue and request_id behavior

Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/3a4975c2-8691-486b-8ff9-f8a2146b8756

Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-09 03:20:41 +00:00
committed by GitHub
parent f42d07e7ac
commit 4339deb647

View File

@@ -50,23 +50,27 @@ def client_accept(operation: int, mimes: str = '', client_id: int = 0) -> bytes:
return _osc(f'{meta};{mimes}')
def client_request_data(mime: str = '', client_id: int = 0) -> bytes:
def client_request_data(mime: str = '', client_id: int = 0, request_id: int = 0) -> bytes:
"""Escape code a client sends to request data (t=r) or finish the drop (t=r with no MIME)."""
meta = f'{DND_CODE};t=r'
if request_id:
meta += f':r={request_id}'
if client_id:
meta += f':i={client_id}'
return _osc(f'{meta};{mime}')
def client_request_uri_data(idx: int, client_id: int = 0) -> bytes:
def client_request_uri_data(idx: int, client_id: int = 0, request_id: int = 0) -> bytes:
"""Escape code a client sends to request a file from the URI list (t=s ; text/uri-list:idx)."""
meta = f'{DND_CODE};t=s'
if request_id:
meta += f':r={request_id}'
if client_id:
meta += f':i={client_id}'
return _osc(f'{meta};text/uri-list:{idx}')
def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int = 0) -> bytes:
def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int = 0, request_id: int = 0) -> bytes:
"""Escape code for a directory request (t=d:x=handle_id[:y=entry_num]).
* entry_num=None close the directory handle.
@@ -75,6 +79,8 @@ def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int
meta = f'{DND_CODE};t=d:x={handle_id}'
if entry_num is not None:
meta += f':y={entry_num}'
if request_id:
meta += f':r={request_id}'
if client_id:
meta += f':i={client_id}'
return _osc(meta)
@@ -1469,3 +1475,374 @@ class TestDnDProtocol(BaseTest):
# drag_start calls expand_rgb_data which checks sz == w*h*3
parse_bytes(screen, client_drag_start())
self.assert_error(cap, wid)
# ---- Request queue and request_id tests ----------------------------------
def test_request_id_echoed_in_data_response(self) -> None:
"""request_id is echoed back as r=ID in data responses."""
payload_data = b'hello request_id'
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain', request_id=42))
dnd_test_fake_drop_data(wid, 'text/plain', payload_data)
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events')
for ev in r_events:
self.ae(ev['meta'].get('r'), '42', f'expected r=42, got {ev["meta"]}')
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, payload_data)
def test_request_id_echoed_in_error_response(self) -> None:
"""request_id is echoed back as r=ID in error responses."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('image/png', request_id=99))
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['meta'].get('r'), '99')
self.ae(events[0]['payload'].strip(), b'ENOENT')
def test_request_id_zero_not_included(self) -> None:
"""When request_id is 0 (default), r= is not included in responses."""
payload_data = b'no request_id'
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Request without request_id (defaults to 0)
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', payload_data)
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events')
for ev in r_events:
self.assertNotIn('r', ev['meta'], f'r= should not be present when request_id=0, got {ev["meta"]}')
def test_request_id_in_error_for_io_failure(self) -> None:
"""request_id is echoed in I/O error responses."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
parse_bytes(screen, client_request_data('text/plain', request_id=77))
dnd_test_fake_drop_data(wid, 'text/plain', b'', errno.EIO)
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['meta'].get('r'), '77')
self.ae(events[0]['payload'].strip(), b'EIO')
def test_multiple_queued_requests_fifo(self) -> None:
"""Multiple requests with different request_ids are served in FIFO order."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain text/html'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain', 'text/html'])
cap.consume(wid)
# Queue two requests
parse_bytes(screen, client_request_data('text/plain', request_id=1))
parse_bytes(screen, client_request_data('text/html', request_id=2))
# First request (text/plain) gets served first
dnd_test_fake_drop_data(wid, 'text/plain', b'plain data')
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '1']
self.assertTrue(r_events, 'no t=r events for first request')
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, b'plain data')
# Second request (text/html) gets served next
dnd_test_fake_drop_data(wid, 'text/html', b'<html>data</html>')
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '2']
self.assertTrue(r_events, 'no t=r events for second request')
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, b'<html>data</html>')
def test_request_after_error_proceeds(self) -> None:
"""After an error response, the next queued request is processed."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Queue: request for unknown MIME (error) followed by valid request
parse_bytes(screen, client_request_data('image/png', request_id=10))
parse_bytes(screen, client_request_data('text/plain', request_id=11))
# The error for request 10 should have been sent immediately
raw = cap.consume(wid)
events = parse_escape_codes(raw)
err_events = [e for e in events if e['type'] == 'R']
self.assertEqual(len(err_events), 1, events)
self.ae(err_events[0]['meta'].get('r'), '10')
self.ae(err_events[0]['payload'].strip(), b'ENOENT')
# Now serve request 11
dnd_test_fake_drop_data(wid, 'text/plain', b'second request data')
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events for second request')
for ev in r_events:
self.ae(ev['meta'].get('r'), '11')
def test_queue_overflow_returns_emfile(self) -> None:
"""Exceeding 128 queued requests returns EMFILE and ends the drop."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# First request starts async processing
parse_bytes(screen, client_request_data('text/plain', request_id=1))
# Queue 127 more requests (fill to capacity = 128)
for i in range(2, 129):
parse_bytes(screen, client_request_data('text/plain', request_id=i))
# No error yet - queue is at capacity
raw = cap.consume(wid)
err_events = [e for e in parse_escape_codes(raw) if e['type'] == 'R']
self.assertEqual(len(err_events), 0, f'unexpected errors: {err_events}')
# 129th request should trigger EMFILE
parse_bytes(screen, client_request_data('text/plain', request_id=999))
raw = cap.consume(wid)
events = parse_escape_codes(raw)
err_events = [e for e in events if e['type'] == 'R']
self.assertTrue(err_events, 'expected EMFILE error')
self.ae(err_events[0]['meta'].get('r'), '999')
self.ae(err_events[0]['payload'].strip(), b'EMFILE')
def test_request_id_in_uri_file_response(self) -> None:
"""request_id is echoed in t=s (URI file) data responses."""
import os
import tempfile
content = b'URI file with request_id\n'
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(content)
fpath = f.name
try:
uri_list = f'file://{fpath}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0, request_id=55))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events')
for ev in r_events:
self.ae(ev['meta'].get('r'), '55')
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, content)
finally:
os.unlink(fpath)
def test_request_id_in_uri_error_response(self) -> None:
"""request_id is echoed in t=s error responses."""
uri_list = b'file:///tmp/no_such_file_dnd_test_xyz\r\n'
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0, request_id=66))
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1, events)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['meta'].get('r'), '66')
def test_request_id_in_dir_listing_response(self) -> None:
"""request_id is echoed in directory listing (t=d) responses."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
open(os.path.join(root, 'file.txt'), 'w').close()
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0, request_id=88))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_events = [e for e in events if e['type'] == 'd']
self.assertTrue(d_events, 'expected t=d listing')
for ev in d_events:
self.ae(ev['meta'].get('r'), '88')
def test_request_id_in_dir_entry_file_response(self) -> None:
"""request_id is echoed when reading a file via directory handle (t=d)."""
import os
import tempfile
content = b'directory file content\n'
with tempfile.TemporaryDirectory() as root:
with open(os.path.join(root, 'f.txt'), 'wb') as f:
f.write(content)
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
# Get dir listing first (no request_id needed for setup)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_events = [e for e in events if e['type'] == 'd']
self.assertTrue(d_events)
handle_id = int(d_events[0]['meta']['x'])
listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk)
entries = [e.decode() for e in listing.split(b'\x00')[1:] if e]
f_idx = entries.index('f.txt') + 1
# Read file with request_id
parse_bytes(screen, client_dir_read(handle_id, f_idx, request_id=33))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events')
for ev in r_events:
self.ae(ev['meta'].get('r'), '33')
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, content)
def test_request_id_in_dir_entry_error_response(self) -> None:
"""request_id is echoed when a directory entry read fails."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
open(os.path.join(root, 'only.txt'), 'w').close()
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_events = [e for e in events if e['type'] == 'd']
handle_id = int(d_events[0]['meta']['x'])
# Out-of-range entry with request_id
parse_bytes(screen, client_dir_read(handle_id, 999, request_id=44))
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1)
self.ae(events[0]['type'], 'R')
self.ae(events[0]['meta'].get('r'), '44')
self.ae(events[0]['payload'].strip(), b'ENOENT')
def test_mixed_request_types_with_ids(self) -> None:
"""Mixed r/s/d request types with request_ids are processed in order."""
import os
import tempfile
file_content = b'mixed request file\n'
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(file_content)
fpath = f.name
try:
uri_list = f'file://{fpath}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
# Queue: MIME data request, then URI file request
parse_bytes(screen, client_request_data('text/plain', request_id=100))
parse_bytes(screen, client_request_uri_data(0, request_id=200))
# Serve first request (MIME data); the URI file request
# completes synchronously right after so all output is in one batch
dnd_test_fake_drop_data(wid, 'text/plain', b'plain text')
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events_100 = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '100']
self.assertTrue(r_events_100, 'no events with r=100')
r_events_200 = [e for e in events if e['type'] == 'r' and e['meta'].get('r') == '200']
self.assertTrue(r_events_200, 'no events with r=200')
combined = b''.join(e['payload'] for e in r_events_200)
self.ae(combined, file_content)
finally:
os.unlink(fpath)
def test_finish_after_queued_requests(self) -> None:
"""A finish (empty t=r) after queued requests processes remaining then finishes."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Queue: data request then finish
parse_bytes(screen, client_request_data('text/plain', request_id=5))
parse_bytes(screen, client_request_data('')) # finish
# Serve the data request
dnd_test_fake_drop_data(wid, 'text/plain', b'data before finish')
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'no t=r events')
for ev in r_events:
self.ae(ev['meta'].get('r'), '5')
def test_multiple_sync_errors_processed_immediately(self) -> None:
"""Multiple queued requests that all fail synchronously are processed immediately."""
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 0, 0, 0, 0)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Queue three requests for unknown MIMEs
parse_bytes(screen, client_request_data('image/png', request_id=1))
parse_bytes(screen, client_request_data('image/gif', request_id=2))
parse_bytes(screen, client_request_data('image/jpeg', request_id=3))
# All three errors should be available immediately
raw = cap.consume(wid)
events = parse_escape_codes(raw)
err_events = [e for e in events if e['type'] == 'R']
self.assertEqual(len(err_events), 3, f'expected 3 errors, got {len(err_events)}: {err_events}')
self.ae(err_events[0]['meta'].get('r'), '1')
self.ae(err_events[1]['meta'].get('r'), '2')
self.ae(err_events[2]['meta'].get('r'), '3')
for ev in err_events:
self.ae(ev['payload'].strip(), b'ENOENT')
def test_request_id_backward_compat_full_flow(self) -> None:
"""Full drop flow without request_id (backward compatibility) still works."""
payload_data = b'backward compat data'
with dnd_test_window() as (osw, wid, screen, cap):
parse_bytes(screen, client_register('text/plain'))
dnd_test_set_mouse_pos(wid, 2, 3, 16, 24)
dnd_test_fake_drop_event(wid, True, ['text/plain'])
cap.consume(wid)
# Request without request_id
parse_bytes(screen, client_request_data('text/plain'))
dnd_test_fake_drop_data(wid, 'text/plain', payload_data)
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events)
combined = b''.join(e['payload'] for e in r_events)
self.ae(combined, payload_data)
# Verify no r= in metadata
for ev in r_events:
self.assertNotIn('r', ev['meta'])
# Finish
parse_bytes(screen, client_request_data(''))
self._assert_no_output(cap, wid)