#!/usr/bin/env python # License: GPLv3 Copyright: 2026, Kovid Goyal import errno import fnmatch import itertools import os import random import shutil import stat import tempfile import time import uuid from base64 import standard_b64encode from functools import partial from urllib.parse import unquote, urlparse from kitty.constants import kitten_exe from kitty.fast_data_types import ( DND_CODE, GLFW_DRAG_OPERATION_COPY, GLFW_DRAG_OPERATION_MOVE, dnd_set_test_write_func, dnd_test_cleanup_fake_window, dnd_test_create_fake_window, dnd_test_drag_finish, dnd_test_drag_get_data, dnd_test_drag_notify, dnd_test_fake_drop_data, dnd_test_fake_drop_event, dnd_test_force_drag_dropped, dnd_test_probe_state, dnd_test_start_drag_offer, ) from kitty.utils import as_file_url from . import PTY, BaseTest from .dnd import WriteCapture class Capture(WriteCapture): def __call__(self, window_id: int, data: bytes) -> None: self.pty.write_to_child(data) def create_fs(base, include_toplevel_working_symlink=True): join = partial(os.path.join, base) def w(sz, *path): if sz == 0: sz = random.randint(5713, 9879) with open(join(*path), 'wb') as f: f.write(b'x' * sz) os.makedirs(join('d1', 'sd', 'ssd')) os.mkdir(join('d2')) os.symlink('/' + str(uuid.uuid4()), join('s1')) # non-existent if include_toplevel_working_symlink: os.symlink('d1', join('sd')) os.symlink('/', join('d1', 'sa')) # absolute symlink in sub dir os.symlink('../d1', join('d1', 'sr')) w(4096 * 3 + 113, 'some-image.png') w(0, 'd1', 'f1') w(0, 'd1', 'f2') w(0, 'd1', 'sd', 'f11') w(0, 'd1', 'sd', 'ssd', 'f111') os.symlink('../moose', join('d1', 'sd', 'ssd', 's11')) class TestDnDKitten(BaseTest): def assert_trees_equal(self, a: str, b: str, ignored='.dnd-kitten-drop-*'): a_name = os.path.relpath(a, self.test_dir) b_name = os.path.relpath(b, self.test_dir) entries_a = list(itertools.filterfalse(lambda x: fnmatch.fnmatch(x, ignored), os.listdir(a))) entries_b = list(itertools.filterfalse(lambda x: fnmatch.fnmatch(x, ignored), os.listdir(b))) self.assertEqual(set(entries_a), set(entries_b), f'readdir() different for {a_name} vs {b_name}') for x in entries_a: ca, cb = os.path.join(a, x), os.path.join(b, x) sta, stb = os.lstat(ca), os.lstat(cb) self.assertEqual( stat.S_IFMT(sta.st_mode), stat.S_IFMT(stb.st_mode), f'type mismatch for {a_name}/{x} vs {b_name}/{x}') if stat.S_ISDIR(sta.st_mode): self.assert_trees_equal(ca, cb) elif stat.S_ISLNK(sta.st_mode): self.assertEqual(os.readlink(ca), os.readlink(cb)) elif stat.S_ISREG(sta.st_mode): self.assert_files_have_same_content(ca, cb) def setUp(self): capture = Capture() dnd_set_test_write_func(capture) os_window_id, window_id = dnd_test_create_fake_window() capture.window_id = window_id capture.os_window_id = os_window_id self.capture = capture self.test_dir = os.path.realpath(self.enterContext(tempfile.TemporaryDirectory())) self.kitten_wd = os.path.join(self.test_dir, 'kitten') os.mkdir(self.kitten_wd) self.src_data_dir = os.path.join(self.test_dir, 'src') os.mkdir(self.src_data_dir) self.messages_from_kitten = '' self.set_options({'tab_bar_style': 'hidden'}) def send_dnd_command_to_kitten(self, payload=b'', as_base64=False, flush=False, t='T', **metadata): header = f'\x1b]{DND_CODE};' metadata['t'] = t for k, v in metadata.items(): header = header + f'{k}={v}:' self.pty.write_to_child(header.encode()) if not payload: self.pty.write_to_child(b'\x1b\\', flush=flush) return if isinstance(payload, str): payload = payload.encode() payload = memoryview(standard_b64encode(payload) if as_base64 else payload) for i in range(0, len(payload), 4096): end = i + 4096 is_last = end >= len(payload) chunk = payload[i:min(i+4096, len(payload))] if i == 0: self.pty.write_to_child(f'm={0 if is_last else 1};'.encode()) else: self.pty.write_to_child(f'\x1b]{DND_CODE};m={0 if is_last else 1};'.encode()) self.pty.write_to_child(chunk) self.pty.write_to_child(b'\x1b\\', flush=is_last and flush) def finish_setup(self, remote_client: bool = False, cli_args = ()): cmd = [kitten_exe(), 'dnd'] cmd += list(cli_args) self.pty = self.enterContext(PTY(argv=cmd, cwd=self.kitten_wd, rows=25, columns=80, window_id=self.capture.window_id)) self.capture.pty = self.pty self.pty.callbacks.printbuf = self self.screen = self.pty.screen self.reset_kitten(remote_client, clear_tdir=False) self.assertTrue(self.probe_state('drop_wanted')) self.assertEqual(remote_client, self.probe_state('drop_is_remote_client')) if self.probe_state('drag_can_offer'): self.assertEqual(remote_client, self.probe_state('drag_is_remote_client')) def reset_kitten(self, remote_client: bool, clear_tdir=True): if clear_tdir: self.send_dnd_command_to_kitten('PING') self.wait_for_responses('PONG') shutil.rmtree(self.kitten_wd) os.mkdir(self.kitten_wd) shutil.rmtree(self.src_data_dir) os.mkdir(self.src_data_dir) self.send_dnd_command_to_kitten('SETUP_REMOTE' if remote_client else 'SETUP_LOCAL') self.wait_for_responses('SETUP_DONE') def get_button_geometry(self, are_present: bool = True): self.send_dnd_command_to_kitten('GEOMETRY') self.pty.wait_till(lambda: bool(self.messages_from_kitten)) self.assertTrue(self.messages_from_kitten.startswith('GEOMETRY') and self.messages_from_kitten.endswith('\n'), f'Unexpected messages from kitten: {self.messages_from_kitten!r}') q, self.messages_from_kitten = self.messages_from_kitten.rstrip(), '' parts = tuple(map(int, q.split(':')[1:])) copy, move = parts[:4], parts[4:] if are_present: self.assertGreater(copy[2], 4) self.assertGreater(move[2], 4) else: self.assertEqual(copy, (0,0,0,0)) self.assertEqual(move, (0,0,0,0)) return copy, move def append(self, text): self.messages_from_kitten += text def wait_for_responses(self, *responses, timeout=10): q = '\n'.join(responses) def wait_till(): return q == self.messages_from_kitten.strip() try: self.pty.wait_till(wait_till, timeout, lambda: f'Responses so far: Expected:\n{q!r}\nActual:\n{self.messages_from_kitten.strip()!r}\n') finally: self.messages_from_kitten = '' def wait_for_state(self, q, expected, timeout=10): self.pty.wait_till(lambda: self.probe_state(q) == expected, timeout, lambda: f'{q}: {self.probe_state(q)!r}') def probe_state(self, which: str): return dnd_test_probe_state(self.capture.window_id, which) def roundtrip(self): self.send_dnd_command_to_kitten('PING') self.wait_for_responses('PONG') def exit_kitten(self): self.pty.write_to_child('\x1b[27u') # ] self.pty.wait_till_child_exits(require_exit_code=0) def tearDown(self): dnd_set_test_write_func(None) dnd_test_cleanup_fake_window(self.capture.os_window_id) self.capture = None self.screen = None self.pty = None def test_dnd_kitten_drop(self): img_drop_path = 'images/image.png' self.finish_setup(cli_args=(f'--drop=image/png:{img_drop_path}', '--confirm-drop-overwrite')) with self.subTest(remote=False): self.dnd_kitten_drop(False, img_drop_path) self.reset_kitten(True) with self.subTest(remote=True): self.dnd_kitten_drop(True, img_drop_path) self.exit_kitten() def dnd_kitten_drop(self, remote_client, img_drop_path): copy, move = self.get_button_geometry() all_mimes = 'text/uri-list a/b c/d' for b, expected in ((copy, GLFW_DRAG_OPERATION_COPY), (move, GLFW_DRAG_OPERATION_MOVE)): dnd_test_fake_drop_event(self.capture.window_id, False, all_mimes.split(), b[0] + 1, b[1] + 1) self.wait_for_state('drop_action', expected) self.assertEqual('text/uri-list', self.probe_state('drop_mimes').rstrip('\x00')) self.send_dnd_command_to_kitten('DROP_MIMES') self.wait_for_responses(all_mimes) dnd_test_fake_drop_event(self.capture.window_id, False) self.send_dnd_command_to_kitten('DROP_MIMES') self.wait_for_responses('') large_mimes = ((all_mimes + ' ') * 300).rstrip() self.assertGreater(len(large_mimes), 4096) dnd_test_fake_drop_event(self.capture.window_id, False, large_mimes.split(), copy[0] + 1, copy[1] + 1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_COPY) self.send_dnd_command_to_kitten('DROP_MIMES') self.wait_for_responses(large_mimes) del large_mimes dnd_test_fake_drop_event(self.capture.window_id, False) self.send_dnd_command_to_kitten('DROP_MIMES') self.wait_for_responses('') all_mimes += ' image/png' dnd_test_fake_drop_event(self.capture.window_id, False, all_mimes.split(), copy[0] + 1, copy[1] + 1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_COPY) self.assertEqual('text/uri-list\x00image/png', self.probe_state('drop_mimes').rstrip('\x00')) dnd_test_fake_drop_event(self.capture.window_id, True, all_mimes.split(), copy[0] + 1, copy[1] + 1) self.send_dnd_command_to_kitten('DROP_MIMES') self.wait_for_responses(all_mimes) self.assertEqual('text/uri-list\x00image/png', self.probe_state('drop_mimes').rstrip('\x00')) self.wait_for_state('drop_data_requests', ((1,0,0), (4,0,0))) self.assertEqual('text/uri-list', self.probe_state('drop_getting_data_for_mime')) create_fs(self.src_data_dir) uri_list, path_list = [], [] for x in sorted(os.listdir(self.src_data_dir)): uri_list.append(as_file_url(self.src_data_dir, x)) uri_list = ['moose:cow', '# file:///frog/march'] + uri_list uri_list.insert(3, 'ignore://me') for x in uri_list: if x.startswith('#'): continue if x.startswith('file://'): path_list.append(unquote(urlparse(x).path)) else: path_list.append('') dnd_test_fake_drop_data(self.capture.window_id, 'text/uri-list', '\r\n'.join(uri_list).encode()) self.assertEqual('image/png', self.probe_state('drop_getting_data_for_mime')) def send_file_in_chunks(f, mime, sz=3072): while True: chunk = f.read(sz) if not chunk: break dnd_test_fake_drop_data(self.capture.window_id, mime, chunk, 0, True) dnd_test_fake_drop_data(self.capture.window_id, mime, b'') self.send_dnd_command_to_kitten('DROP_IS_REMOTE') self.wait_for_responses(str(remote_client)) with open(os.path.join(self.src_data_dir, 'some-image.png'), 'rb') as f: send_file_in_chunks(f, 'image/png', 1117) self.send_dnd_command_to_kitten('DROP_URI_LIST') self.wait_for_responses('|'.join(path_list)) jn = os.path.join self.assert_files_have_same_content(jn(self.src_data_dir, 'some-image.png'), jn(self.kitten_wd, img_drop_path)) shutil.rmtree(os.path.dirname(jn(self.kitten_wd, img_drop_path))) self.wait_for_state('last_drop_action', GLFW_DRAG_OPERATION_COPY) self.wait_for_state('drop_action', 0) self.assert_trees_equal(self.src_data_dir, self.kitten_wd) # ---- edge case: blank lines in text/uri-list are treated as empty entries ---- # RFC 2483 allows blank lines; parse_uri_list should skip them cleanly. with tempfile.NamedTemporaryFile(dir=self.src_data_dir, suffix='.txt', delete=False) as tf: tf.write(b'edge case blank line test') edge_file = tf.name blank_line_uri_list = ('\r\n' + as_file_url(self.src_data_dir, os.path.basename(edge_file)) + '\r\n\r\n').encode() dnd_test_fake_drop_event(self.capture.window_id, False, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_COPY) dnd_test_fake_drop_event(self.capture.window_id, True, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_data_requests', ((1, 0, 0),)) dnd_test_fake_drop_data(self.capture.window_id, 'text/uri-list', blank_line_uri_list) self.wait_for_state('drop_action', 0) # The file should have been copied despite surrounding blank lines in the URI list. self.assertTrue(os.path.exists(jn(self.kitten_wd, os.path.basename(edge_file))), 'File from URI list with surrounding blank lines should be copied to destination') os.unlink(edge_file) # ---- overwrite confirmation: Enter key allows the overwrite ---- # kitten_wd already has 'some-image.png'; dropping the same filename again triggers confirmation. enter_content = b'overwrite-enter-test-content' overwrite_uri = (as_file_url(self.src_data_dir, 'some-image.png') + '\r\n').encode() def do_overwrite_drop(src_content, key_press, action=GLFW_DRAG_OPERATION_COPY): with open(jn(self.src_data_dir, 'some-image.png'), 'wb') as f: f.write(src_content) orig_content = b'content-to-be-overwritten' path = jn(self.kitten_wd, 'some-image.png') os.unlink(path) with open(path, 'wb') as f: f.write(orig_content) expected_dest_content = src_content if action == GLFW_DRAG_OPERATION_COPY else orig_content dnd_test_fake_drop_event(self.capture.window_id, False, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_COPY) dnd_test_fake_drop_event(self.capture.window_id, True, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_data_requests', ((1, 0, 0),)) dnd_test_fake_drop_data(self.capture.window_id, 'text/uri-list', overwrite_uri) self.pty.wait_till(lambda: 'overwrite existing' in self.pty.screen_contents()) self.pty.write_to_child(key_press) self.wait_for_state('last_drop_action', action) self.wait_for_state('drop_action', 0) with open(jn(self.kitten_wd, 'some-image.png'), 'rb') as f: self.assertEqual(expected_dest_content, f.read()) do_overwrite_drop(enter_content, '\x1b[13u') do_overwrite_drop(b'overwrite-esc-test-content', '\x1b[27u', 0) # ]]] # ---- move operation: source items deleted for local client only ---- move_file = jn(self.src_data_dir, 'move_test.txt') move_dir = jn(self.src_data_dir, 'move_test_dir') move_link = jn(self.src_data_dir, 'move_test_link') os.makedirs(move_dir) with open(jn(move_dir, 'inside.txt'), 'wb') as f: f.write(b'nested file in move dir') with open(move_file, 'wb') as f: f.write(b'move test content') os.symlink('nonexistent_target', move_link) move_uri = ( as_file_url(self.src_data_dir, 'move_test.txt') + '\r\n' + as_file_url(self.src_data_dir, 'move_test_dir') + '\r\n' + as_file_url(self.src_data_dir, 'move_test_link') + '\r\n' ).encode() dnd_test_fake_drop_event(self.capture.window_id, False, ['text/uri-list'], move[0]+1, move[1]+1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_MOVE) dnd_test_fake_drop_event(self.capture.window_id, True, ['text/uri-list'], move[0]+1, move[1]+1) self.wait_for_state('drop_data_requests', ((1, 0, 0),)) dnd_test_fake_drop_data(self.capture.window_id, 'text/uri-list', move_uri) self.wait_for_state('last_drop_action', GLFW_DRAG_OPERATION_MOVE) self.wait_for_state('drop_action', 0) self.roundtrip() if remote_client: # Remote move: source items must NOT be deleted (the source app handles deletion) self.assertTrue(os.path.exists(move_file), 'remote move: source file must not be deleted') self.assertTrue(os.path.isdir(move_dir), 'remote move: source directory must not be deleted') self.assertTrue(os.path.lexists(move_link), 'remote move: source symlink must not be deleted') else: # Local move: source items must be deleted after successful move self.assertFalse(os.path.exists(move_file), 'local move: source file must be deleted') self.assertFalse(os.path.exists(move_dir), 'local move: source directory must be deleted') self.assertFalse(os.path.lexists(move_link), 'local move: source symlink must be deleted') # ---- data URI: inline data should be written to data-uri-N.ext in the destination ---- data_uri_content_1 = b'data uri plain text content' data_uri_b64 = standard_b64encode(data_uri_content_1).decode() data_uri_content_2 = b'\x89PNG\r\n\x1a\n' + b'\x00' * 8 # fake PNG bytes data_uri_b64_2 = standard_b64encode(data_uri_content_2).decode() data_uri_list = ( f'data:text/plain;base64,{data_uri_b64}\r\n' f'data:image/png;base64,{data_uri_b64_2}\r\n' ).encode() dnd_test_fake_drop_event(self.capture.window_id, False, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_action', GLFW_DRAG_OPERATION_COPY) dnd_test_fake_drop_event(self.capture.window_id, True, ['text/uri-list'], copy[0]+1, copy[1]+1) self.wait_for_state('drop_data_requests', ((1, 0, 0),)) dnd_test_fake_drop_data(self.capture.window_id, 'text/uri-list', data_uri_list) self.wait_for_state('last_drop_action', GLFW_DRAG_OPERATION_COPY) self.wait_for_state('drop_action', 0) data_uri_file_1 = jn(self.kitten_wd, 'data-uri-1.txt') data_uri_file_2 = jn(self.kitten_wd, 'data-uri-2.png') self.assertTrue(os.path.exists(data_uri_file_1), f'data URI file 1 should be created at {data_uri_file_1}') self.assertTrue(os.path.exists(data_uri_file_2), f'data URI file 2 should be created at {data_uri_file_2}') with open(data_uri_file_1, 'rb') as f: self.assertEqual(data_uri_content_1, f.read()) with open(data_uri_file_2, 'rb') as f: self.assertEqual(data_uri_content_2, f.read()) os.unlink(data_uri_file_1) os.unlink(data_uri_file_2) def assert_files_have_same_content(self, a, b): with open(a, 'rb') as fa, open(b, 'rb') as fb: self.assertEqual(fa.read(), fb.read(), f'{a} ({os.path.getsize(a)}) != {b} ({os.path.getsize(b)})') def test_dnd_kitten_drag(self): from .graphics import png_data drag_thumbnail = os.path.join(self.test_dir, 'drag.png') with open(drag_thumbnail, 'wb') as f: f.write(png_data) img_drag_path = 'image.png' def create_files(): with open(os.path.join(self.kitten_wd, img_drag_path), 'wb') as f: self.img_drag_data = os.urandom(10113) f.write(self.img_drag_data) create_fs(self.src_data_dir) create_files() tl = tuple(os.path.join(self.src_data_dir, x) for x in os.listdir(self.src_data_dir)) self.finish_setup(cli_args=(f'--drag-thumbnail={drag_thumbnail}', f'--drag=image/png:{img_drag_path}') + tl) # ))) with self.subTest(remote_client=False): self.dnd_kitten_drag(False, img_drag_path) self.reset_kitten(True) create_files() with self.subTest(remote_client=True): self.dnd_kitten_drag(True, img_drag_path) self.exit_kitten() self.img_drag_data = None def read_drag_data(self, mime, timeout=10): # self.pty.log_data_flow = True ans = b'' end_time = time.monotonic() + timeout while time.monotonic() <= end_time: try: chunk = dnd_test_drag_get_data(self.capture.window_id, mime) if not chunk: return ans ans += chunk except OSError as err: if err.errno == errno.EAGAIN: self.pty.process_input_from_child(timeout=end_time - time.monotonic()) continue chunk = ans = b'' raise chunk = ans = b'' raise TimeoutError(f'timed out waiting for data from drag_get_data for {mime}') def dnd_kitten_drag(self, remote_client, img_drop_path): # self.pty.log_data_flow = True copy, move = self.get_button_geometry() self.wait_for_state('can_offer', True) self.wait_for_state('drag_operations', 0) def wait_for_drag_active(active=True): self.send_dnd_command_to_kitten('DRAG_ACTIVE') self.wait_for_responses('DRAG_ACTIVE' if active else 'DRAG_INACTIVE') if active: self.send_dnd_command_to_kitten('DRAG_OK') self.wait_for_responses('DRAG_OK') def start_drag(x, y, expected): dnd_test_start_drag_offer(self.capture.window_id, x, y) wait_for_drag_active() self.wait_for_state('drag_operations', expected) self.wait_for_state('drag_thumbnail_size', 4) def end_drag(canceled=True): dnd_test_drag_finish(self.capture.window_id, canceled) wait_for_drag_active(False) self.wait_for_state('drag_operations', 0) start_drag(move[0] + 1, move[1] + 1, 2) self.assertEqual(set(self.probe_state('drag_mimes')), {'image/png', 'text/uri-list'}) end_drag() start_drag(copy[0] + 1, copy[1] + 1, 1) end_drag() start_drag(1, 1, 3) dnd_test_drag_notify(self.capture.window_id, 0, 'text/uri-list') dnd_test_drag_notify(self.capture.window_id, 1, '', GLFW_DRAG_OPERATION_MOVE) dnd_test_force_drag_dropped(self.capture.window_id) dnd_test_drag_notify(self.capture.window_id, 2) self.send_dnd_command_to_kitten('DRAG_STATUS') self.wait_for_responses('text/uri-list:2:true') self.assertEqual(self.img_drag_data, self.read_drag_data('image/png')) # if remote_client: # self.pty.log_data_flow = True uri_list = self.read_drag_data('text/uri-list').decode().splitlines() paths = set() for line in uri_list: line = line.strip() if line and not line.startswith('#'): purl = urlparse(line) if purl.scheme == 'file': fpath = purl.path.rstrip('/') or '/' paths.add(fpath) if remote_client: self.assertNotEqual(self.src_data_dir, os.path.dirname(fpath)) else: self.assertEqual(self.src_data_dir, os.path.dirname(fpath)) src_items = set(os.listdir(self.src_data_dir)) self.assertEqual(src_items, {os.path.basename(x) for x in paths}) if remote_client: for actual in paths: expected = os.path.join(self.src_data_dir, os.path.basename(actual)) if os.path.isdir(actual): self.assert_trees_equal(expected, actual) elif os.path.islink(actual): self.assertEqual(os.readlink(expected), os.readlink(actual)) else: self.assert_files_have_same_content(expected, actual) src_items_before = set(os.listdir(self.src_data_dir)) end_drag(False) if remote_client: # After a move drag finishes, all source files from text/uri-list should be deleted for name in src_items_before: item_path = os.path.join(self.src_data_dir, name) self.assertFalse(os.path.lexists(item_path), f'move drag: {name} should have been deleted from source')