diff --git a/kitty/data-types.c b/kitty/data-types.c
index 625cdf305..9cb2e31d2 100644
--- a/kitty/data-types.c
+++ b/kitty/data-types.c
@@ -25,7 +25,6 @@
 #include "modes.h"
 #include
 #include
-#include
 #include
 #include
 #include
@@ -367,6 +366,7 @@ static struct PyModuleDef module = {
 extern int init_LineBuf(PyObject *);
 extern int init_HistoryBuf(PyObject *);
 extern int init_Cursor(PyObject *);
+extern int init_Shlex(PyObject *);
 extern int init_DiskCache(PyObject *);
 extern bool init_child_monitor(PyObject *);
 extern int init_Line(PyObject *);
@@ -430,6 +430,7 @@ PyInit_fast_data_types(void) {
     if (!init_HistoryBuf(m)) return NULL;
     if (!init_Line(m)) return NULL;
     if (!init_Cursor(m)) return NULL;
+    if (!init_Shlex(m)) return NULL;
     if (!init_DiskCache(m)) return NULL;
     if (!init_child_monitor(m)) return NULL;
     if (!init_ColorProfile(m)) return NULL;
diff --git a/kitty/fast_data_types.pyi b/kitty/fast_data_types.pyi
index 06a8e2ff2..4b6c25f2b 100644
--- a/kitty/fast_data_types.pyi
+++ b/kitty/fast_data_types.pyi
@@ -1513,6 +1513,13 @@ class AES256GCMDecrypt:
     def add_data_to_be_decrypted(self, data: bytes, finished: bool = False) -> bytes: ...
 
 
+class Shlex:
+    # Incremental shell-like word splitter over src
+    def __init__(self, src: str): ...
+    # Returns (start index of the word in src, word) or (-1, '') once the input is exhausted
+    def next_word(self) -> Tuple[int, str]: ...
+
+
 class SingleKey:
 
     __slots__ = ()
diff --git a/kitty/shlex.c b/kitty/shlex.c
new file mode 100644
index 000000000..c4facff65
--- /dev/null
+++ b/kitty/shlex.c
@@ -0,0 +1,163 @@
+/*
+ * shlex.c
+ * Copyright (C) 2023 Kovid Goyal
+ *
+ * Distributed under terms of the GPL3 license.
+ */
+
+#include "data-types.h"
+
+// Lexer states: between words, in an unquoted word, inside '...' and inside "..."
+typedef enum { NORMAL, WORD, STRING_WITHOUT_ESCAPES, STRING_WITH_ESCAPES, } State;
+typedef struct {
+    PyObject_HEAD
+
+    PyObject *src, *buf;  // the input text and a scratch string that words are assembled in
+    Py_ssize_t src_sz, src_pos, word_start, buf_pos;
+    int kind; void *src_data, *buf_data;
+    State state;
+} Shlex;
+
+
+// Create a Shlex from a single str argument, the text to be split
+static PyObject *
+new(PyTypeObject *type, PyObject *args, PyObject UNUSED *kwds) {
+    PyObject *src;
+    // Parse the arguments before allocating so that a bad argument cannot leak a partially constructed object
+    if (!PyArg_ParseTuple(args, "U", &src)) return NULL;
+    Shlex *self = (Shlex *)type->tp_alloc(type, 0);
+    if (!self) return NULL;
+    self->src_sz = PyUnicode_GET_LENGTH(src);
+    // At most one character is written per character read, and buf is created with the max char value of src
+    self->buf = PyUnicode_New(self->src_sz, PyUnicode_MAX_CHAR_VALUE(src));
+    if (!self->buf) { Py_DECREF(self); return NULL; }
+    self->src = src;
+    Py_INCREF(src);
+    self->kind = PyUnicode_KIND(src);
+    self->src_data = PyUnicode_DATA(src);
+    self->buf_data = PyUnicode_DATA(self->buf);
+    return (PyObject*) self;
+}
+
+static void
+dealloc(Shlex* self) {
+    Py_CLEAR(self->src); Py_CLEAR(self->buf);
+    Py_TYPE(self)->tp_free((PyObject*)self);
+}
+
+#define WHITESPACE ' ': case '\n': case '\t': case '\r'
+#define STRING_WITH_ESCAPES_DELIM '"'
+#define STRING_WITHOUT_ESCAPES_DELIM '\''
+#define ESCAPE_CHAR '\\'
+
+// Begin a new word at the character that was just read
+static void
+start_word(Shlex *self) {
+    self->word_start = self->src_pos - 1;
+    self->buf_pos = 0;
+}
+
+// Append ch to the word being assembled
+static void
+write_ch(Shlex *self, Py_UCS4 ch) {
+    PyUnicode_WRITE(self->kind, self->buf_data, self->buf_pos, ch); self->buf_pos++;
+}
+
+// Return the (start position, text) tuple for the assembled word and empty the scratch buffer
+static PyObject*
+get_word(Shlex *self) {
+    Py_ssize_t pos = self->buf_pos; self->buf_pos = 0;
+    return Py_BuildValue("nN", self->word_start, PyUnicode_Substring(self->buf, 0, pos));
+}
+
+// Copy the character following a backslash verbatim, returns false if there is no such character
+static bool
+write_escape_ch(Shlex *self) {
+    if (self->src_pos < self->src_sz) {
+        Py_UCS4 nch = PyUnicode_READ(self->kind, self->src_data, self->src_pos); self->src_pos++;
+        write_ch(self, nch);
+        return true;
+    }
+    return false;
+}
+
+static void
+set_state(Shlex *self, State s) {
+    self->state = s;
+}
+
+// Return the next word as (start position in src, text) or (-1, "") when the input is exhausted.
+// Raises ValueError for a trailing backslash or an unterminated quoted string.
+static PyObject*
+next_word(Shlex *self, PyObject *args UNUSED) {
+#define write_escaped_or_fail() if (!write_escape_ch(self)) { self->state = NORMAL; PyErr_SetString(PyExc_ValueError, "Trailing backslash at end of input data"); return NULL; }
+
+    while (self->src_pos < self->src_sz) {
+        Py_UCS4 ch = PyUnicode_READ(self->kind, self->src_data, self->src_pos); self->src_pos++;
+        switch(self->state) {
+            case NORMAL:
+                switch(ch) {
+                    case WHITESPACE: break;
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); start_word(self); break;
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, STRING_WITHOUT_ESCAPES); start_word(self); break;
+                    case ESCAPE_CHAR: start_word(self); write_escaped_or_fail(); set_state(self, WORD); break;
+                    default: set_state(self, WORD); start_word(self); write_ch(self, ch); break;
+                }
+                break;
+            case WORD:
+                switch(ch) {
+                    case WHITESPACE: set_state(self, NORMAL); if (self->buf_pos) return get_word(self); break;
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, STRING_WITH_ESCAPES); break;
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, STRING_WITHOUT_ESCAPES); break;
+                    case ESCAPE_CHAR: write_escaped_or_fail(); break;
+                    default: write_ch(self, ch); break;
+                } break;
+            case STRING_WITHOUT_ESCAPES:
+                switch(ch) {
+                    // A closing quote does not end the word, more text may follow it immediately
+                    case STRING_WITHOUT_ESCAPES_DELIM: set_state(self, WORD); break;
+                    default: write_ch(self, ch); break;
+                } break;
+            case STRING_WITH_ESCAPES:
+                switch(ch) {
+                    case STRING_WITH_ESCAPES_DELIM: set_state(self, WORD); break;
+                    // A backslash at the very end of input is reported below as an unterminated string
+                    case ESCAPE_CHAR: write_escape_ch(self); break;
+                    default: write_ch(self, ch); break;
+                } break;
+        }
+    }
+    switch (self->state) {
+        case WORD:
+            self->state = NORMAL;
+            if (self->buf_pos) return get_word(self);
+            break;
+        case STRING_WITH_ESCAPES: case STRING_WITHOUT_ESCAPES:
+            PyErr_SetString(PyExc_ValueError, "Unterminated string at the end of input");
+            self->state = NORMAL;
+            return NULL;
+        case NORMAL:
+            break;
+    }
+    return Py_BuildValue("is", -1, "");
+#undef write_escaped_or_fail
+}
+
+
+static PyMethodDef methods[] = {
+    METHODB(next_word, METH_NOARGS),
+    {NULL}  /* Sentinel */
+};
+
+PyTypeObject Shlex_Type = {
+    PyVarObject_HEAD_INIT(NULL, 0)
+    .tp_name = "fast_data_types.Shlex",
+    .tp_basicsize = sizeof(Shlex),
+    .tp_dealloc = (destructor)dealloc,
+    .tp_flags = Py_TPFLAGS_DEFAULT,
+    .tp_doc = "Lexing like a shell",
+    .tp_methods = methods,
+    .tp_new = new,
+};
+
+INIT_TYPE(Shlex)
diff --git a/kitty/utils.py b/kitty/utils.py
index c3941c86c..ae88599bc 100644
--- a/kitty/utils.py
+++ b/kitty/utils.py
@@ -43,7 +43,7 @@ from .constants import (
     shell_path,
     ssh_control_master_template,
 )
