Implement DnD protocol changes for symlinks and directory handling

- Remove unique identifier (device:inode) from directory listings
- Change directory entry indexing from 1-based to 0-based
- Add symlink handling in directories: respond with t=r:X=1 and target
- Update parser to default cell_y to -1 for close handle detection
- Update and extend tests for all new behaviors

Agent-Logs-Url: https://github.com/kovidgoyal/kitty/sessions/d09883e5-f460-471d-9dcf-e64e7b96882f

Co-authored-by: kovidgoyal <1308621+kovidgoyal@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-09 11:33:43 +00:00
committed by GitHub
parent a92b381dc3
commit d548afc94a
6 changed files with 348 additions and 61 deletions

View File

@@ -114,6 +114,7 @@ def generate(
payload_is_base64: bool = True,
start_parsing_at: int = 1,
field_sep: str = ',',
post_init: str = '',
) -> str:
type_map = resolve_keys(keymap)
keys_enum = enum(keymap)
@@ -164,6 +165,7 @@ static inline void
enum PARSER_STATES {{ KEY, EQUAL, UINT, INT, FLAG, AFTER_VALUE {payload} }};
enum PARSER_STATES state = KEY, value_state = FLAG;
{command_class} g = {{0}};
{post_init}
unsigned int i, code;
uint64_t lcode; int64_t accumulator;
bool is_negative; (void)is_negative;
@@ -343,7 +345,7 @@ def parsers() -> None:
}
text = generate(
'parse_dnd_code', 'screen_handle_dnd_command', 'dnd_command', keymap, 'DnDCommand',
payload_is_base64=False, start_parsing_at=0, field_sep=':')
payload_is_base64=False, start_parsing_at=0, field_sep=':', post_init='g.cell_y = -1;')
write_header(text, 'kitty/parse-dnd-command.h')

View File

