From ea112a6592ee84896a6f920a96b01505a8b092fa Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 26 Jul 2024 11:46:17 +0530 Subject: [PATCH] Start work on adding icon support to desktop notifications --- docs/changelog.rst | 5 +- kitty/notifications.py | 201 ++++++++++++++++++++++++++++------- kitty_tests/notifications.py | 38 +++++-- 3 files changed, 196 insertions(+), 48 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index cb90dde30..064c643cb 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -80,6 +80,8 @@ Detailed list of changes - A new option :opt:`second_transparent_bg` to make a second background color semi-transparent via :opt:`background_opacity`. Useful for things like cursor line highlight in editors (:iss:`7646`) +- Desktop notifications protocol: Add support for closing notifications and querying if the terminal emulator supports the protocol (:iss:`7658`, :iss:`7659`) + - A new protocol to allow terminal applications to change colors in the terminal more robustly than with the legacy XTerm protocol (:ref:`color_control`) - Sessions: A new command ``focus_matching_window`` to shift focus to a specific window, useful when creating complex layouts with splits (:disc:`7635`) @@ -110,9 +112,6 @@ Detailed list of changes - Allow controlling the easing curves used for :opt:`visual_bell_duration` -- Desktop notifications protocol: Add support for querying if the terminal emulator supports the protocol (:iss:`7658`) - -- Desktop notifications protocol: Add support for closing previous notifications (:iss:`7659`) 0.35.2 [2024-06-22] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/kitty/notifications.py b/kitty/notifications.py index 68673bd8c..5f42c29d5 100644 --- a/kitty/notifications.py +++ b/kitty/notifications.py @@ -1,15 +1,17 @@ #!/usr/bin/env python # License: GPLv3 Copyright: 2024, Kovid Goyal +import os import re from collections import OrderedDict from contextlib import suppress from enum import Enum from itertools import count -from typing import Any, Callable, Dict, FrozenSet, List, NamedTuple, Optional, Tuple, Union +from typing import Any, Callable, Dict, FrozenSet, Iterator, List, NamedTuple, Optional, Tuple, Union +from weakref import ReferenceType, ref -from .constants import is_macos, logo_png_file -from .fast_data_types import ESC_OSC, current_focused_os_window_id, get_boss +from .constants import cache_dir, is_macos, logo_png_file +from .fast_data_types import ESC_OSC, base64_decode, current_focused_os_window_id, get_boss from .types import run_once from .typing import WindowType from .utils import get_custom_window_icon, log_error, sanitize_control_codes @@ -17,6 +19,79 @@ from .utils import get_custom_window_icon, log_error, sanitize_control_codes debug_desktop_integration = False +class IconDataCache: + + + def __init__(self, base_cache_dir: str = '', max_cache_size: int = 128 * 1024 * 1024): + self.max_cache_size = max_cache_size + self.key_map: 'OrderedDict[str, str]' = OrderedDict() + self.base_cache_dir = base_cache_dir + self.cache_dir = '' + self.total_size = 0 + + def _ensure_state(self) -> str: + if not self.cache_dir: + self.cache_dir = os.path.join(self.base_cache_dir or cache_dir(), 'notifications-icons', str(os.getpid())) + os.makedirs(self.cache_dir, exist_ok=True, mode=0o700) + return self.cache_dir + + def __del__(self) -> None: + if self.cache_dir: + import shutil + with suppress(FileNotFoundError): + shutil.rmtree(self.cache_dir) + self.cache_dir = '' + + def keys(self) -> Iterator[str]: + yield from self.key_map.keys() + + def add_icon(self, key: str, data: bytes) -> str: + from kittens.transfer.rsync import Hasher + self._ensure_state() + data_hash = Hasher(which='xxh3-128', data=data).hexdigest() + path = os.path.join(self.cache_dir, data_hash) + if not os.path.exists(path): + with open(path, 'wb') as f: + f.write(data) + self.total_size += len(data) + self.key_map.pop(key, None) # mark this key as being used recently + self.key_map[key] = data_hash + self.prune() + return path + + def get_icon(self, key: str) -> str: + self._ensure_state() + data_hash = self.key_map.pop(key, None) + if data_hash: + self.key_map[key] = data_hash # mark this key as being used recently + return os.path.join(self.cache_dir, data_hash) + return '' + + def clear(self) -> None: + while self.key_map: + key, data_hash = self.key_map.popitem(False) + self._remove_data_hash(data_hash) + + def prune(self) -> None: + self._ensure_state() + while self.total_size > self.max_cache_size and self.key_map: + key, data_hash = self.key_map.popitem(False) + self._remove_data_hash(data_hash) + + def _remove_data_hash(self, data_hash: str) -> None: + path = os.path.join(self.cache_dir, data_hash) + with suppress(FileNotFoundError): + sz = os.path.getsize(path) + os.remove(path) + self.total_size -= sz + + def remove_icon(self, key: str) -> None: + self._ensure_state() + data_hash = self.key_map.pop(key, None) + if data_hash: + self._remove_data_hash(data_hash) + + class Urgency(Enum): Low: int = 0 Normal: int = 1 @@ -29,6 +104,7 @@ class PayloadType(Enum): body = 'body' query = '?' close = 'close' + icon = 'icon' @property def is_text(self) -> bool: @@ -49,11 +125,19 @@ class Action(Enum): class DataStore: - def __init__(self) -> None: + def __init__(self, max_size: int = 4 * 1024 * 1024) -> None: self.buf: List[bytes] = [] + self.current_size = 0 + self.max_size = max_size + self.truncated = 0 def __call__(self, data: bytes) -> None: - self.buf.append(data) + if data: + if self.current_size > self.max_size: + self.truncated += len(data) + else: + self.current_size += len(data) + self.buf.append(data) def finalise(self) -> bytes: return b''.join(self.buf) @@ -65,6 +149,10 @@ class EncodedDataStore: self.current_leftover_bytes = memoryview(b'') self.data_store = data_store + @property + def truncated(self) -> int: + return self.data_store.truncated + def add_unencoded_data(self, data: Union[str, bytes]) -> None: if isinstance(data, str): data = data.encode('utf-8') @@ -101,9 +189,7 @@ class EncodedDataStore: write_saving_leftover_bytes(data) def _write_base64_data(self, b: bytes) -> None: - from base64 import standard_b64decode - d = standard_b64decode(b) - self.data_store(d) + self.data_store(base64_decode(b)) def flush_encoded_data(self) -> None: b = self.current_leftover_bytes @@ -135,6 +221,9 @@ class NotificationCommand: only_when: OnlyWhen = OnlyWhen.unset urgency: Optional[Urgency] = None close_response_requested: Optional[bool] = None + icon_data_key: str = '' + icon_path: str = '' + icon_name: str = '' # payload handling current_payload_type: PayloadType = PayloadType.title @@ -147,6 +236,10 @@ class NotificationCommand: # event callbacks on_activation: Optional[Callable[['NotificationCommand'], None]] = None + def __init__(self, icon_data_cache: ReferenceType[IconDataCache], log: 'Log') -> None: + self.icon_data_cache_ref = icon_data_cache + self.log = log + @property def report_requested(self) -> bool: return Action.report in self.actions @@ -198,19 +291,30 @@ class NotificationCommand: self.urgency = Urgency(int(v)) elif k == 'c': self.close_response_requested = v != '0' + elif k == 'g': + self.icon_data_key = sanitize_id(v) + elif k == 'n': + self.icon_name = v if not prev.done and prev.identifier == self.identifier: - self.actions = prev.actions.union(self.actions) - self.title = prev.title - self.body = prev.body - if self.only_when is OnlyWhen.unset: - self.only_when = prev.only_when - if self.urgency is None: - self.urgency = prev.urgency - if self.close_response_requested is None: - self.close_response_requested = prev.close_response_requested - + self.merge_metadata(prev) return payload_type, payload_is_encoded + def merge_metadata(self, prev: 'NotificationCommand') -> None: + self.actions = prev.actions.union(self.actions) + self.title = prev.title + self.body = prev.body + if self.only_when is OnlyWhen.unset: + self.only_when = prev.only_when + if self.urgency is None: + self.urgency = prev.urgency + if self.close_response_requested is None: + self.close_response_requested = prev.close_response_requested + if not self.icon_data_key: + self.icon_data_key = prev.icon_data_key + if not self.icon_name: + self.icon_name = prev.icon_name + self.icon_path = prev.icon_path + def create_payload_buffer(self, payload_type: PayloadType) -> EncodedDataStore: self.current_payload_type = payload_type return EncodedDataStore(DataStore()) @@ -223,7 +327,7 @@ class NotificationCommand: else: if prev_cmd.current_payload_buffer: self.current_payload_type = prev_cmd.current_payload_type - self.commit_data(prev_cmd.current_payload_buffer.finalise()) + self.commit_data(prev_cmd.current_payload_buffer.finalise(), prev_cmd.current_payload_buffer.truncated) if self.current_payload_buffer is None: self.current_payload_buffer = self.create_payload_buffer(payload_type) if payload_is_encoded: @@ -231,20 +335,37 @@ class NotificationCommand: else: self.current_payload_buffer.add_unencoded_data(payload) - def commit_data(self, data: bytes) -> None: + def commit_data(self, data: bytes, truncated: int) -> None: if not data: return if self.current_payload_type.is_text: - text = data.decode('utf-8', 'replace') + if truncated: + text = ' too long, truncated' + else: + text = data.decode('utf-8', 'replace') if self.current_payload_type is PayloadType.title: self.title = limit_size(self.title + text) elif self.current_payload_type is PayloadType.body: self.body = limit_size(self.body + text) + elif self.current_payload_type is PayloadType.icon: + if truncated: + self.log('Ignoring too long notification icon data') + else: + if self.icon_data_key: + icd = self.icon_data_cache_ref() + if icd: + self.icon_path = icd.add_icon(self.icon_data_key, data) + else: + self.log('Ignoring notification icon data because no icon data key specified') def finalise(self) -> None: if self.current_payload_buffer: - self.commit_data(self.current_payload_buffer.finalise()) + self.commit_data(self.current_payload_buffer.finalise(), self.current_payload_buffer.truncated) self.current_payload_buffer = None + if self.icon_data_key and not self.icon_path: + icd = self.icon_data_cache_ref() + if icd: + self.icon_path = icd.get_icon(self.icon_data_key) class DesktopIntegration: @@ -266,7 +387,7 @@ class DesktopIntegration: body: str, timeout: int = -1, application: str = 'kitty', - icon: bool = True, + icon_name: str = '', icon_path: str = '', subtitle: Optional[str] = None, urgency: Urgency = Urgency.Normal, ) -> int: @@ -307,7 +428,7 @@ class MacOSIntegration(DesktopIntegration): body: str, timeout: int = -1, application: str = 'kitty', - icon: bool = True, + icon_name: str = '', icon_path: str = '', subtitle: Optional[str] = None, urgency: Urgency = Urgency.Normal, ) -> int: @@ -388,15 +509,14 @@ class FreeDesktopIntegration(DesktopIntegration): body: str, timeout: int = -1, application: str = 'kitty', - icon: bool = True, + icon_name: str = '', icon_path: str = '', subtitle: Optional[str] = None, urgency: Urgency = Urgency.Normal, ) -> int: - icf = '' - if icon is True: - icf = get_custom_window_icon()[1] or logo_png_file + if not icon_name and not icon_path: + icon_path = get_custom_window_icon()[1] or logo_png_file from .fast_data_types import dbus_send_notification - desktop_notification_id = dbus_send_notification(application, icf, title, body, 'Click to see changes', timeout, urgency.value) + desktop_notification_id = dbus_send_notification(application, icon_path, title, body, 'Click to see changes', timeout, urgency.value) if debug_desktop_integration: log_error(f'Created notification with {desktop_notification_id=}') return desktop_notification_id @@ -447,7 +567,7 @@ def sanitize_identifier_pat() -> 're.Pattern[str]': def sanitize_id(v: str) -> str: - return sanitize_identifier_pat().sub('', v) + return sanitize_identifier_pat().sub('', v)[:512] class Log: @@ -463,6 +583,7 @@ class NotificationManager: channel: Channel = Channel(), log: Log = Log(), debug: bool = False, + base_cache_dir: str = '' ): global debug_desktop_integration debug_desktop_integration = debug @@ -471,10 +592,13 @@ class NotificationManager: else: self.desktop_integration = desktop_integration self.channel = channel + self.base_cache_dir = base_cache_dir self.log = log + self.icon_data_cache = IconDataCache(base_cache_dir=self.base_cache_dir) self.reset() def reset(self) -> None: + self.icon_data_cache.clear() self.in_progress_notification_commands: 'OrderedDict[int, NotificationCommand]' = OrderedDict() self.in_progress_notification_commands_by_client_id: Dict[str, NotificationCommand] = {} self.pending_commands: Dict[int, NotificationCommand] = {} @@ -512,7 +636,7 @@ class NotificationManager: boss = get_boss() if w := boss.active_window: from time import monotonic - cmd = NotificationCommand() + cmd = NotificationCommand(ref(self.icon_data_cache), self.log) now = monotonic() cmd.title = f'Test {now}' cmd.body = f'At: {now}' @@ -520,7 +644,7 @@ class NotificationManager: self.notify_with_command(cmd, w.id) def send_new_version_notification(self, version: str) -> None: - cmd = NotificationCommand() + cmd = NotificationCommand(ref(self.icon_data_cache), self.log) cmd.title = 'kitty update available!' cmd.body = f'kitty version {version} released' cmd.on_activation = self.desktop_integration.on_new_version_notification_activation @@ -543,7 +667,10 @@ class NotificationManager: if not title or not self.is_notification_allowed(cmd, channel_id): return None urgency = Urgency.Normal if cmd.urgency is None else cmd.urgency - desktop_notification_id = self.desktop_integration.notify(title=sanitize_text(title), body=sanitize_text(body), urgency=urgency) + desktop_notification_id = self.desktop_integration.notify( + title=sanitize_text(title), body=sanitize_text(body), urgency=urgency, + icon_name=cmd.icon_name, icon_path=cmd.icon_path, + ) self.register_in_progress_notification(cmd, desktop_notification_id) return desktop_notification_id @@ -560,7 +687,7 @@ class NotificationManager: self, prev_cmd: NotificationCommand, channel_id: int, raw: str ) -> Optional[NotificationCommand]: metadata, payload = raw.partition(';')[::2] - cmd = NotificationCommand() + cmd = NotificationCommand(ref(self.icon_data_cache), self.log) try: payload_type, payload_is_encoded = cmd.parse_metadata(metadata, prev_cmd) except Exception: @@ -595,7 +722,7 @@ class NotificationManager: def handle_notification_cmd(self, channel_id: int, osc_code: int, raw: str) -> None: if osc_code == 99: - cmd = self.pending_commands.pop(channel_id, None) or NotificationCommand() + cmd = self.pending_commands.pop(channel_id, None) or NotificationCommand(ref(self.icon_data_cache), self.log) q = self.parse_notification_cmd(cmd, channel_id, raw) if q is not None: if q.done: @@ -603,11 +730,11 @@ class NotificationManager: else: self.pending_commands[channel_id] = q elif osc_code == 9: - n = NotificationCommand() + n = NotificationCommand(ref(self.icon_data_cache), self.log) n.title = raw self.notify_with_command(n, channel_id) elif osc_code == 777: - n = NotificationCommand() + n = NotificationCommand(ref(self.icon_data_cache), self.log) parts = raw.split(';', 1) n.title, n.body = parts[0], (parts[1] if len(parts) > 1 else '') self.notify_with_command(n, channel_id) diff --git a/kitty_tests/notifications.py b/kitty_tests/notifications.py index 3c3b35542..6ed9c9b9e 100644 --- a/kitty_tests/notifications.py +++ b/kitty_tests/notifications.py @@ -2,17 +2,19 @@ # License: GPLv3 Copyright: 2024, Kovid Goyal +import os import re +import tempfile from base64 import standard_b64encode from typing import Optional -from kitty.notifications import Channel, DesktopIntegration, NotificationManager, UIState, Urgency +from kitty.notifications import Channel, DesktopIntegration, IconDataCache, NotificationManager, UIState, Urgency from . import BaseTest -def n(title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1): - return {'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id} +def n(title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_name='', icon_path=''): + return {'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_name': icon_name, 'icon_path': icon_path} class DesktopIntegration(DesktopIntegration): @@ -40,12 +42,12 @@ class DesktopIntegration(DesktopIntegration): body: str, timeout: int = -1, application: str = 'kitty', - icon: bool = True, + icon_name: str = '', icon_path: str = '', subtitle: Optional[str] = None, urgency: Urgency = Urgency.Normal, ) -> int: self.counter += 1 - self.notifications.append(n(title, body, urgency, self.counter)) + self.notifications.append(n(title, body, urgency, self.counter, icon_name, icon_path)) return self.counter @@ -71,7 +73,7 @@ class Channel(Channel): self.responses.append(osc_escape_code) -def do_test(self: 'TestNotifications') -> None: +def do_test(self: 'TestNotifications', tdir: str) -> None: di = DesktopIntegration(None) ch = Channel() nm = NotificationManager(di, ch, lambda *a, **kw: None) @@ -204,16 +206,36 @@ def do_test(self: 'TestNotifications') -> None: # Test querying h('i=xyz:p=?') self.assertFalse(di.notifications) - qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close:c=1' + qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon:c=1' self.ae(ch.responses, [f'99;i=xyz:p=?;{qr}']) reset() h('p=?') self.assertFalse(di.notifications) self.ae(ch.responses, [f'99;i=0:p=?;{qr}']) + # Test MIME streaming + text = 'some reasonably long text to test MIME streaming with' + encoded = standard_b64encode(text.encode()).decode() + for ch in encoded: + h(f'i=s:e=1:d=0;{ch}') + h(f'i=s:e=1:d=0:p=body;{encoded[:13]}') + h(f'i=s:e=1:d=0:p=body;{encoded[13:]}') + h('i=s') + self.ae(di.notifications, [n(text, text)]) + + # Test Disk Cache + dc = IconDataCache(base_cache_dir=tdir, max_cache_size=4) + cache_dir = dc._ensure_state() + for i in range(5): + dc.add_icon(str(i), str(i).encode()) + self.ae(set(dc.keys()), set(map(str, range(1, 5)))) + del dc + self.assertFalse(os.path.exists(cache_dir)) + class TestNotifications(BaseTest): def test_desktop_notify(self): - do_test(self) + with tempfile.TemporaryDirectory() as tdir: + do_test(self, tdir)