-from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, get_options, open_tty
+from .fast_data_types import WINDOW_FULLSCREEN, WINDOW_MAXIMIZED, WINDOW_MINIMIZED, WINDOW_NORMAL, Color, Shlex, get_options, open_tty
 from .rgb import to_color
 from .types import run_once
 from .typing import AddressFamily, PopenType, Socket, StartupCtx
@@ -1226,3 +1226,17 @@ def key_val_matcher(items: Iterable[Tuple[str, str]], key_pat: 're.Pattern[str]'
                 val_pat is None or val_pat.search(val) is not None):
             return True
     return False
+
+
+def shlex_split(text: str) -> Iterator[str]:
+    """Yield the words in text, split using shell-like quoting rules. Raises ValueError on malformed input."""
+    s = Shlex(text)
+    while (q := s.next_word())[0] > -1:
+        yield q[1]
+
+
+def shlex_split_with_positions(text: str) -> Iterator[Tuple[int, str]]:
+    """Like shlex_split() but yield (index in text at which the word starts, word) tuples."""
+    s = Shlex(text)
+    while (q := s.next_word())[0] > -1:
+        yield q
diff --git a/kitty_tests/datatypes.py b/kitty_tests/datatypes.py
index 181d6dab8..fd4aae165 100644
--- a/kitty_tests/datatypes.py
+++ b/kitty_tests/datatypes.py
@@ -20,7 +20,7 @@ from kitty.fast_data_types import (
 )
 from kitty.fast_data_types import Cursor as C
 from kitty.rgb import to_color