@@ -710,9 +710,6 @@ drop_find_dir_handle(Window *w, uint32_t id) {
* send the listing to the client as a t=d:x=handle_id response. */
static void
drop_send_dir_listing(Window *w, const char *path) {
struct stat st;
if (stat(path, &st) < 0) { drop_send_error(w, EIO); return; }
DIR *dir = opendir(path);
if (!dir) {
switch (errno) {
@@ -723,17 +720,11 @@ drop_send_dir_listing(Window *w, const char *path) {
return;
}
/* Build null-separated payload: unique_id\0entry1\0entry2\0... */
/* Build null-separated payload: entry1\0entry2\0... */
size_t payload_cap = 4096, payload_sz = 0;
char *payload = malloc(payload_cap);
if (!payload) { closedir(dir); drop_send_error(w, EIO); return; }
/* First entry: unique identifier (device:inode) */
char uid[64];
int uid_len = snprintf(uid, sizeof(uid), "%llu:%llu",
(unsigned long long)st.st_dev,
(unsigned long long)st.st_ino);
#define APPEND(s, n) do { \
size_t _n = (size_t)(n); \
size_t _need = payload_sz + _n + 1; \
@@ -748,8 +739,6 @@ drop_send_dir_listing(Window *w, const char *path) {
payload[payload_sz++] = 0; \
} while(0)
APPEND(uid, uid_len);
/* Collect directory entries */
size_t ents_cap = 16, ents_num = 0;
char **ents = malloc(sizeof(char *) * ents_cap);
@@ -883,7 +872,7 @@ drop_request_uri_data(Window *w, const char *payload, size_t payload_sz) {
/* Handle a t=d request from the client.
* handle_id: the directory handle (x= key).
* entry_num: 0 means close the handle; >=1 means read that entry (1-based).
* entry_num: <0 means close the handle; >=0 means read that entry (0-based).
* Returns true if completed synchronously, false if async file I/O started. */
static bool
do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
@@ -892,7 +881,7 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
DirHandle *h = drop_find_dir_handle(w, handle_id);
if (!h) { drop_send_error(w, EINVAL); return true; }
if (entry_num == 0) {
if (entry_num < 0) {
/* Close the handle */
size_t hidx = (size_t)(h - w->drop.dir_handles);
drop_free_dir_handle(h);
@@ -900,8 +889,8 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
return true;
}
/* Read the entry at 1-based index */
size_t eidx = (size_t)(entry_num - 1);
/* Read the entry at 0-based index */
size_t eidx = (size_t)entry_num;
if (eidx >= h->num_entries) { drop_send_error(w, ENOENT); return true; }
char full[PATH_MAX];
@@ -909,8 +898,8 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
drop_send_error(w, EIO); return true;
}
struct stat st;
if (stat(full, &st) < 0) {
struct stat lst;
if (lstat(full, &lst) < 0) {
switch (errno) {
case ENOENT: case ENOTDIR: case ELOOP: drop_send_error(w, ENOENT); break;
case EACCES: case EPERM: drop_send_error(w, EPERM); break;
@@ -919,10 +908,32 @@ do_drop_handle_dir_request(Window *w, uint32_t handle_id, int32_t entry_num) {
return true;
}
if (S_ISDIR(st.st_mode)) {
if (S_ISLNK(lst.st_mode)) {
/* Symlink: send the symlink target as t=r:X=1 */
char target[PATH_MAX];
ssize_t tlen = readlink(full, target, sizeof(target) - 1);
if (tlen < 0) {
switch (errno) {
case ENOENT: case ENOTDIR: drop_send_error(w, ENOENT); break;
case EACCES: case EPERM: drop_send_error(w, EPERM); break;
default: drop_send_error(w, EIO); break;
}
return true;
}
target[tlen] = '\0';
char hdr[128];
int hdr_sz = snprintf(hdr, sizeof(hdr), "\x1b]%d;t=r:X=1", DND_CODE);
if (w->drop.current_request_id)
hdr_sz += snprintf(hdr + hdr_sz, sizeof(hdr) - hdr_sz, ":r=%u", (unsigned)w->drop.current_request_id);
queue_payload_to_child(w->id, w->drop.client_id, &w->drop.pending, hdr, hdr_sz, target, (size_t)tlen, true);
queue_payload_to_child(w->id, w->drop.client_id, &w->drop.pending, hdr, hdr_sz, NULL, 0, true);
return true;
}
if (S_ISDIR(lst.st_mode)) {
drop_send_dir_listing(w, full);
return true;
} else if (S_ISREG(st.st_mode)) {
} else if (S_ISREG(lst.st_mode)) {
return drop_send_file_data(w, full);
} else {
drop_send_error(w, EINVAL);

View File

@@ -11,6 +11,7 @@ static inline void parse_dnd_code(PS *self, uint8_t *parser_buf,
enum PARSER_STATES { KEY, EQUAL, UINT, INT, FLAG, AFTER_VALUE, PAYLOAD };
enum PARSER_STATES state = KEY, value_state = FLAG;
DnDCommand g = {0};
g.cell_y = -1;
unsigned int i, code;
uint64_t lcode;
int64_t accumulator;

View File

@@ -11,6 +11,7 @@ static inline void parse_graphics_code(PS *self, uint8_t *parser_buf,
enum PARSER_STATES { KEY, EQUAL, UINT, INT, FLAG, AFTER_VALUE, PAYLOAD };
enum PARSER_STATES state = KEY, value_state = FLAG;
GraphicsCommand g = {0};
unsigned int i, code;
uint64_t lcode;
int64_t accumulator;

View File

@@ -11,6 +11,7 @@ static inline void parse_multicell_code(PS *self, uint8_t *parser_buf,
enum PARSER_STATES { KEY, EQUAL, UINT, INT, FLAG, AFTER_VALUE, PAYLOAD };
enum PARSER_STATES state = KEY, value_state = FLAG;
MultiCellCommand g = {0};
unsigned int i, code;
uint64_t lcode;
int64_t accumulator;

View File

@@ -74,7 +74,7 @@ def client_dir_read(handle_id: int, entry_num: int | None = None, client_id: int
"""Escape code for a directory request (t=d:x=handle_id[:y=entry_num]).
* entry_num=None close the directory handle.
* entry_num>=1 read that entry (1-based).
* entry_num>=0 read that entry (0-based).
"""
meta = f'{DND_CODE};t=d:x={handle_id}'
if entry_num is not None:
@@ -773,19 +773,16 @@ class TestDnDProtocol(BaseTest):
root_handle_id = int(d_events[0]['meta']['x'])
self.assertGreater(root_handle_id, 0)
# Decode null-separated entries
# Decode null-separated entries (no unique identifier prefix)
root_entries = [e for e in root_listing_payload.split(b'\x00') if e]
# First entry is the unique identifier; remainder are file/dir names
self.assertGreater(len(root_entries), 1,
f'expected entries, got {root_entries}')
entry_names = {e.decode() for e in root_entries[1:]}
entry_names = {e.decode() for e in root_entries}
self.assertIn('a.txt', entry_names)
self.assertIn('b', entry_names)
# Find index of 'a.txt' in the entries list (1-based for t=d:y=)
entries_list = [e.decode() for e in root_entries[1:]]
a_idx = entries_list.index('a.txt') + 1
b_idx = entries_list.index('b') + 1
# Find index of 'a.txt' in the entries list (0-based for t=d:y=)
entries_list = [e.decode() for e in root_entries]
a_idx = entries_list.index('a.txt')
b_idx = entries_list.index('b')
# Read a.txt
parse_bytes(screen, client_dir_read(root_handle_id, a_idx))
@@ -809,13 +806,13 @@ class TestDnDProtocol(BaseTest):
self.assertNotEqual(b_handle_id, root_handle_id)
b_entries = [e for e in b_listing_payload.split(b'\x00') if e]
b_names = {e.decode() for e in b_entries[1:]}
b_names = {e.decode() for e in b_entries}
self.assertIn('c.txt', b_names)
self.assertIn('d', b_names)
b_entries_list = [e.decode() for e in b_entries[1:]]
bc_idx = b_entries_list.index('c.txt') + 1
bd_idx = b_entries_list.index('d') + 1
b_entries_list = [e.decode() for e in b_entries]
bc_idx = b_entries_list.index('c.txt')
bd_idx = b_entries_list.index('d')
# Read b/c.txt (binary integrity)
parse_bytes(screen, client_dir_read(b_handle_id, bc_idx))
@@ -840,11 +837,11 @@ class TestDnDProtocol(BaseTest):
)
bd_handle_id = int(bd_d_events[0]['meta']['x'])
bd_entries = [e for e in bd_listing_payload.split(b'\x00') if e]
bd_names = {e.decode() for e in bd_entries[1:]}
bd_names = {e.decode() for e in bd_entries}
self.assertIn('e.txt', bd_names)
bd_entries_list = [e.decode() for e in bd_entries[1:]]
bde_idx = bd_entries_list.index('e.txt') + 1
bd_entries_list = [e.decode() for e in bd_entries]
bde_idx = bd_entries_list.index('e.txt')
# Read b/d/e.txt
parse_bytes(screen, client_dir_read(bd_handle_id, bde_idx))
@@ -882,7 +879,7 @@ class TestDnDProtocol(BaseTest):
self._assert_no_output(cap, wid)
# Now try to read from the closed handle → EINVAL
parse_bytes(screen, client_dir_read(hid, 1))
parse_bytes(screen, client_dir_read(hid, 0))
events = self._get_events(cap, wid)
self.assertEqual(len(events), 1)
self.ae(events[0]['type'], 'R')
@@ -910,13 +907,12 @@ class TestDnDProtocol(BaseTest):
self.ae(events[0]['type'], 'R')
self.ae(events[0]['payload'].strip(), b'ENOENT')
def test_dir_unique_identifier_prevents_loops(self) -> None:
"""Each directory listing starts with a unique id (dev:inode format)."""
def test_dir_no_unique_identifier(self) -> None:
"""Directory listings should not contain a unique identifier prefix."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
sub = os.path.join(root, 'sub')
os.mkdir(sub)
open(os.path.join(root, 'hello.txt'), 'w').close()
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
@@ -924,28 +920,303 @@ class TestDnDProtocol(BaseTest):
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
root_payload = b''.join(
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
root_handle_id = int(d_ev[0]['meta']['x'])
root_uid = root_payload.split(b'\x00')[0].decode()
# uid must be non-empty and contain a colon (dev:inode)
self.assertIn(':', root_uid, f'uid={root_uid!r}')
entries = [e.decode() for e in payload.split(b'\x00') if e]
# All entries should be actual file/dir names, no dev:inode prefix
self.assertEqual(entries, ['hello.txt'])
# Get the sub directory listing to compare identifiers
entries = [e.decode() for e in root_payload.split(b'\x00')[1:] if e]
sub_idx = entries.index('sub') + 1
parse_bytes(screen, client_dir_read(root_handle_id, sub_idx))
def test_dir_symlink_to_file(self) -> None:
"""Symlinks to files inside directories are reported with t=r:X=1 and the symlink target."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
real_file = os.path.join(root, 'real.txt')
with open(real_file, 'w') as f:
f.write('real content')
os.symlink('real.txt', os.path.join(root, 'link.txt'))
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev2 = [e for e in events if e['type'] == 'd']
sub_payload = b''.join(
chunk for e in d_ev2 for chunk in e['chunks'] if chunk
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
sub_uid = sub_payload.split(b'\x00')[0].decode() if sub_payload else ''
self.assertIn(':', sub_uid, f'sub uid={sub_uid!r}')
# Root and sub must have different identifiers
self.assertNotEqual(root_uid, sub_uid)
hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
self.assertIn('link.txt', entries)
self.assertIn('real.txt', entries)
link_idx = entries.index('link.txt')
# Read the symlink entry → should get t=r with X=1 and target path
parse_bytes(screen, client_dir_read(hid, link_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'expected t=r response for symlink')
# Check X=1 flag indicating symlink
self.assertEqual(r_events[0]['meta'].get('X'), '1',
'symlink response must have X=1')
# Payload should be the symlink target
target = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(target, b'real.txt')
def test_dir_symlink_to_directory(self) -> None:
"""Symlinks to directories inside directories are reported with t=r:X=1."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
os.mkdir(os.path.join(root, 'subdir'))
os.symlink('subdir', os.path.join(root, 'link_to_dir'))
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
self.assertIn('link_to_dir', entries)
link_idx = entries.index('link_to_dir')
# Read the symlink → should get t=r with X=1
parse_bytes(screen, client_dir_read(hid, link_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'expected t=r response for dir symlink')
self.assertEqual(r_events[0]['meta'].get('X'), '1')
target = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(target, b'subdir')
def test_dir_symlink_absolute_target(self) -> None:
"""Symlinks with absolute targets report the full absolute path."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
real_file = os.path.join(root, 'abs_target.txt')
with open(real_file, 'w') as f:
f.write('content')
os.symlink(real_file, os.path.join(root, 'abs_link.txt'))
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
link_idx = entries.index('abs_link.txt')
parse_bytes(screen, client_dir_read(hid, link_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events)
self.assertEqual(r_events[0]['meta'].get('X'), '1')
target = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(target, real_file.encode())
def test_dir_regular_file_no_symlink_flag(self) -> None:
"""Regular files in directories must NOT have the X=1 flag."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
with open(os.path.join(root, 'regular.txt'), 'w') as f:
f.write('hello')
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
reg_idx = entries.index('regular.txt')
parse_bytes(screen, client_dir_read(hid, reg_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events)
# Regular files must not have X=1
self.assertNotEqual(r_events[0]['meta'].get('X'), '1',
'regular file must not have X=1 symlink flag')
data = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(data, b'hello')
def test_dir_symlink_and_regular_mixed(self) -> None:
"""Directory with both regular files and symlinks handles each correctly."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
with open(os.path.join(root, 'data.bin'), 'wb') as f:
f.write(b'\x00\x01\x02\x03')
os.symlink('data.bin', os.path.join(root, 'alias.bin'))
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
# Read regular file
data_idx = entries.index('data.bin')
parse_bytes(screen, client_dir_read(hid, data_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertNotEqual(r_events[0]['meta'].get('X'), '1')
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
b'\x00\x01\x02\x03')
# Read symlink
alias_idx = entries.index('alias.bin')
parse_bytes(screen, client_dir_read(hid, alias_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertEqual(r_events[0]['meta'].get('X'), '1')
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
b'data.bin')
def test_dir_nested_symlink_in_subdir(self) -> None:
"""Symlinks inside nested subdirectories are handled correctly."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
sub = os.path.join(root, 'sub')
os.makedirs(sub)
with open(os.path.join(sub, 'target.txt'), 'w') as f:
f.write('nested target')
os.symlink('target.txt', os.path.join(sub, 'nested_link.txt'))
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
root_hid = int(d_ev[0]['meta']['x'])
entries = [e.decode() for e in payload.split(b'\x00') if e]
sub_idx = entries.index('sub')
# Open subdirectory
parse_bytes(screen, client_dir_read(root_hid, sub_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
sub_payload = b''.join(
chunk for e in d_ev for chunk in e['chunks'] if chunk
)
sub_hid = int(d_ev[0]['meta']['x'])
sub_entries = [e.decode() for e in sub_payload.split(b'\x00') if e]
self.assertIn('nested_link.txt', sub_entries)
link_idx = sub_entries.index('nested_link.txt')
parse_bytes(screen, client_dir_read(sub_hid, link_idx))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertEqual(r_events[0]['meta'].get('X'), '1')
self.ae(b''.join(e['payload'] for e in r_events if e['payload']),
b'target.txt')
def test_dir_entry_zero_based_index(self) -> None:
"""Directory entry index 0 reads the first entry (0-based)."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
with open(os.path.join(root, 'first.txt'), 'w') as f:
f.write('first file')
uri_list = f'file://{root}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_ev = [e for e in events if e['type'] == 'd']
hid = int(d_ev[0]['meta']['x'])
# Index 0 should read the first entry
parse_bytes(screen, client_dir_read(hid, 0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'entry index 0 should read the first entry')
data = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(data, b'first file')
def test_top_level_symlink_to_file_resolved(self) -> None:
"""Top-level symlink in URI list resolves to file and sends file data."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
real = os.path.join(root, 'real.txt')
with open(real, 'w') as f:
f.write('resolved content')
link = os.path.join(root, 'link.txt')
os.symlink(real, link)
uri_list = f'file://{link}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
r_events = [e for e in events if e['type'] == 'r']
self.assertTrue(r_events, 'top-level symlink should resolve and send file data')
data = b''.join(e['payload'] for e in r_events if e['payload'])
self.ae(data, b'resolved content')
def test_top_level_symlink_to_dir_resolved(self) -> None:
"""Top-level symlink to directory in URI list resolves and returns directory listing."""
import os
import tempfile
with tempfile.TemporaryDirectory() as root:
sub = os.path.join(root, 'realdir')
os.mkdir(sub)
with open(os.path.join(sub, 'inside.txt'), 'w') as f:
f.write('inside')
link = os.path.join(root, 'linkdir')
os.symlink(sub, link)
uri_list = f'file://{link}\r\n'.encode()
with dnd_test_window() as (osw, wid, screen, cap):
self._setup_uri_drop(screen, wid, cap, uri_list)
parse_bytes(screen, client_request_uri_data(0))
raw = cap.consume(wid)
events = parse_escape_codes_b64(raw)
d_events = [e for e in events if e['type'] == 'd']
self.assertTrue(d_events, 'top-level symlink to dir should return directory listing')
payload = b''.join(
chunk for e in d_events for chunk in e['chunks'] if chunk
)
entries = [e.decode() for e in payload.split(b'\x00') if e]
self.assertIn('inside.txt', entries)
def test_window_close_during_transfer_no_leak(self) -> None:
"""Closing the window while dir handles are open frees all resources (no crash)."""
@@ -1707,8 +1978,8 @@ class TestDnDProtocol(BaseTest):
self.assertTrue(d_events)
handle_id = int(d_events[0]['meta']['x'])
listing = b''.join(chunk for e in d_events for chunk in e['chunks'] if chunk)
entries = [e.decode() for e in listing.split(b'\x00')[1:] if e]
f_idx = entries.index('f.txt') + 1
entries = [e.decode() for e in listing.split(b'\x00') if e]
f_idx = entries.index('f.txt')
# Read file with request_id
parse_bytes(screen, client_dir_read(handle_id, f_idx, request_id=33))