-from kitty.utils import is_ok_to_read_image_file, is_path_in_temp_dir, sanitize_title, sanitize_url_for_dispay_to_user
+from kitty.utils import is_ok_to_read_image_file, is_path_in_temp_dir, sanitize_title, sanitize_url_for_dispay_to_user, shlex_split_with_positions
 
 from . import BaseTest, filled_cursor, filled_history_buf, filled_line_buf
 
@@ -612,3 +612,20 @@ class TestDataTypes(BaseTest):
         }.items():
             actual = expand_ansi_c_escapes(src)
             self.ae(expected, actual)
+
+    def test_shlex_split(self):
+        for bad in (
+            'abc\\', '\\', "'abc", "'", '"', 'asd' + '\\',
+        ):
+            with self.assertRaises(ValueError, msg=f'Failed to raise exception for {bad!r}'):
+                tuple(shlex_split_with_positions(bad))
+
+        for q, expected in {
+            '"ab"': ((0, 'ab'),),
+            r'x "ab"y \m': ((0, 'x'), (2, 'aby'), (8, 'm')),
+            r'''x'y"\z'1''': ((0, 'xy"\\z1'),),
+            r'\abc\ d': ((0, 'abc d'),),
+            '': (), ' ': (), ' \tabc\n\t\r ': ((2, 'abc'),),
+        }.items():
+            actual = tuple(shlex_split_with_positions(q))
+            self.ae(expected, actual, f'Failed for text: {q!r}')