Compare commits

..

34 Commits

Author SHA1 Message Date
Kovid Goyal
8b54d19326 ... 2024-07-29 21:27:42 +05:30
Kovid Goyal
b52275e0b5 Simplify API of streaming base64 decoder further 2024-07-29 21:24:45 +05:30
Kovid Goyal
212d7accfc Allow multiple types per notification 2024-07-29 20:52:54 +05:30
Kovid Goyal
4bc532a2d0 Clarify base64 and utf-8 encoding requirements 2024-07-29 20:42:55 +05:30
Kovid Goyal
9047df5080 Use a more correct method name 2024-07-29 20:17:52 +05:30
Kovid Goyal
4ba9fcaf37 Simplify streaming base64 decoder by using the streaming API of libbase64 2024-07-29 20:08:28 +05:30
Kovid Goyal
eb1bb493a7 Ensure icon cache is cleared at exit 2024-07-29 15:24:18 +05:30
Kovid Goyal
7023e1506b get icon by path loading working in the notify kitten 2024-07-29 15:10:11 +05:30
Kovid Goyal
8cfe1d7a16 DRYer 2024-07-29 14:53:41 +05:30
Kovid Goyal
10d62a9596 ... 2024-07-29 14:44:30 +05:30
Kovid Goyal
8b8e752ece Only read each index.theme once 2024-07-29 14:43:02 +05:30
Kovid Goyal
ad48ecad40 Implement handling of icon names on Linux 2024-07-29 14:39:20 +05:30
Kovid Goyal
6e35289f8e Allow sending icon data without an identifier 2024-07-29 10:41:05 +05:30
Kovid Goyal
a38153c890 Make table alphabetically sorted 2024-07-29 10:12:56 +05:30
Kovid Goyal
896833a4f7 notify kitten: Wait for close now implemented 2024-07-29 10:08:37 +05:30
Kovid Goyal
9bd155ae50 More work on notify kitten 2024-07-29 08:21:45 +05:30
Kovid Goyal
f6a24af229 Merge branch 'docs-env' of https://github.com/gileri/kitty 2024-07-29 07:41:09 +05:30
Kovid Goyal
2e829c1349 Sleep longer before retrying to give VM more time to startup 2024-07-29 07:19:10 +05:30
Kovid Goyal
1b1f5656ee more work on notifications kitten 2024-07-29 07:13:32 +05:30
gileri
3bdbf19bb0 Fix env assignation in ssh kitten docs 2024-07-28 22:35:19 +02:00
Kovid Goyal
00737ebf30 Fix build of hyperlinked grep 2024-07-28 21:13:54 +05:30
Kovid Goyal
a19ce66e62 Automate finding of kittens that need Go CLIs generated 2024-07-28 21:07:28 +05:30
Kovid Goyal
674432f886 Round out the options for the new notify kitten 2024-07-28 20:52:44 +05:30
Kovid Goyal
5c1af0fcb1 Start work on the notify kitten 2024-07-28 20:41:01 +05:30
Kovid Goyal
eca487d15f Add support for auto-expiring notifications 2024-07-28 20:14:39 +05:30
Kovid Goyal
d1a8772ac8 Add tests for alive queries 2024-07-28 17:27:09 +05:30
Kovid Goyal
083f158bbd Implement a polling mode for close events 2024-07-28 15:35:22 +05:30
Kovid Goyal
02bc104091 On macOS only track nots that request close events
Send an untracked event when giving up tracking of the notification
2024-07-28 13:19:47 +05:30
Kovid Goyal
2bffea2bdc Implement updating of notifications 2024-07-28 12:25:35 +05:30
Kovid Goyal
59c175f312 Add test for filtering 2024-07-28 09:15:18 +05:30
Kovid Goyal
67410c317f Add a sample showing on_activation event for notifications 2024-07-28 08:46:31 +05:30
Kovid Goyal
de21e5e488 Implement filtering of desktop notifications
Fixes #7670
2024-07-28 08:41:24 +05:30
Kovid Goyal
c59ab759a1 Allow well behaved applications to categorise notifications so that users can easily filter them out 2024-07-27 21:54:19 +05:30
Kovid Goyal
45a3a655a7 Desktop notifications: Fix limited HTML markup in the body text being rendered as HTML on some Linux systems
Fix #7671
2024-07-27 21:19:46 +05:30
30 changed files with 1325 additions and 436 deletions

View File

@@ -82,6 +82,8 @@ Detailed list of changes
- Desktop notifications protocol: Add support for closing notifications and querying if the terminal emulator supports the protocol (:iss:`7658`, :iss:`7659`)
- A new option :opt:`filter_notification` to filter out or perform arbitrary actions on desktop notifications based on sophisticated criteria (:iss:`7670`)
- A new protocol to allow terminal applications to change colors in the terminal more robustly than with the legacy XTerm protocol (:ref:`color_control`)
- Sessions: A new command ``focus_matching_window`` to shift focus to a specific window, useful when creating complex layouts with splits (:disc:`7635`)

View File

@@ -33,7 +33,7 @@ notification from a shell script::
To show a message with a title and a body::
printf '\x1b]99;i=1:d=0;Hello world\x1b\\'
printf '\x1b]99;i=1:d=1:p=body;This is cool\x1b\\'
printf '\x1b]99;i=1:p=body;This is cool\x1b\\'
The most important key in the metadata is the ``p`` key, it controls how the
payload is interpreted. A value of ``title`` means the payload is setting the
@@ -46,17 +46,46 @@ code can be. Chunking is accomplished by the ``i`` and ``d`` keys. The ``i``
key is the *notification id* which can be any string containing the characters
``[a-zA-Z0-9_-+.]``. The ``d`` key stands for *done* and can only take the
values ``0`` and ``1``. A value of ``0`` means the notification is not yet done
and the terminal emulator should hold off displaying it. A value of ``1`` means
and the terminal emulator should hold off displaying it. A non-zero value means
the notification is done, and should be displayed. You can specify the title or
body multiple times and the terminal emulator will concatenate them, thereby
allowing arbitrarily long text (terminal emulators are free to impose a sensible
limit to avoid Denial-of-Service attacks). The size of the payload must be no
longer than ``2048`` bytes, *before being encoded*.
longer than ``2048`` bytes, *before being encoded* or ``4096`` encoded bytes.
Both the ``title`` and ``body`` payloads must be either :ref:`safe_utf8` text ,
or UTF-8 text that is :ref:`base64` encoded, in which case there must be an
``e=1`` key in the metadata to indicate the payload is :ref:`base64`
encoded. No HTML or other markup in the plain text is allowed. It is strictly
plain text, to be interpreted as such.
Allowing users to filter notifications
-------------------------------------------------------
Well behaved applications should identify themselves to the terminal
by means of two keys ``f`` which is the application name and ``t``
which is the notification type. These are free form keys, they can contain
any values, their purpose is to allow users to easily filter out
notifications they do not want. Both keys must have :ref:`base64`
encoded UTF-8 text as their values. The ``t`` key can be specified multiple
times, as notifications can have more than one type. See the `freedesktop.org
spec
<https://specifications.freedesktop.org/notification-spec/notification-spec-latest.html#categories>`__
for examples of notification types.
.. note::
The application name should generally be set to the filename of the
applications `desktop file
<https://specifications.freedesktop.org/desktop-entry-spec/desktop-entry-spec-latest.html#file-naming>`__
(without the ``.desktop`` part) or the bundle identifier for a macOS
application. While not strictly necessary, this allows the terminal
emulator to deduce an icon for the notification when one is not specified.
.. note::
|kitty| has sophisticated notification filtering and management
capabilities via :opt:`filter_notification`.
Both the ``title`` and ``body`` payloads must be either UTF-8 encoded plain
text with no embedded escape codes, or UTF-8 text that is :rfc:`base64 <4648>`
encoded, in which case there must be an ``e=1`` key in the metadata to indicate
the payload is :rfc:`base64 <4648>` encoded.
Being informed when user activates the notification
-------------------------------------------------------
@@ -80,21 +109,13 @@ off, so for example if you do not want any action, turn off the default
a=-focus
Complete specification of all the metadata keys is in the table below. If a
terminal emulator encounters a key in the metadata it does not understand,
Complete specification of all the metadata keys is in the :ref:`table below <keys_in_notificatons_protocol>`.
If a terminal emulator encounters a key in the metadata it does not understand,
the key *must* be ignored, to allow for future extensibility of this escape
code. Similarly if values for known keys are unknown, the terminal emulator
*should* either ignore the entire escape code or perform a best guess effort
to display it based on what it does understand.
*should* either ignore the entire escape code or perform a best guess effort to
display it based on what it does understand.
.. note::
It is possible to extend this escape code to allow specifying an icon for
the notification, however, given that some platforms, such as legacy versions
of macOS, don't allow displaying custom images on a notification, it was
decided to leave it out of the spec for the time being.
Similarly, features such as scheduled notifications could be added in future
revisions.
Being informed when a notification is closed
------------------------------------------------
@@ -114,21 +135,42 @@ escape code to inform when the notification is closed::
If no notification id was specified ``i=0`` will be used.
If ``a=report`` is specified and the notification is activated/clicked on
then both the activation report and close notification are sent.
then both the activation report and close notification are sent. If the notification
is updated then the close event is not sent unless the updated notification
also requests a close notification.
.. note:: On macOS the OS does not supply notification
closed events to applications. As such close events must be implemented
via polling. It is up to the terminal emulator to decide a reasonable
time limit for how long to poll, before giving up. kitty polls for 60
seconds. Therefore, terminal applications should not rely on close events
being authoritative.
Note that on some platforms, such as macOS, the OS does not inform applications
when notifications are closed, on such platforms, terminals reply with::
<OSC> 99 ; i=mynotification : p=close ; untracked <terminator>
This means that the terminal has no way of knowing when the notification is
closed. Instead, applications can poll the terminal to determine which
notifications are still alive (not closed), with::
<OSC> 99 ; i=myid : p=alive ; <terminator>
The terminal will reply with::
<OSC> 99 ; i=myid : p=alive ; id1,id2,id3 <terminator>
Here, ``myid`` is present for multiplexer support. The response from the terminal
contains a comma separated list of ids that are still alive.
Closing an existing notification
----------------------------------
Updating or closing an existing notification
----------------------------------------------
.. versionadded:: 0.36.0
The ability to close a previous notification was added in kitty 0.36.0
The ability to update and close a previous notification was added in kitty 0.36.0
To update a previous notification simply send a new notification with the same
*notification id* (``i`` key) as the one you want to update. If the original
notification is still displayed it will be replaced, otherwise a new
notification is displayed. This can be used, for example, to show progress of
an operation. Note that how smoothly the existing notification is replaced
depends on the underlying OS, for example, on Linux the replacement is usually flicker
free, on macOS it isn't, because of Apple's design choices.
To close a previous notification, send::
@@ -138,6 +180,22 @@ This will close a previous notification with the specified id. If no such
notification exists (perhaps because it was already closed or it was activated)
then the request is ignored.
Automatically expiring notifications
-------------------------------------
A notification can be marked as expiring (being closed) automatically after
a specified number of milliseconds using the ``w`` key. The default if
unspecified is ``-1`` which means to use whatever expiry policy the OS has for
notifications. A value of ``0`` means the notification should never expire.
Values greater than zero specify the number of milliseconds after which the
notification should be auto-closed. Note that the value of ``0``
is best effort, some platforms honor it and some do not. Positive values
are robust, since they can be implemented by the terminal emulator itself,
by manually closing the notification after the expiry time. The notification
could still be closed before the expiry time by user interaction or OS policy,
but it is guaranteed to be closed once the expiry time has passed.
.. _notifications_query:
Querying for support
@@ -168,24 +226,26 @@ Key Value
implements. If no actions are supported, the ``a`` key must be absent from the
query response.
``o`` Comma separated list of occassions from the ``o`` key that the
terminal implements. If no occassions are supported, the value
``o=always`` must be sent in the query response.
``c`` ``c=1`` if the terminal supports close events, otherwise the ``c``
must be omitted.
``u`` Comma separated list of urgency values that the terminal implements.
If urgency is not supported, the ``u`` key must be absent from the
query response.
``o`` Comma separated list of occassions from the ``o`` key that the
terminal implements. If no occasions are supported, the value
``o=always`` must be sent in the query response.
``p`` Comma spearated list of supported payload types (i.e. values of the
``p`` key that the terminal implements). These must contain at least
``title`` and ``body``.
``c`` ``c=1`` if the terminal supports close events, otherwise the ``c``
must be omitted.
``u`` Comma separated list of urgency values that the terminal implements.
If urgency is not supported, the ``u`` key must be absent from the
query response.
``w`` ``w=1`` if the terminal supports auto expiring of notifications.
======= ================================================================================
In the future, if this protocol expands, more keys might be added. Clients must
ignore keys they dont understand in the query response.
ignore keys they do not understand in the query response.
To check if a terminal emulator supports this notifications protocol the best way is to
send the above *query action* followed by a request for the `primary device
@@ -193,6 +253,8 @@ attributes <https://vt100.net/docs/vt510-rm/DA1.html>`_. If you get back an
answer for the device attributes without getting back an answer for the *query
action* the terminal emulator does not support this notifications protocol.
.. _keys_in_notificatons_protocol:
Specification of all keys used in the protocol
--------------------------------------------------
@@ -205,23 +267,23 @@ Key Value Default Description
optional leading
``-``
``c`` ``0`` or ``1`` ``0`` When non-zero an escape code is sent to the application when the notification is closed.
``d`` ``0`` or ``1`` ``1`` Indicates if the notification is
complete or not. A non-zero value
means it is complete.
``e`` ``0`` or ``1`` ``0`` If set to ``1`` means the payload is :rfc:`base64 <4648>` encoded UTF-8,
``e`` ``0`` or ``1`` ``0`` If set to ``1`` means the payload is :ref:`base64` encoded UTF-8,
otherwise it is plain UTF-8 text with no C0 control codes in it
``f`` :ref:`base64` ``unset`` The name of the application sending the notification. Can be used to filter out notifications.
encoded UTF-8
application name
``i`` ``[a-zA-Z0-9-_+.]`` ``0`` Identifier for the notification. Make these globally unqiue,
like an UUID, so that termial multiplxers can
like an UUID, so that terminal multiplexers can
direct responses to the correct window.
``p`` One of ``title``, ``title`` Whether the payload is the notification title or body or query. If a
``body``, notification has no title, the body will be used as title. Terminal
``close``, emulators should ignore payloads of unknown type to allow for future
``?`` expansion of this protocol.
``o`` One of ``always``, ``always`` When to honor the notification request. ``unfocused`` means when the window
``unfocused`` or the notification is sent on does not have keyboard focus. ``invisible``
``invisible`` means the window both is unfocused
@@ -229,10 +291,20 @@ Key Value Default Description
its OS window is not currently active.
``always`` is the default and always honors the request.
``p`` One of ``title``, ``title`` Whether the payload is the notification title or body or query. If a
``body``, notification has no title, the body will be used as title. Terminal
``close``, emulators should ignore payloads of unknown type to allow for future
``?``, ``alive`` expansion of this protocol.
``t`` :ref:`base64` ``unset`` The type of the notification. Can be used to filter out notifications.
encoded UTF-8
notification type
``u`` ``0, 1 or 2`` ``unset`` The *urgency* of the notification. ``0`` is low, ``1`` is normal and ``2`` is critical.
If not specified normal is used.
``c`` ``0`` or ``1`` ``0`` When non-zero an escape code is sent to the application when the notification is closed.
``w`` ``>=-1`` ``-1`` The number of milliseconds to auto-close the notification after.
======= ==================== ========== =================
@@ -247,3 +319,31 @@ Key Value Default Description
|kitty| also supports the `legacy OSC 9 protocol developed by iTerm2
<https://iterm2.com/documentation-escape-codes.html>`__ for desktop
notifications.
.. _base64:
Base64
---------------
The base64 encoding used in the this specification is the one defined in
:rfc:`4648`. When a base64 payload is chunked, either the chunking should be
done before encoding or after. When the chunking is done before encoding, no
more than 2048 bytes of data should be encoded per chunk and the encoded data
**must** include the base64 padding bytes, if any. When the chunking is done
after encoding, each encoded chunk must be no more than 4096 bytes in size.
There may or may not be padding bytes at the end of the last chunk, terminals
must handle either case.
.. _safe_utf8:
Escape code safe UTF-8
--------------------------
This must be valid UTF-8 as per the spec in :rfc:`3629`. In addition, in order
to make it safe for transmission embedded inside an escape code, it must
contain none of the C0 and C1 control characters, that is, the unicode
characters: U+0000 (NUL) - U+1F (Unit separator), U+7F (DEL) and U+80 (PAD) - U+9F
(APC). Note that in particular, this means that no newlines, carriage returns,
tabs, etc. are allowed.

View File

@@ -116,14 +116,14 @@ this could be achieved using the ssh kitten with :program:`zsh` and
hostname myserver-*
# Setup zsh to read its files from my-conf/zsh
env ZDOTDIR $HOME/my-conf/zsh
env ZDOTDIR=$HOME/my-conf/zsh
copy --dest my-conf/zsh/.zshrc .zshrc
copy --dest my-conf/zsh/.zshenv .zshenv
# If you use other zsh init files add them in a similar manner
# Setup vim to read its config from my-conf/vim
env VIMINIT $HOME/my-conf/vim/vimrc
env VIMRUNTIME $HOME/my-conf/vim
env VIMINIT=$HOME/my-conf/vim/vimrc
env VIMRUNTIME=$HOME/my-conf/vim
copy --dest my-conf/vim .vim
copy --dest my-conf/vim/vimrc .vimrc

58
docs/notifications.py Normal file
View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python
# A sample script to process notifications. Save it as
# ~/.config/kitty/notifications.py
import subprocess
from kitty.notifications import NotificationCommand, Urgency
def log_notification(nc: NotificationCommand) -> None:
# Log notifications to /tmp/notifications-log.txt
with open('/tmp/notifications-log.txt', 'a') as log:
print(f'title: {nc.title}', file=log)
print(f'body: {nc.body}', file=log)
print(f'app: {nc.application_name}', file=log)
print(f'types: {nc.notification_types}', file=log)
print('\n', file=log)
def on_notification_activated(nc: NotificationCommand) -> None:
# do something when this notification is activated (clicked on)
# remember to assign this to the on_activation field in main()
pass
def main(nc: NotificationCommand) -> bool:
'''
This function should return True to filter out the notification
'''
log_notification(nc)
# filter out notifications with 'unwanted' in their titles
if 'unwanted' in nc.title.lower():
return True
# filter out notifications from the application badapp
if nc.application_name == 'badapp':
return True
# filter out low urgency notifications
if nc.urgency is Urgency.Low:
return True
# replace some bad text in the notification body
nc.body = nc.body.replace('bad text', 'good text')
# run a script if this notification is from myapp and has
# type foo, passing in the title and body as command line args
# to the script.
if nc.application_name == 'myapp' and 'foo' in nc.notification_types:
subprocess.Popen(['/path/to/my/script', nc.title, nc.body])
# do some arbitrary actions when this notification is activated
nc.on_activation = on_notification_activated
# dont filter out this notification
return False

View File

@@ -464,9 +464,20 @@ def generate_extra_cli_parser(name: str, spec: str) -> None:
print('}')
def kittens_needing_cli_parsers() -> Iterator[str]:
for d in os.scandir('kittens'):
if not d.is_dir(follow_symlinks=False):
continue
if os.path.exists(os.path.join(d.path, 'main.py')) and os.path.exists(os.path.join(d.path, 'main.go')):
with open(os.path.join(d.path, 'main.py')) as f:
raw = f.read()
if 'options' in raw:
yield d.name
def kitten_clis() -> None:
from kittens.runner import get_kitten_conf_docs, get_kitten_extra_cli_parsers
for kitten in wrapped_kittens() + ('pager',):
for kitten in kittens_needing_cli_parsers():
defn = get_kitten_conf_docs(kitten)
if defn is not None:
generate_conf_parser(kitten, defn)

2
glfw/glfw3.h vendored
View File

@@ -1317,7 +1317,7 @@ typedef struct GLFWLayerShellConfig {
typedef struct GLFWDBUSNotificationData {
const char *app_name, *icon, *summary, *body, *action_name;
int32_t timeout; uint8_t urgency;
int32_t timeout; uint8_t urgency; uint32_t replaces;
} GLFWDBUSNotificationData;
/*! @brief The function pointer type for error callbacks.

3
glfw/linux_notify.c vendored
View File

@@ -111,7 +111,6 @@ glfw_dbus_send_user_notification(const GLFWDBUSNotificationData *n, GLFWDBusnoti
data->next_id = ++notification_id;
data->callback = callback; data->data = user_data;
if (!data->next_id) data->next_id = ++notification_id;
uint32_t replaces_id = 0;
RAII_MSG(msg, dbus_message_new_method_call(NOTIFICATIONS_SERVICE, NOTIFICATIONS_PATH, NOTIFICATIONS_IFACE, "Notify"));
if (!msg) { return 0; }
@@ -120,7 +119,7 @@ glfw_dbus_send_user_notification(const GLFWDBUSNotificationData *n, GLFWDBusnoti
#define check_call(func, ...) if (!func(__VA_ARGS__)) { _glfwInputError(GLFW_PLATFORM_ERROR, "%s", "Out of memory allocating DBUS message for notification\n"); return 0; }
#define APPEND(to, type, val) check_call(dbus_message_iter_append_basic, &to, type, &val);
APPEND(args, DBUS_TYPE_STRING, n->app_name)
APPEND(args, DBUS_TYPE_UINT32, replaces_id)
APPEND(args, DBUS_TYPE_UINT32, n->replaces)
APPEND(args, DBUS_TYPE_STRING, n->icon)
APPEND(args, DBUS_TYPE_STRING, n->summary)
APPEND(args, DBUS_TYPE_STRING, n->body)

View File

@@ -427,6 +427,28 @@ func specialize_command(hg *cli.Command) {
hg.ArgCompleter = cli.CompletionForWrapper("rg")
}
type Options struct {
}
func create_cmd(root *cli.Command, run_func func(*cli.Command, *Options, []string) (int, error)) {
ans := root.AddSubCommand(&cli.Command{
Name: "hyperlinked_grep",
Run: func(cmd *cli.Command, args []string) (int, error) {
opts := Options{}
err := cmd.GetOptionValues(&opts)
if err != nil {
return 1, err
}
return run_func(cmd, &opts, args)
},
Hidden: true,
})
specialize_command(ans)
clone := root.AddClone(ans.Group, ans)
clone.Hidden = false
clone.Name = "hyperlinked-grep"
}
func EntryPoint(parent *cli.Command) {
create_cmd(parent, main)
}

View File

321
kittens/notify/main.go Normal file
View File

@@ -0,0 +1,321 @@
package notify
import (
"bytes"
"encoding/base64"
"fmt"
"image"
"io"
"os"
"slices"
"strconv"
"strings"
"time"
"kitty/tools/cli"
"kitty/tools/tty"
"kitty/tools/tui/loop"
"kitty/tools/utils"
)
var _ = fmt.Print
const ESC_CODE_PREFIX = "\x1b]99;"
const ESC_CODE_SUFFIX = "\x1b\\"
const CHUNK_SIZE = 4096
func b64encode(x string) string {
return base64.RawStdEncoding.EncodeToString(utils.UnsafeStringToBytes(x))
}
func check_id_valid(x string) bool {
pat := utils.MustCompile(`[^a-zA-Z0-9_+.-]`)
return pat.ReplaceAllString(x, "") == x
}
type parsed_data struct {
opts *Options
wait_till_closed bool
expire_time time.Duration
title, body, identifier string
image_data []byte
}
func (p *parsed_data) create_metadata() string {
ans := []string{}
if p.opts.AppName != "" {
ans = append(ans, "f="+b64encode(p.opts.AppName))
}
switch p.opts.Urgency {
case "low":
ans = append(ans, "u=0")
case "critical":
ans = append(ans, "u=2")
}
if p.expire_time >= 0 {
ans = append(ans, "w="+strconv.FormatInt(p.expire_time.Milliseconds(), 10))
}
if p.opts.Type != "" {
ans = append(ans, "t="+b64encode(p.opts.Type))
}
if p.wait_till_closed {
ans = append(ans, "c=1:a=report")
}
for _, x := range p.opts.Icon {
ans = append(ans, "n="+b64encode(x))
}
if p.opts.IconCacheId != "" {
ans = append(ans, "g="+p.opts.IconCacheId)
}
m := strings.Join(ans, ":")
if m != "" {
m = ":" + m
}
return m
}
var debugprintln = tty.DebugPrintln
func (p *parsed_data) generate_chunks(callback func(string)) {
prefix := ESC_CODE_PREFIX + "i=" + p.identifier
write_chunk := func(middle string) {
callback(prefix + middle + ESC_CODE_SUFFIX)
}
add_payload := func(payload_type, payload string) {
if payload == "" {
return
}
p := utils.IfElse(payload_type == "title", "", ":p="+payload_type)
payload = b64encode(payload)
for len(payload) > 0 {
chunk := payload[:min(CHUNK_SIZE, len(payload))]
payload = utils.IfElse(len(payload) > len(chunk), payload[len(chunk):], "")
write_chunk(":d=0:e=1" + p + ";" + chunk)
}
}
metadata := p.create_metadata()
write_chunk(":d=0" + metadata + ";")
add_payload("title", p.title)
add_payload("body", p.body)
if len(p.image_data) > 0 {
add_payload("icon", utils.UnsafeBytesToString(p.image_data))
}
write_chunk(";")
}
func (p *parsed_data) run_loop() (err error) {
lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors, loop.NoMouseTracking)
if err != nil {
return err
}
activated := ""
prefix := ESC_CODE_PREFIX + "i=" + p.identifier
poll_for_close := func() {
lp.AddTimer(time.Millisecond*50, false, func(_ loop.IdType) error {
lp.QueueWriteString(prefix + ":p=alive;" + ESC_CODE_SUFFIX)
return nil
})
}
lp.OnInitialize = func() (string, error) {
p.generate_chunks(func(x string) { lp.QueueWriteString(x) })
return "", nil
}
lp.OnEscapeCode = func(ect loop.EscapeCodeType, data []byte) error {
if ect == loop.OSC && bytes.HasPrefix(data, []byte(ESC_CODE_PREFIX[2:])) {
raw := utils.UnsafeBytesToString(data[len(ESC_CODE_PREFIX[2:]):])
metadata, payload, _ := strings.Cut(raw, ";")
sent_identifier, payload_type := "", ""
for _, x := range strings.Split(metadata, ":") {
key, val, _ := strings.Cut(x, "=")
switch key {
case "i":
sent_identifier = val
case "p":
payload_type = val
}
}
if sent_identifier == p.identifier {
switch payload_type {
case "close":
if payload == "untracked" {
poll_for_close()
} else {
lp.Quit(0)
}
case "alive":
live_ids := strings.Split(payload, ",")
if slices.Contains(live_ids, p.identifier) {
poll_for_close()
} else {
lp.Quit(0)
}
case "":
activated = utils.IfElse(payload == "", "activated", payload)
}
}
}
return nil
}
close_requested := 0
lp.OnKeyEvent = func(event *loop.KeyEvent) error {
if event.MatchesPressOrRepeat("ctrl+c") || event.MatchesPressOrRepeat("esc") {
event.Handled = true
switch close_requested {
case 0:
lp.QueueWriteString(prefix + ":p=close;" + ESC_CODE_SUFFIX)
lp.Println("Closing notification, please wait...")
close_requested++
case 1:
key := "Esc"
if event.MatchesPressOrRepeat("ctrl+c") {
key = "Ctrl+C"
}
lp.Println(fmt.Sprintf("Waiting for response from terminal, press the %s key again to abort. Note that this might result in garbage being printed to the terminal.", key))
close_requested++
default:
return fmt.Errorf("Aborted by user!")
}
}
return nil
}
err = lp.Run()
ds := lp.DeathSignalName()
if ds != "" {
fmt.Println("Killed by signal: ", ds)
lp.KillIfSignalled()
return
}
if activated != "" && err == nil {
fmt.Println(activated)
}
return
}
func random_ident() (string, error) {
return utils.HumanUUID4()
}
func parse_duration(x string) (ans time.Duration, err error) {
switch x {
case "never":
return 0, nil
case "":
return -1, nil
}
trailer := x[len(x)-1]
multipler := time.Second
switch trailer {
case 's':
x = x[:len(x)-1]
case 'm':
x = x[:len(x)-1]
multipler = time.Minute
case 'h':
x = x[:len(x)-1]
multipler = time.Hour
case 'd':
x = x[:len(x)-1]
multipler = time.Hour * 24
}
val, err := strconv.ParseFloat(x, 64)
if err != nil {
return ans, err
}
ans = time.Duration(float64(multipler) * val)
return
}
func (p *parsed_data) load_image_data() (err error) {
if p.opts.IconPath == "" {
return nil
}
f, err := os.Open(p.opts.IconPath)
if err != nil {
return err
}
defer f.Close()
_, imgfmt, err := image.DecodeConfig(f)
if _, err = f.Seek(0, io.SeekStart); err != nil {
return err
}
if err == nil && imgfmt != "" && strings.Contains("jpeg jpg gif png", strings.ToLower(imgfmt)) {
p.image_data, err = io.ReadAll(f)
return
}
return fmt.Errorf("The icon must be in PNG, JPEG or GIF formats")
}
func main(_ *cli.Command, opts *Options, args []string) (rc int, err error) {
if len(args) == 0 {
return 1, fmt.Errorf("Must specify a TITLE for the notification")
}
var p parsed_data
p.opts = opts
p.title = args[0]
if len(p.title) == 0 {
return 1, fmt.Errorf("Must specify a non-empty TITLE for the notification")
}
if len(args) > 1 {
p.body = strings.Join(args[1:], " ")
}
ident := opts.Identifier
if ident == "" {
if ident, err = random_ident(); err != nil {
return 1, fmt.Errorf("Failed to generate a random identifier with error: %w", err)
}
}
bad_ident := func(which string) error {
return fmt.Errorf("Invalid identifier: %s must be only English letters, numbers, hyphens and underscores.", which)
}
if !check_id_valid(ident) {
return 1, bad_ident(ident)
}
p.identifier = ident
if !check_id_valid(opts.IconCacheId) {
return 1, bad_ident(opts.IconCacheId)
}
if p.expire_time, err = parse_duration(opts.ExpireTime); err != nil {
return 1, fmt.Errorf("Invalid expire time: %s with error: %w", opts.ExpireTime, err)
}
p.wait_till_closed = opts.WaitTillClosed
if err = p.load_image_data(); err != nil {
return 1, fmt.Errorf("Failed to load image data from %s with error %w", opts.IconPath, err)
}
if opts.OnlyPrintEscapeCode {
p.generate_chunks(func(x string) {
if err == nil {
_, err = os.Stdout.WriteString(x)
}
})
} else {
if opts.PrintIdentifier {
fmt.Println(ident)
}
if p.wait_till_closed {
err = p.run_loop()
} else {
var term *tty.Term
if term, err = tty.OpenControllingTerm(); err != nil {
return 1, fmt.Errorf("Failed to open controlling terminal with error: %w", err)
}
p.generate_chunks(func(x string) {
if err == nil {
_, err = term.WriteString(x)
}
})
term.RestoreAndClose()
}
}
if err != nil {
rc = 1
}
return
}
func EntryPoint(parent *cli.Command) {
create_cmd(parent, main)
}

96
kittens/notify/main.py Normal file
View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
import sys
def OPTIONS() -> str:
from kitty.constants import standard_icon_names
return f'''
--icon -i
type=list
The name of the icon to use for the notification. An icon with this name
will be searched for on the computer running the terminal emulator. Can
be specified multiple times, the first name that is found will be used.
Standard names: {', '.join(sorted(standard_icon_names))}
--icon-path -I
Path to an image file in PNG/JPEG/GIF formats to use as the icon. If both
name and path are specified then first the name will be looked for and if not found
then the path will be used.
--app-name -a
default=kitten-notify
The application name for the notification.
--urgency -u
default=normal
choices=normal,low,critical
The urgency of the notification.
--expire-time -t
The duration, for the notification to appear on screen. The default is to
use the policy of the OS notification service. A value of :code:`never` means the notification should
never expire, however, this may or may not work depending on the policies of the OS notification
service. Time is specified in the form NUMBER[SUFFIX] where SUFFIX can be :code:`s` for seconds, :code:`m` for minutes,
:code:`h` for hours or :code:`d` for days. Non-integer numbers are allowed.
If not specified, seconds is assumed. The notification is guaranteed to be closed automatically
after the specified time has elapsed. The notification could be closed before by user
action or OS policy.
--type -c
The notification type. Can be any string, it is used by users to create filter rules
for notifications, so choose something descriptive of the notifications, purpose.
--identifier
The identifier of this notification. If a notification with the same identifier
is already displayed, it is replaced/updated.
--print-identifier -p
type=bool-set
Print the identifier for the notification to STDOUT. Useful when not specifying
your own identifier via the --identifier option.
--wait-till-closed -w
type=bool-set
Wait until the notification is closed. If the user activates the notification,
"activated" is printed to STDOUT before quitting. Press the Esc or Ctrl+C keys
to close the notification manually.
--only-print-escape-code
type=bool-set
Only print the escape code to STDOUT. Useful if using this kitten as part
of a larger application. If this is specified, the --wait-till-closed option
will be used for escape code generation, but no actual waiting will be done.
--icon-cache-id -g
Identifier to use when caching icons in the terminal emulator. Using an identifier means
that icon data needs to be transmitted only once using --icon-path. Subsequent invocations
will use the cached icon data, at least until the terminal instance is restarted. This is useful
if this kitten is being used inside a larger application, with --only-print-escape-code.
'''
help_text = '''\
Send notifications to the user that are displayed to them via the
desktop environment's notifications service. Works over SSH as well.
'''
usage = 'TITLE [BODY ...]'
if __name__ == '__main__':
raise SystemExit('This should be run as kitten clipboard')
elif __name__ == '__doc__':
cd = sys.cli_docs # type: ignore
cd['usage'] = usage
cd['options'] = OPTIONS
cd['help_text'] = help_text
cd['short_desc'] = 'Send notifications to the user'

View File

@@ -1145,7 +1145,7 @@ static bool has_cocoa_pending_actions = false;
typedef struct cocoa_list { char **items; size_t count, capacity; } cocoa_list;
typedef struct {
char* wd;
cocoa_list open_urls, closed_notifications;
cocoa_list open_urls, untracked_notifications;
} CocoaPendingActionsData;
static CocoaPendingActionsData cocoa_pending_actions_data = {0};
@@ -1165,7 +1165,7 @@ static void
cocoa_free_actions_data(void) {
if (cocoa_pending_actions_data.wd) { free(cocoa_pending_actions_data.wd); cocoa_pending_actions_data.wd = NULL; }
cocoa_free_pending_list(&cocoa_pending_actions_data.open_urls);
cocoa_free_pending_list(&cocoa_pending_actions_data.closed_notifications);
cocoa_free_pending_list(&cocoa_pending_actions_data.untracked_notifications);
}
void
@@ -1174,8 +1174,8 @@ set_cocoa_pending_action(CocoaPendingAction action, const char *data) {
switch(action) {
case LAUNCH_URLS:
cocoa_append_to_pending_list(&cocoa_pending_actions_data.open_urls, data); break;
case COCOA_NOTIFICATION_CLOSED:
cocoa_append_to_pending_list(&cocoa_pending_actions_data.closed_notifications, data); break;
case COCOA_NOTIFICATION_UNTRACKED:
cocoa_append_to_pending_list(&cocoa_pending_actions_data.untracked_notifications, data); break;
default:
if (cocoa_pending_actions_data.wd) free(cocoa_pending_actions_data.wd);
cocoa_pending_actions_data.wd = strdup(data);
@@ -1226,14 +1226,17 @@ process_cocoa_pending_actions(void) {
}
}
cocoa_pending_actions_data.open_urls.count = 0;
for (unsigned cpa = 0; cpa < cocoa_pending_actions_data.closed_notifications.count; cpa++) {
if (cocoa_pending_actions_data.closed_notifications.items[cpa]) {
cocoa_report_closed_notification(cocoa_pending_actions_data.closed_notifications.items[cpa]);
free(cocoa_pending_actions_data.closed_notifications.items[cpa]);
cocoa_pending_actions_data.closed_notifications.items[cpa] = NULL;
for (unsigned cpa = 0; cpa < cocoa_pending_actions_data.untracked_notifications.count; cpa++) {
if (cocoa_pending_actions_data.untracked_notifications.items[cpa]) {
cocoa_report_live_notifications(cocoa_pending_actions_data.untracked_notifications.items[cpa]);
free(cocoa_pending_actions_data.untracked_notifications.items[cpa]);
cocoa_pending_actions_data.untracked_notifications.items[cpa] = NULL;
}
}
cocoa_pending_actions_data.closed_notifications.count = 0;
cocoa_pending_actions_data.untracked_notifications.count = 0;
memset(cocoa_pending_actions, 0, sizeof(cocoa_pending_actions));
has_cocoa_pending_actions = false;

View File

@@ -237,7 +237,7 @@ class WriteRequest:
self, is_primary_selection: bool = False, protocol_type: ProtocolType = ProtocolType.osc_52, id: str = '',
rollover_size: int = 16 * 1024 * 1024, max_size: int = -1,
) -> None:
self.decoder = StreamingBase64Decoder(8 * 1024)
self.decoder = StreamingBase64Decoder()
self.id = id
self.is_primary_selection = is_primary_selection
self.protocol_type = protocol_type
@@ -279,24 +279,18 @@ class WriteRequest:
self.currently_writing_mime = mime
self.write_base64_data(data)
@property
def current_leftover_bytes(self) -> memoryview:
return self.decoder.leftover_bytes()
def flush_base64_data(self) -> None:
if self.currently_writing_mime:
self.decoder.flush()
if len(self.decoder):
self.write_base64_data(b'')
self.decoder.reset()
start = self.mime_map[self.currently_writing_mime][0]
self.mime_map[self.currently_writing_mime] = MimePos(start, self.tempfile.tell() - start)
self.currently_writing_mime = ''
def write_base64_data(self, b: bytes) -> None:
if not self.max_size_exceeded:
self.decoder.add(b)
if len(self.decoder):
self.tempfile.write(self.decoder.take_output())
decoded = self.decoder.decode(b)
if decoded:
self.tempfile.write(decoded)
if self.max_size > 0 and self.tempfile.tell() > (self.max_size * 1024 * 1024):
log_error(f'Clipboard write request has more data than allowed by clipboard_max_size ({self.max_size}), truncating')
self.max_size_exceeded = True

View File

@@ -34,7 +34,7 @@ typedef enum {
MINIMIZE,
QUIT,
USER_MENU_ACTION,
COCOA_NOTIFICATION_CLOSED,
COCOA_NOTIFICATION_UNTRACKED,
NUM_COCOA_PENDING_ACTIONS
} CocoaPendingAction;
@@ -59,4 +59,4 @@ bool cocoa_render_line_of_text(const char *text, const color_type fg, const colo
extern uint8_t* render_single_ascii_char_as_mask(const char ch, size_t *result_width, size_t *result_height);
void get_cocoa_key_equivalent(uint32_t, int, char *key, size_t key_sz, int*);
void set_cocoa_pending_action(CocoaPendingAction action, const char*);
void cocoa_report_closed_notification(const char* ident);
void cocoa_report_live_notifications(const char* ident);

View File

@@ -435,7 +435,7 @@ cocoa_send_notification(PyObject *self UNUSED, PyObject *args) {
do_notification_callback([[[response notification] request] identifier], "activated");
} else if ([response.actionIdentifier isEqualToString:UNNotificationDismissActionIdentifier]) {
// this never actually happens on macOS. Bloody Crapple.
do_notification_callback([[[response notification] request] identifier], "closed");
// do_notification_callback([[[response notification] request] identifier], "closed");
}
completionHandler();
}
@@ -457,23 +457,6 @@ get_notification_center_safely(void) {
return center;
}
static bool
remove_delivered_notification(const char *identifier) {
UNUserNotificationCenter *center = get_notification_center_safely();
if (!center) return false;
[center removeDeliveredNotificationsWithIdentifiers:@[ @(identifier) ]];
return true;
}
static NSLock *notifications_polling_lock = NULL;
static bool polling_notifications = false;
typedef struct tn { char *ident; bool closed; monotonic_t creation_time; } tn;
static struct { tn *items; size_t count, capacity; monotonic_t creation_time; } tracked_notifications;
static void dispatch_closed_notifications(void);
#define CLOSE_POLL_TIME 60.
#define CLOSE_POLL_INTERVAL 750
static bool
ident_in_list_of_notifications(NSString *ident, NSArray<UNNotification*> *list) {
for (UNNotification *n in list) {
@@ -482,81 +465,48 @@ ident_in_list_of_notifications(NSString *ident, NSArray<UNNotification*> *list)
return false;
}
static void
poll_for_closed_notifications(void) {
UNUserNotificationCenter *center = get_notification_center_safely();
if (!center) return;
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
// NSLog(@"num of delivered but not closed nots: %lu", (unsigned long)[notifications count]);
[notifications_polling_lock lock];
for (size_t i = 0; i < tracked_notifications.count; i++) {
if (!ident_in_list_of_notifications(@(tracked_notifications.items[i].ident), notifications)) tracked_notifications.items[i].closed = true;
}
[notifications_polling_lock unlock];
dispatch_async(dispatch_get_main_queue(), ^{
dispatch_closed_notifications();
});
}];
}
void
cocoa_report_closed_notification(const char* ident) {
do_notification_callback(@(ident), "closed");
cocoa_report_live_notifications(const char* ident) {
do_notification_callback(@(ident), "live");
}
static void
dispatch_closed_notifications(void) {
bool poll = false;
[notifications_polling_lock lock];
monotonic_t now = monotonic();
for (size_t i = tracked_notifications.count; i-- > 0; ) {
if (tracked_notifications.items[i].closed) {
set_cocoa_pending_action(COCOA_NOTIFICATION_CLOSED, tracked_notifications.items[i].ident);
free(tracked_notifications.items[i].ident);
remove_i_from_array(tracked_notifications.items, i, tracked_notifications.count);
} else if (now - tracked_notifications.items[i].creation_time < s_double_to_monotonic_t(CLOSE_POLL_TIME)) poll = true;
}
polling_notifications = poll;
[notifications_polling_lock unlock];
if (poll) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, CLOSE_POLL_INTERVAL * NSEC_PER_MSEC), dispatch_get_main_queue(), ^{
poll_for_closed_notifications();
});
}
}
static void
track_notification(char *ident) {
static bool
remove_delivered_notification(const char *identifier) {
UNUserNotificationCenter *center = get_notification_center_safely();
if (center) {
[notifications_polling_lock lock];
bool has_existing = false;
for (size_t i = 0; i < tracked_notifications.count; i++) {
if (strcmp(tracked_notifications.items[i].ident, ident) == 0) {
tracked_notifications.items[i].creation_time = monotonic();
has_existing = true;
break;
}
if (!center) return false;
char *ident = strdup(identifier);
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
if (ident_in_list_of_notifications(@(ident), notifications)) {
[center removeDeliveredNotificationsWithIdentifiers:@[ @(ident) ]];
}
if (!has_existing) {
ensure_space_for(&tracked_notifications, items, tn, tracked_notifications.count + 1, capacity, 8, false);
tracked_notifications.items[tracked_notifications.count++] = (tn){.ident=ident, .creation_time=monotonic()};
free(ident);
}];
return true;
}
static bool
live_delivered_notifications(void) {
UNUserNotificationCenter *center = get_notification_center_safely();
if (!center) return false;
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
@autoreleasepool {
NSMutableString *buffer = [NSMutableString stringWithCapacity:1024]; // autoreleased
for (UNNotification *n in notifications) [buffer appendFormat:@"%@,", [[n request] identifier]];
const char *val = [buffer UTF8String];
set_cocoa_pending_action(COCOA_NOTIFICATION_UNTRACKED, val ? val : "");
}
bool needs_poll = !polling_notifications;
[notifications_polling_lock unlock];
if (needs_poll) poll_for_closed_notifications();
}
}];
return true;
}
static void
schedule_notification(const char *identifier, const char *title, const char *body, const char *subtitle, int urgency) {
schedule_notification(const char *identifier, const char *title, const char *body, int urgency) {
UNUserNotificationCenter *center = get_notification_center_safely();
if (!center) return;
// Configure the notification's payload.
UNMutableNotificationContent* content = [[UNMutableNotificationContent alloc] init];
if (title) content.title = @(title);
if (body) content.body = @(body);
if (subtitle) content.subtitle = @(subtitle);
content.sound = [UNNotificationSound defaultSound];
#if __MAC_OS_X_VERSION_MIN_REQUIRED >= 120000
switch (urgency) {
@@ -584,9 +534,8 @@ schedule_notification(const char *identifier, const char *title, const char *bod
if (error != nil) log_error("Failed to show notification: %s", [[error localizedDescription] UTF8String]);
bool ok = error == nil;
dispatch_async(dispatch_get_main_queue(), ^{
do_notification_callback(@(duped_ident), ok ? "created" : "closed");
if (ok) track_notification(duped_ident);
else free(duped_ident);
do_notification_callback(@(duped_ident), ok ? "created" : "creation_failed");
free(duped_ident);
});
}];
[content release];
@@ -594,7 +543,7 @@ schedule_notification(const char *identifier, const char *title, const char *bod
typedef struct {
char *identifier, *title, *body, *subtitle;
char *identifier, *title, *body;
int urgency;
} QueuedNotification;
@@ -605,13 +554,12 @@ typedef struct {
static NotificationQueue notification_queue = {0};
static void
queue_notification(const char *identifier, const char *title, const char* body, const char* subtitle, int urgency) {
queue_notification(const char *identifier, const char *title, const char* body, int urgency) {
ensure_space_for((&notification_queue), notifications, QueuedNotification, notification_queue.count + 16, capacity, 16, true);
QueuedNotification *n = notification_queue.notifications + notification_queue.count++;
n->identifier = identifier ? strdup(identifier) : NULL;
n->title = title ? strdup(title) : NULL;
n->body = body ? strdup(body) : NULL;
n->subtitle = subtitle ? strdup(subtitle) : NULL;
n->urgency = urgency;
}
@@ -620,13 +568,13 @@ drain_pending_notifications(BOOL granted) {
if (granted) {
for (size_t i = 0; i < notification_queue.count; i++) {
QueuedNotification *n = notification_queue.notifications + i;
schedule_notification(n->identifier, n->title, n->body, n->subtitle, n->urgency);
schedule_notification(n->identifier, n->title, n->body, n->urgency);
}
}
while(notification_queue.count) {
QueuedNotification *n = notification_queue.notifications + --notification_queue.count;
if (!granted) do_notification_callback(@(n->identifier), "closed");
free(n->identifier); free(n->title); free(n->body); free(n->subtitle);
if (!granted) do_notification_callback(@(n->identifier), "creation_failed");
free(n->identifier); free(n->title); free(n->body);
memset(n, 0, sizeof(QueuedNotification));
}
}
@@ -638,15 +586,23 @@ cocoa_remove_delivered_notification(PyObject *self UNUSED, PyObject *x) {
Py_RETURN_FALSE;
}
static PyObject*
cocoa_live_delivered_notifications(PyObject *self UNUSED, PyObject *x UNUSED) {
if (live_delivered_notifications()) { Py_RETURN_TRUE; }
Py_RETURN_FALSE;
}
static PyObject*
cocoa_send_notification(PyObject *self UNUSED, PyObject *args) {
char *identifier = NULL, *title = NULL, *body = NULL, *subtitle = NULL; int urgency = 1;
if (!PyArg_ParseTuple(args, "ssz|zi", &identifier, &title, &body, &subtitle, &urgency)) return NULL;
char *identifier = NULL, *title = NULL, *body = NULL; int urgency = 1;
if (!PyArg_ParseTuple(args, "sss|i", &identifier, &title, &body, &urgency)) return NULL;
UNUserNotificationCenter *center = get_notification_center_safely();
if (!center) Py_RETURN_NONE;
if (!center.delegate) center.delegate = [[NotificationDelegate alloc] init];
queue_notification(identifier, title, body, subtitle, urgency);
queue_notification(identifier, title, body, urgency);
// The badge permission needs to be requested as well, even though it is not used,
// otherwise macOS refuses to show the preference checkbox for enable/disable notification sound.
@@ -1116,8 +1072,6 @@ cleanup(void) {
dockMenu = nil;
if (beep_sound) [beep_sound release];
beep_sound = nil;
if (notifications_polling_lock) [notifications_polling_lock release];
notifications_polling_lock = nil;
#ifndef KITTY_USE_DEPRECATED_MACOS_NOTIFICATION_API
drain_pending_notifications(NO);
@@ -1157,6 +1111,7 @@ static PyMethodDef module_methods[] = {
{"cocoa_set_global_shortcut", (PyCFunction)cocoa_set_global_shortcut, METH_VARARGS, ""},
{"cocoa_send_notification", (PyCFunction)cocoa_send_notification, METH_VARARGS, ""},
{"cocoa_remove_delivered_notification", (PyCFunction)cocoa_remove_delivered_notification, METH_O, ""},
{"cocoa_live_delivered_notifications", (PyCFunction)cocoa_live_delivered_notifications, METH_NOARGS, ""},
{"cocoa_set_notification_activated_callback", (PyCFunction)set_notification_activated_callback, METH_O, ""},
{"cocoa_set_url_handler", (PyCFunction)cocoa_set_url_handler, METH_VARARGS, ""},
{"cocoa_set_app_icon", (PyCFunction)cocoa_set_app_icon, METH_VARARGS, ""},
@@ -1168,7 +1123,6 @@ bool
init_cocoa(PyObject *module) {
cocoa_clear_global_shortcuts();
if (PyModule_AddFunctions(module, module_methods) != 0) return false;
notifications_polling_lock = [NSLock new];
register_at_exit_cleanup_func(COCOA_CLEANUP_FUNC, cleanup);
return true;
}

View File

@@ -188,6 +188,20 @@ except KeyError:
# https://github.com/ansible/ansible/issues/11536#issuecomment-153030743
ssh_control_master_template = 'kssh-{kitty_pid}-{ssh_placeholder}'
# See https://specifications.freedesktop.org/icon-naming-spec/latest/ar01s04.html
standard_icon_names = {
'error': 'dialog-error',
'warning': 'dialog-warning',
'warn': 'dialog-warning',
'info': 'dialog-information',
'question': 'dialog-question',
'help': 'system-help',
'file-manager': 'system-file-manager',
'system-monitor': 'utilities-system-monitor',
'text-editor': 'utilities-text-editor',
}
def glfw_path(module: str) -> str:
prefix = 'kitty.' if getattr(sys, 'frozen', False) else ''

View File

@@ -107,134 +107,51 @@ pybase64_decode(PyObject UNUSED *self, PyObject *args) {
typedef struct StreamingBase64Decoder {
PyObject_HEAD
PyObject *output;
size_t output_sz, output_capacity, num_leftover_bytes, initial_capacity;
unsigned char leftover_bytes[8];
struct base64_state state;
} StreamingBase64Decoder;
static int
StreamingBase64Decoder_init(PyObject *s, PyObject *args, PyObject *kwds) {
static char *kwlist[] = {"initial_capacity", NULL};
unsigned long initial_capacity = 8 * 1024;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|k", kwlist, &initial_capacity)) return -1;
StreamingBase64Decoder_init(PyObject *s, PyObject *args, PyObject *kwds UNUSED) {
if (PyTuple_GET_SIZE(args)) { PyErr_SetString(PyExc_TypeError, "constructor takes no arguments"); return -1; }
StreamingBase64Decoder *self = (StreamingBase64Decoder*)s;
self->initial_capacity = initial_capacity;
base64_stream_decode_init(&self->state, 0);
return 0;
}
static void
StreamingBase64Decoder_dealloc(PyObject *self) {
StreamingBase64Decoder *h = (StreamingBase64Decoder*)self;
Py_CLEAR(h->output);
Py_TYPE(self)->tp_free(self);
}
static bool
write_base64_data(StreamingBase64Decoder *self, const void *data, size_t len) {
if (!len) return true;
size_t sz = required_buffer_size_for_base64_decode(len);
if ((self->output_sz + sz) > self->output_capacity) {
size_t cap = MAX(self->output_capacity * 2, self->output_sz + sz + self->initial_capacity);
if (self->output) { if (_PyBytes_Resize(&self->output, cap) != 0) return false; }
else { self->output = PyBytes_FromStringAndSize(NULL, cap); if (!self->output) return false; }
self->output_capacity = cap;
}
if (!base64_decode8(data, len, (unsigned char*)(PyBytes_AS_STRING(self->output) + self->output_sz), &sz)) {
PyErr_SetString(PyExc_ValueError, "Invalid base64 input data");
return false;
}
self->output_sz += sz;
return true;
}
static bool
write_saving_leftover_bytes(StreamingBase64Decoder *self, const unsigned char *data, size_t len) {
size_t extra = len % 4;
if (!write_base64_data(self, data, len - extra)) return false;
self->num_leftover_bytes = extra;
if (extra) memcpy(self->leftover_bytes, data + len - extra, extra);
return true;
}
static PyObject*
StreamingBase64Decoder_add(StreamingBase64Decoder *self, PyObject *a) {
StreamingBase64Decoder_decode(StreamingBase64Decoder *self, PyObject *a) {
RAII_PY_BUFFER(data);
if (PyObject_GetBuffer(a, &data, PyBUF_SIMPLE) != 0) return NULL;
if (!data.buf || !data.len) return PyLong_FromLong(0);
unsigned char *d = data.buf; size_t dlen = data.len;
size_t before = self->output_sz;
if (self->num_leftover_bytes) {
size_t extra = 4 - self->num_leftover_bytes;
if (dlen >= extra) {
memcpy(self->leftover_bytes + self->num_leftover_bytes, d, extra);
if (!write_base64_data(self, self->leftover_bytes, self->num_leftover_bytes + extra)) return NULL;
self->num_leftover_bytes = 0;
d += extra; dlen -= extra;
if (!write_saving_leftover_bytes(self, d, dlen)) return NULL;
} else {
memcpy(self->leftover_bytes + self->num_leftover_bytes, d, dlen);
self->num_leftover_bytes += dlen;
}
} else if (!write_saving_leftover_bytes(self, d, dlen)) return NULL;
return PyLong_FromSize_t(self->output_sz - before);
}
static Py_ssize_t
StreamingBase64Decoder_len(PyObject *s) { return ((StreamingBase64Decoder*)s)->output_sz; }
static PyObject*
StreamingBase64Decoder_leftover_bytes(StreamingBase64Decoder *self, PyObject *a UNUSED) {
return PyMemoryView_FromMemory((char*)self->leftover_bytes, self->num_leftover_bytes, PyBUF_READ);
}
static PyObject*
StreamingBase64Decoder_flush(StreamingBase64Decoder *self, PyObject *args UNUSED) {
size_t padding = 4 - self->num_leftover_bytes;
switch(padding) {
case 1: self->leftover_bytes[self->num_leftover_bytes++] = '='; break;
case 2: self->leftover_bytes[self->num_leftover_bytes++] = '='; self->leftover_bytes[self->num_leftover_bytes++] = '='; break;
if (!data.buf || !data.len) return PyBytes_FromStringAndSize(NULL, 0);
size_t sz = required_buffer_size_for_base64_decode(data.len);
RAII_PyObject(ans, PyBytes_FromStringAndSize(NULL, sz));
if (!ans) return NULL;
if (!base64_stream_decode(&self->state, data.buf, data.len, PyBytes_AS_STRING(ans), &sz)) {
PyErr_SetString(PyExc_ValueError, "Invalid base64 input data");
return NULL;
}
write_base64_data(self, self->leftover_bytes, self->num_leftover_bytes);
self->num_leftover_bytes = 0;
if (_PyBytes_Resize(&ans, sz) != 0) return NULL;
return Py_NewRef(ans);
}
static PyObject*
StreamingBase64Decoder_reset(StreamingBase64Decoder *self, PyObject *args UNUSED) {
base64_stream_decode_init(&self->state, 0);
Py_RETURN_NONE;
}
static PyObject*
StreamingBase64Decoder_copy_output(StreamingBase64Decoder *self, PyObject *args UNUSED) {
return PyBytes_FromStringAndSize(PyBytes_AS_STRING(self->output), self->output_sz);
}
static PyObject*
StreamingBase64Decoder_take_output(StreamingBase64Decoder *self, PyObject *args UNUSED) {
if (!self->output_sz) return PyBytes_FromStringAndSize(NULL, 0);
RAII_PyObject(newbuf, PyBytes_FromStringAndSize(NULL, self->initial_capacity));
if (!newbuf) return NULL;
if (_PyBytes_Resize(&self->output, self->output_sz) != 0) return NULL;
PyObject *ans = self->output;
self->output = Py_NewRef(newbuf); self->output_sz = 0; self->output_capacity = self->initial_capacity;
return ans;
}
static PyTypeObject StreamingBase64Decoder_Type = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "kitty.fast_data_types.StreamingBase64Decoder",
.tp_basicsize = sizeof(StreamingBase64Decoder),
.tp_dealloc = StreamingBase64Decoder_dealloc,
.tp_flags = Py_TPFLAGS_DEFAULT,
.tp_doc = "StreamingBase64Decoder",
.tp_methods = (PyMethodDef[]){
{"add", (PyCFunction)StreamingBase64Decoder_add, METH_O, ""},
{"flush", (PyCFunction)StreamingBase64Decoder_flush, METH_NOARGS, ""},
{"take_output", (PyCFunction)StreamingBase64Decoder_take_output, METH_NOARGS, ""},
{"copy_output", (PyCFunction)StreamingBase64Decoder_copy_output, METH_NOARGS, ""},
{"leftover_bytes", (PyCFunction)StreamingBase64Decoder_leftover_bytes, METH_NOARGS, ""},
{"decode", (PyCFunction)StreamingBase64Decoder_decode, METH_O, ""},
{"reset", (PyCFunction)StreamingBase64Decoder_reset, METH_NOARGS, ""},
{NULL, NULL, 0, NULL},
},
.tp_new = PyType_GenericNew,
.tp_init = StreamingBase64Decoder_init,
.tp_as_sequence = &(PySequenceMethods){
.sq_length = StreamingBase64Decoder_len,
},
};
static PyObject*

View File

@@ -556,6 +556,7 @@ def dbus_send_notification(
action_text: str = '',
timeout: int = -1,
urgency: int = 1,
replaces: int = 0,
) -> int:
pass
@@ -566,13 +567,13 @@ def dbus_close_notification(dbus_notification_id: int) -> bool: ...
def cocoa_send_notification(
identifier: str,
title: str,
body: Optional[str],
subtitle: Optional[str],
body: str,
urgency: int = 1,
) -> None:
pass
def cocoa_remove_delivered_notification(identifier: str) -> bool: ...
def cocoa_live_delivered_notifications() -> bool: ...
def create_os_window(
get_window_size: Callable[[int, int, int, int, float, float], Tuple[int,
@@ -1704,13 +1705,8 @@ def get_mouse_data_for_window(os_window_id: int, tab_id: int, window_id: int) ->
class StreamingBase64Decoder:
def __init__(self, initial_capacity: int = 8 *1024) -> None: ... # set the initial output buffer capacity
def add(self, data: ReadOnlyBuffer) -> int: ... # add the base64 data
def flush(self) -> None: ... # indicate end of base64 data, left over bytes are processed as if they were followed by padding
def take_output(self) -> bytes: ... # take the output so far. The decoder no longer references this output
def copy_output(self) -> bytes: ... # copy the output so far
def __len__(self) -> int: ... # return the length of the current output
def leftover_bytes(self) -> memoryview: ... # return the currently leftover bytes that will be consumed by flush()
def decode(self, data: ReadOnlyBuffer) -> bytes: ... # decode the specified data
def reset(self) -> None: ... # reset the state to empty to start decoding a new stream
class DiskCache:

3
kitty/glfw-wrapper.h generated
View File

@@ -1055,7 +1055,7 @@ typedef struct GLFWLayerShellConfig {
typedef struct GLFWDBUSNotificationData {
const char *app_name, *icon, *summary, *body, *action_name;
int32_t timeout; uint8_t urgency;
int32_t timeout; uint8_t urgency; uint32_t replaces;
} GLFWDBUSNotificationData;
/*! @brief The function pointer type for error callbacks.
@@ -1705,7 +1705,6 @@ typedef void (* GLFWcocoarenderframefun)(GLFWwindow*);
typedef void (*GLFWwaylandframecallbackfunc)(unsigned long long id);
typedef void (*GLFWDBusnotificationcreatedfun)(unsigned long long, uint32_t, void*);
typedef void (*GLFWDBusnotificationactivatedfun)(uint32_t, int, const char*);
typedef int (*glfwInit_func)(monotonic_t);
GFW_EXTERN glfwInit_func glfwInit_impl;
#define glfwInit glfwInit_impl

View File

@@ -2076,17 +2076,18 @@ dbus_notification_created_callback(unsigned long long notification_id, uint32_t
static PyObject*
dbus_send_notification(PyObject *self UNUSED, PyObject *args, PyObject *kw) {
int timeout = -1, urgency = 1;
int timeout = -1, urgency = 1; unsigned int replaces = 0;
GLFWDBUSNotificationData d = {.action_name=""};
static const char* kwlist[] = {"app_name", "app_icon", "title", "body", "action_text", "timeout", "urgency", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kw, "ssss|sii", (char**)kwlist,
&d.app_name, &d.icon, &d.summary, &d.body, &d.action_name, &timeout, &urgency)) return NULL;
static const char* kwlist[] = {"app_name", "app_icon", "title", "body", "action_text", "timeout", "urgency", "replaces", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kw, "ssss|siiI", (char**)kwlist,
&d.app_name, &d.icon, &d.summary, &d.body, &d.action_name, &timeout, &urgency, &replaces)) return NULL;
if (!glfwDBusUserNotify) {
PyErr_SetString(PyExc_RuntimeError, "Failed to load glfwDBusUserNotify, did you call glfw_init?");
return NULL;
}
d.timeout = timeout;
d.urgency = urgency & 3;
d.replaces = replaces;
unsigned long long notification_id = glfwDBusUserNotify(&d, dbus_notification_created_callback, NULL);
return PyLong_FromUnsignedLongLong(notification_id);
}

View File

@@ -6,12 +6,13 @@ import re
from collections import OrderedDict
from contextlib import suppress
from enum import Enum
from functools import partial
from itertools import count
from typing import Any, Callable, Dict, FrozenSet, Iterator, List, NamedTuple, Optional, Tuple, Union
from typing import Any, Callable, Dict, FrozenSet, Iterator, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
from weakref import ReferenceType, ref
from .constants import cache_dir, is_macos, logo_png_file
from .fast_data_types import ESC_OSC, StreamingBase64Decoder, current_focused_os_window_id, get_boss
from .constants import cache_dir, config_dir, is_macos, logo_png_file, standard_icon_names
from .fast_data_types import ESC_OSC, StreamingBase64Decoder, add_timer, base64_decode, current_focused_os_window_id, get_boss, get_options
from .types import run_once
from .typing import WindowType
from .utils import get_custom_window_icon, log_error, sanitize_control_codes
@@ -24,7 +25,8 @@ class IconDataCache:
def __init__(self, base_cache_dir: str = '', max_cache_size: int = 128 * 1024 * 1024):
self.max_cache_size = max_cache_size
self.key_map: 'OrderedDict[str, str]' = OrderedDict()
self.key_map: Dict[str, str] = {}
self.hash_map: 'OrderedDict[str, Set[str]]' = OrderedDict()
self.base_cache_dir = base_cache_dir
self.cache_dir = ''
self.total_size = 0
@@ -60,31 +62,36 @@ class IconDataCache:
with open(path, 'wb') as f:
f.write(data)
self.total_size += len(data)
self.key_map.pop(key, None) # mark this key as being used recently
self.key_map[key] = data_hash
self.hash_map[data_hash] = self.hash_map.pop(data_hash, set()) | {key} # mark this data as being used recently
if key:
self.key_map[key] = data_hash
self.prune()
return path
def get_icon(self, key: str) -> str:
self._ensure_state()
data_hash = self.key_map.pop(key, None)
data_hash = self.key_map.get(key)
if data_hash:
self.key_map[key] = data_hash # mark this key as being used recently
self.hash_map[data_hash] = self.hash_map.pop(data_hash, set()) | {key} # mark this data as being used recently
return os.path.join(self.cache_dir, data_hash)
return ''
def clear(self) -> None:
while self.key_map:
key, data_hash = self.key_map.popitem(False)
self._remove_data_hash(data_hash)
while self.hash_map:
data_hash, keys = self.hash_map.popitem(False)
for key in keys:
self.key_map.pop(key, None)
self._remove_data_file(data_hash)
def prune(self) -> None:
self._ensure_state()
while self.total_size > self.max_cache_size and self.key_map:
key, data_hash = self.key_map.popitem(False)
self._remove_data_hash(data_hash)
while self.total_size > self.max_cache_size and self.hash_map:
data_hash, keys = self.hash_map.popitem(False)
for key in keys:
self.key_map.pop(key, None)
self._remove_data_file(data_hash)
def _remove_data_hash(self, data_hash: str) -> None:
def _remove_data_file(self, data_hash: str) -> None:
path = os.path.join(self.cache_dir, data_hash)
with suppress(FileNotFoundError):
sz = os.path.getsize(path)
@@ -95,7 +102,9 @@ class IconDataCache:
self._ensure_state()
data_hash = self.key_map.pop(key, None)
if data_hash:
self._remove_data_hash(data_hash)
for key in self.hash_map.pop(data_hash, set()):
self.key_map.pop(key, None)
self._remove_data_file(data_hash)
class Urgency(Enum):
@@ -111,6 +120,7 @@ class PayloadType(Enum):
query = '?'
close = 'close'
icon = 'icon'
alive = 'alive'
@property
def is_text(self) -> bool:
@@ -152,7 +162,7 @@ class DataStore:
class EncodedDataStore:
def __init__(self, data_store: DataStore) -> None:
self.decoder = StreamingBase64Decoder(initial_capacity=4096)
self.decoder = StreamingBase64Decoder()
self.data_store = data_store
@property
@@ -168,14 +178,10 @@ class EncodedDataStore:
def add_base64_data(self, data: Union[str, bytes]) -> None:
if isinstance(data, str):
data = data.encode('ascii')
self.decoder.add(data)
if len(self.decoder) >= self.data_store.max_size:
self.data_store(self.decoder.take_output())
self.data_store(self.decoder.decode(data))
def flush_encoded_data(self) -> None:
self.decoder.flush()
if len(self.decoder):
self.data_store(self.decoder.take_output())
self.decoder.reset()
def finalise(self) -> bytes:
self.flush_encoded_data()
@@ -190,19 +196,30 @@ def limit_size(x: str) -> str:
class NotificationCommand:
done: bool = True
identifier: str = ''
channel_id: int = 0
desktop_notification_id: int = -1
# data received from client and eventually displayed/processed
title: str = ''
body: str = ''
actions: FrozenSet[Action] = frozenset((Action.focus,))
only_when: OnlyWhen = OnlyWhen.unset
urgency: Optional[Urgency] = None
close_response_requested: Optional[bool] = None
icon_data_key: str = ''
icon_names: Tuple[str, ...] = ()
application_name: str = ''
notification_types: tuple[str, ...] = ()
timeout: int = -2
# event callbacks
on_activation: Optional[Callable[['NotificationCommand'], None]] = None
on_close: Optional[Callable[['NotificationCommand'], None]] = None
on_update: Optional[Callable[['NotificationCommand', 'NotificationCommand'], None]] = None
# metadata
identifier: str = ''
done: bool = True
channel_id: int = 0
desktop_notification_id: int = -1
close_response_requested: Optional[bool] = None
icon_path: str = ''
icon_name: str = ''
# payload handling
current_payload_type: PayloadType = PayloadType.title
@@ -212,9 +229,6 @@ class NotificationCommand:
created_by_desktop: bool = False
activation_token: str = ''
# event callbacks
on_activation: Optional[Callable[['NotificationCommand'], None]] = None
def __init__(self, icon_data_cache: 'ReferenceType[IconDataCache]', log: 'Log') -> None:
self.icon_data_cache_ref = icon_data_cache
self.log = log
@@ -228,9 +242,12 @@ class NotificationCommand:
return Action.focus in self.actions
def __repr__(self) -> str:
return (
f'NotificationCommand(identifier={self.identifier!r}, title={self.title!r}, body={self.body!r},'
f'actions={self.actions}, done={self.done!r}, urgency={self.urgency})')
fields = {}
for x in ('title', 'body', 'identifier', 'actions', 'urgency', 'done'):
val = getattr(self, x)
if val:
fields[x] = val
return f'NotificationCommand{fields}'
def parse_metadata(self, metadata: str, prev: 'NotificationCommand') -> Tuple[PayloadType, bool]:
payload_type = PayloadType.title
@@ -275,7 +292,25 @@ class NotificationCommand:
elif k == 'g':
self.icon_data_key = sanitize_id(v)
elif k == 'n':
self.icon_name = v
try:
self.icon_names += (base64_decode(v).decode('utf-8'),)
except Exception:
self.log('Ignoring invalid icon name in notification: {v!r}')
elif k == 'f':
try:
self.application_name = base64_decode(v).decode('utf-8')
except Exception:
self.log('Ignoring invalid application_name in notification: {v!r}')
elif k == 't':
try:
self.notification_types += (base64_decode(v).decode('utf-8'),)
except Exception:
self.log('Ignoring invalid notification type in notification: {v!r}')
elif k == 'w':
try:
self.timeout = max(-1, int(v))
except Exception:
self.log('Ignoring invalid timeout in notification: {v!r}')
if not prev.done and prev.identifier == self.identifier:
self.merge_metadata(prev)
return payload_type, payload_is_encoded
@@ -292,8 +327,14 @@ class NotificationCommand:
self.close_response_requested = prev.close_response_requested
if not self.icon_data_key:
self.icon_data_key = prev.icon_data_key
if not self.icon_name:
self.icon_name = prev.icon_name
if prev.icon_names:
self.icon_names = prev.icon_names + self.icon_names
if not self.application_name:
self.application_name = prev.application_name
if prev.notification_types:
self.notification_types = prev.notification_types + self.notification_types
if self.timeout < -1:
self.timeout = prev.timeout
self.icon_path = prev.icon_path
def create_payload_buffer(self, payload_type: PayloadType) -> EncodedDataStore:
@@ -332,12 +373,9 @@ class NotificationCommand:
if truncated:
self.log('Ignoring too long notification icon data')
else:
if self.icon_data_key:
icd = self.icon_data_cache_ref()
if icd:
self.icon_path = icd.add_icon(self.icon_data_key, data)
else:
self.log('Ignoring notification icon data because no icon data key specified')
icd = self.icon_data_cache_ref()
if icd:
self.icon_path = icd.add_icon(self.icon_data_key, data)
def finalise(self) -> None:
if self.current_payload_buffer:
@@ -347,6 +385,37 @@ class NotificationCommand:
icd = self.icon_data_cache_ref()
if icd:
self.icon_path = icd.get_icon(self.icon_data_key)
if self.title:
self.title = sanitize_text(self.title)
self.body = sanitize_text(self.body)
else:
self.title = sanitize_text(self.body)
self.body = ''
self.urgency = Urgency.Normal if self.urgency is None else self.urgency
self.close_response_requested = bool(self.close_response_requested)
self.timeout = max(-1, self.timeout)
def matches_rule_item(self, location:str, query:str) -> bool:
import re
pat = re.compile(query)
if location == 'type':
for x in self.notification_types:
if pat.search(x) is not None:
return True
val = {'title': self.title, 'body': self.body, 'app': self.application_name}[location]
return pat.search(val) is not None
def matches_rule(self, rule: str) -> bool:
if rule == 'all':
return True
from .search_query_parser import search
def get_matches(location: str, query: str, candidates: Set['NotificationCommand']) -> Set['NotificationCommand']:
return {x for x in candidates if x.matches_rule_item(location, query)}
try:
return self in search(rule, ('title', 'body', 'app', 'type'), {self}, get_matches)
except Exception as e:
self.log(f'Ignoring invalid filter_notification rule: {rule} with error: {e}')
return False
class DesktopIntegration:
@@ -360,18 +429,13 @@ class DesktopIntegration:
def initialize(self) -> None:
pass
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
raise NotImplementedError('Implement me in subclass')
def close_notification(self, desktop_notification_id: int) -> bool:
raise NotImplementedError('Implement me in subclass')
def notify(self,
title: str,
body: str,
timeout: int = -1,
application: str = 'kitty',
icon_name: str = '', icon_path: str = '',
subtitle: Optional[str] = None,
urgency: Urgency = Urgency.Normal,
) -> int:
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
raise NotImplementedError('Implement me in subclass')
def on_new_version_notification_activation(self, cmd: NotificationCommand) -> None:
@@ -385,16 +449,26 @@ class DesktopIntegration:
i = f'i={identifier or "0"}:'
p = ','.join(x.value for x in PayloadType if x.value)
c = ':c=1' if self.supports_close_events else ''
return f'99;{i}p=?;a={actions}:o={when}:u={urgency}:p={p}{c}'
return f'99;{i}p=?;a={actions}:o={when}:u={urgency}:p={p}{c}:w=1'
class MacOSIntegration(DesktopIntegration):
supports_close_events: bool = False
def initialize(self) -> None:
from .fast_data_types import cocoa_set_notification_activated_callback
self.id_counter = count(start=1)
self.live_notification_queries: List[Tuple[int, str]] = []
cocoa_set_notification_activated_callback(self.notification_activated)
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
from .fast_data_types import cocoa_live_delivered_notifications
if not cocoa_live_delivered_notifications():
self.notification_manager.send_live_response(channel_id, identifier, ())
else:
self.live_notification_queries.append((channel_id, identifier))
def close_notification(self, desktop_notification_id: int) -> bool:
from .fast_data_types import cocoa_remove_delivered_notification
close_succeeded = cocoa_remove_delivered_notification(str(desktop_notification_id))
@@ -402,24 +476,30 @@ class MacOSIntegration(DesktopIntegration):
log_error(f'Close request for {desktop_notification_id=} {"succeeded" if close_succeeded else "failed"}')
return close_succeeded
def notify(self,
title: str,
body: str,
timeout: int = -1,
application: str = 'kitty',
icon_name: str = '', icon_path: str = '',
subtitle: Optional[str] = None,
urgency: Urgency = Urgency.Normal,
) -> int:
desktop_notification_id = next(self.id_counter)
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
desktop_notification_id = existing_desktop_notification_id or next(self.id_counter)
from .fast_data_types import cocoa_send_notification
# If the body is not set macos makes the title the body and uses
# "kitty" as the title. So use a single space for the body in this
# case.
cocoa_send_notification(str(desktop_notification_id), title, body or ' ', subtitle, urgency.value)
# case. Although https://developer.apple.com/documentation/usernotifications/unnotificationcontent/body?language=objc
# says printf style strings are stripped this does not actually happen,
# so dont double %
# for %% escaping.
body = (nc.body or ' ')
assert nc.urgency is not None
cocoa_send_notification(str(desktop_notification_id), nc.title, body, nc.urgency.value)
return desktop_notification_id
def notification_activated(self, event: str, ident: str) -> None:
if event == 'live':
if debug_desktop_integration:
log_error('Got list of live notifications:', ident)
live_ids = tuple(int(x) for x in ident.split(',') if x)
self.notification_manager.purge_dead_notifications(live_ids)
self.live_notification_queries, queries = [], self.live_notification_queries
for channel_id, req_id in queries:
self.notification_manager.send_live_response(channel_id, req_id, live_ids)
return
if debug_desktop_integration:
log_error(f'Notification {ident} {event=}')
try:
@@ -429,9 +509,11 @@ class MacOSIntegration(DesktopIntegration):
return
if event == "created":
self.notification_manager.notification_created(desktop_notification_id)
from .fast_data_types import cocoa_live_delivered_notifications
cocoa_live_delivered_notifications() # so that we purge dead notifications
elif event == "activated":
self.notification_manager.notification_activated(desktop_notification_id)
elif event == "closed":
elif event == "creation_failed":
self.notification_manager.notification_closed(desktop_notification_id)
@@ -442,7 +524,11 @@ class FreeDesktopIntegration(DesktopIntegration):
dbus_set_notification_callback(self.dispatch_event_from_desktop)
# map the id returned by the notification daemon to the
# desktop_notification_id we use for the notification
self.creation_id_map: 'OrderedDict[int, int]' = OrderedDict()
self.dbus_to_desktop: 'OrderedDict[int, int]' = OrderedDict()
self.desktop_to_dbus: Dict[int, int] = {}
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
self.notification_manager.send_live_response(channel_id, identifier, tuple(self.desktop_to_dbus))
def close_notification(self, desktop_notification_id: int) -> bool:
from .fast_data_types import dbus_close_notification
@@ -454,28 +540,32 @@ class FreeDesktopIntegration(DesktopIntegration):
return close_succeeded
def get_desktop_notification_id(self, dbus_notification_id: int, event: str) -> Optional[int]:
q = self.creation_id_map.get(dbus_notification_id)
q = self.dbus_to_desktop.get(dbus_notification_id)
if q is None:
if debug_desktop_integration:
log_error(f'Could not find desktop_notification_id for {dbus_notification_id=} for event {event}')
return q
def get_dbus_notification_id(self, desktop_notification_id: int, event: str) ->Optional[int]:
for dbus_id, q in self.creation_id_map.items():
if q == desktop_notification_id:
return dbus_id
if debug_desktop_integration:
log_error(f'Could not find dbus_notification_id for {desktop_notification_id=} for event {event}')
return None
q = self.desktop_to_dbus.get(desktop_notification_id)
if q is None:
if debug_desktop_integration:
log_error(f'Could not find dbus_notification_id for {desktop_notification_id=} for event {event}')
return q
def created(self, dbus_notification_id: int, desktop_notification_id: int) -> None:
self.dbus_to_desktop[desktop_notification_id] = dbus_notification_id
self.desktop_to_dbus[dbus_notification_id] = desktop_notification_id
if len(self.dbus_to_desktop) > 128:
k, v = self.dbus_to_desktop.popitem(False)
self.desktop_to_dbus.pop(v, None)
self.notification_manager.notification_created(dbus_notification_id)
def dispatch_event_from_desktop(self, event_type: str, dbus_notification_id: int, extra: Union[int, str]) -> None:
if debug_desktop_integration:
log_error(f'Got notification event from desktop: {event_type=} {dbus_notification_id=} {extra=}')
if event_type == 'created':
self.creation_id_map[int(extra)] = dbus_notification_id
if len(self.creation_id_map) > 128:
self.creation_id_map.popitem(False)
self.notification_manager.notification_created(dbus_notification_id)
self.created(dbus_notification_id, int(extra))
return
if desktop_notification_id := self.get_desktop_notification_id(dbus_notification_id, event_type):
if event_type == 'activation_token':
@@ -485,21 +575,38 @@ class FreeDesktopIntegration(DesktopIntegration):
elif event_type == 'closed':
self.notification_manager.notification_closed(desktop_notification_id)
def notify(self,
title: str,
body: str,
timeout: int = -1,
application: str = 'kitty',
icon_name: str = '', icon_path: str = '',
subtitle: Optional[str] = None,
urgency: Urgency = Urgency.Normal,
) -> int:
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
from .fast_data_types import dbus_send_notification
app_icon = icon_name or icon_path or get_custom_window_icon()[1] or logo_png_file
from .xdg import icon_exists, icon_for_appname
app_icon = ''
if nc.icon_names:
for name in nc.icon_names:
if sn := standard_icon_names.get(name):
app_icon = sn
break
if icon_exists(name):
app_icon = name
break
if not app_icon:
app_icon = nc.icon_path or nc.icon_names[0]
else:
app_icon = nc.icon_path or icon_for_appname(nc.application_name)
if not app_icon:
app_icon = get_custom_window_icon()[1] or logo_png_file
body = nc.body.replace('<', '<\u200c').replace('&', '&\u200c') # prevent HTML markup from being recognized
assert nc.urgency is not None
replaces_dbus_id = 0
if existing_desktop_notification_id:
replaces_dbus_id = self.get_dbus_notification_id(existing_desktop_notification_id, 'notify') or 0
desktop_notification_id = dbus_send_notification(
app_name=application, app_icon=app_icon, title=title, body=body, timeout=timeout, urgency=urgency.value)
app_name=nc.application_name or 'kitty', app_icon=app_icon, title=nc.title, body=body, timeout=nc.timeout,
urgency=nc.urgency.value, replaces=replaces_dbus_id)
if debug_desktop_integration:
log_error(f'Created notification with {desktop_notification_id=}')
log_error(f'Requested creation of notification with {desktop_notification_id=}')
if existing_desktop_notification_id and replaces_dbus_id:
self.dbus_to_desktop.pop(replaces_dbus_id, None)
self.desktop_to_dbus.pop(existing_desktop_notification_id, None)
return desktop_notification_id
@@ -564,7 +671,8 @@ class NotificationManager:
channel: Channel = Channel(),
log: Log = Log(),
debug: bool = False,
base_cache_dir: str = ''
base_cache_dir: str = '',
cleanup_at_exit: bool = True,
):
global debug_desktop_integration
debug_desktop_integration = debug
@@ -576,7 +684,19 @@ class NotificationManager:
self.base_cache_dir = base_cache_dir
self.log = log
self.icon_data_cache = IconDataCache(base_cache_dir=self.base_cache_dir)
script_path = os.path.join(config_dir, 'notifications.py')
self.filter_script: Callable[[NotificationCommand], bool] = lambda nc: False
if os.path.exists(script_path):
import runpy
try:
m = runpy.run_path(script_path)
self.filter_script = m['main']
except Exception as e:
self.log(f'Failed to load {script_path} with error: {e}')
self.reset()
if cleanup_at_exit:
import atexit
atexit.register(self.cleanup)
def reset(self) -> None:
self.icon_data_cache.clear()
@@ -587,6 +707,8 @@ class NotificationManager:
def notification_created(self, desktop_notification_id: int) -> None:
if n := self.in_progress_notification_commands.get(desktop_notification_id):
n.created_by_desktop = True
if n.timeout > 0:
add_timer(partial(self.expire_notification, desktop_notification_id, id(n)), n.timeout / 1000, False)
def notification_activation_token_received(self, desktop_notification_id: int, token: str) -> None:
if n := self.in_progress_notification_commands.get(desktop_notification_id):
@@ -605,13 +727,27 @@ class NotificationManager:
try:
n.on_activation(n)
except Exception as e:
self.log(e)
self.log('Notification on_activation handler failed with error:', e)
def notification_replaced(self, old_cmd: NotificationCommand, new_cmd: NotificationCommand) -> None:
if old_cmd.desktop_notification_id != new_cmd.desktop_notification_id:
self.in_progress_notification_commands.pop(old_cmd.desktop_notification_id, None)
if old_cmd.on_update is not None:
try:
old_cmd.on_update(old_cmd, new_cmd)
except Exception as e:
self.log('Notification on_update handler failed with error:', e)
def notification_closed(self, desktop_notification_id: int) -> None:
if n := self.in_progress_notification_commands.get(desktop_notification_id):
self.purge_notification(n)
if n.close_response_requested:
self.send_closed_response(n.channel_id, n.identifier)
if n.on_close is not None:
try:
n.on_close(n)
except Exception as e:
self.log('Notification on_close handler failed with error:', e)
def create_notification_cmd(self) -> NotificationCommand:
return NotificationCommand(ref(self.icon_data_cache), self.log)
@@ -634,7 +770,7 @@ class NotificationManager:
cmd.on_activation = self.desktop_integration.on_new_version_notification_activation
self.notify_with_command(cmd, 0)
def is_notification_allowed(self, cmd: NotificationCommand, channel_id: int) -> bool:
def is_notification_allowed(self, cmd: NotificationCommand, channel_id: int, apply_filter_rules: bool = True) -> bool:
if cmd.only_when is not OnlyWhen.always and cmd.only_when is not OnlyWhen.unset:
ui_state = self.channel.ui_state(channel_id)
if ui_state.has_keyboard_focus:
@@ -643,21 +779,42 @@ class NotificationManager:
return False
return True
@property
def filter_rules(self) -> Iterator[str]:
return iter(get_options().filter_notification.keys())
def is_notification_filtered(self, cmd: NotificationCommand) -> bool:
if self.filter_script(cmd):
self.log(f'Notification {cmd.title!r} filtered out by script')
return True
for rule in self.filter_rules:
if cmd.matches_rule(rule):
self.log(f'Notification {cmd.title!r} filtered out by filter_notification rule: {rule}')
return True
return False
def notify_with_command(self, cmd: NotificationCommand, channel_id: int) -> Optional[int]:
cmd.channel_id = channel_id
cmd.finalise()
title = cmd.title or cmd.body
body = cmd.body if cmd.title else ''
if not title or not self.is_notification_allowed(cmd, channel_id):
if not cmd.title or not self.is_notification_allowed(cmd, channel_id) or self.is_notification_filtered(cmd):
return None
urgency = Urgency.Normal if cmd.urgency is None else cmd.urgency
desktop_notification_id = self.desktop_integration.notify(
title=sanitize_text(title), body=sanitize_text(body), urgency=urgency,
icon_name=cmd.icon_name, icon_path=cmd.icon_path,
)
existing_desktop_notification_id: Optional[int] = None
existing_cmd = self.in_progress_notification_commands_by_client_id.get(cmd.identifier) if cmd.identifier else None
if existing_cmd:
existing_desktop_notification_id = existing_cmd.desktop_notification_id
desktop_notification_id = self.desktop_integration.notify(cmd, existing_desktop_notification_id)
self.register_in_progress_notification(cmd, desktop_notification_id)
if existing_cmd:
self.notification_replaced(existing_cmd, cmd)
if not self.desktop_integration.supports_close_events and cmd.close_response_requested:
self.send_closed_response(channel_id, cmd.identifier, untracked=True)
return desktop_notification_id
def expire_notification(self, desktop_notification_id: int, command_id: int, timer_id: int) -> None:
if n := self.in_progress_notification_commands.get(desktop_notification_id):
if id(n) == command_id:
self.desktop_integration.close_notification(desktop_notification_id)
def register_in_progress_notification(self, cmd: NotificationCommand, desktop_notification_id: int) -> None:
cmd.desktop_notification_id = desktop_notification_id
self.in_progress_notification_commands[desktop_notification_id] = cmd
@@ -680,6 +837,10 @@ class NotificationManager:
if payload_type is PayloadType.query:
self.channel.send(channel_id, self.desktop_integration.query_response(cmd.identifier))
return None
if payload_type is PayloadType.alive:
if cmd.identifier:
self.desktop_integration.query_live_notifications(channel_id, cmd.identifier)
return None
if payload_type is PayloadType.close:
if cmd.identifier:
to_close = self.in_progress_notification_commands_by_client_id.get(cmd.identifier)
@@ -697,8 +858,21 @@ class NotificationManager:
cmd.set_payload(payload_type, payload_is_encoded, payload, prev_cmd)
return cmd
def send_closed_response(self, channel_id: int, client_id: str) -> None:
self.channel.send(channel_id, f'99;i={client_id}:p=close;')
def send_closed_response(self, channel_id: int, client_id: str, untracked: bool = False) -> None:
payload = 'untracked' if untracked else ''
self.channel.send(channel_id, f'99;i={client_id}:p={PayloadType.close.value};{payload}')
def send_live_response(self, channel_id: int, client_id: str, live_desktop_ids: Sequence[int]) -> None:
ids = []
for desktop_notification_id in live_desktop_ids:
if n := self.in_progress_notification_commands.get(desktop_notification_id):
if n.identifier and n.channel_id == channel_id:
ids.append(n.identifier)
self.channel.send(channel_id, f'99;i={client_id}:p={PayloadType.alive.value};{",".join(ids)}')
def purge_dead_notifications(self, live_desktop_ids: Sequence[int]) -> None:
for d in set(self.in_progress_notification_commands) - set(live_desktop_ids):
self.purge_notification(self.in_progress_notification_commands[d])
def purge_notification(self, cmd: NotificationCommand) -> None:
self.in_progress_notification_commands_by_client_id.pop(cmd.identifier, None)
@@ -722,3 +896,6 @@ class NotificationManager:
parts = raw.split(';', 1)
n.title, n.body = parts[0], (parts[1] if len(parts) > 1 else '')
self.notify_with_command(n, channel_id)
def cleanup(self) -> None:
del self.icon_data_cache

View File

@@ -3035,6 +3035,29 @@ The value of :code:`VAR2` will be :code:`<path to home directory>/a/b`.
'''
)
opt('+filter_notification', '', option_type='filter_notification', add_to_default=False, long_text='''
Specify rules to filter out notifications sent by applications running in kitty.
Can be specified multiple times to create multiple filter rules. A rule specification
is of the form :code:`field:regexp`. A filter rule
can match on any of the fields: :code:`title`, :code:`body`, :code:`app`, :code:`type`.
The special value of :code:`all` filters out all notifications. Rules can be combined
using Boolean operators. Some examples::
filter_notification title:hello or body:"abc.*def"
# filter out notification from vim except for ones about updates, (?i)
# makes matching case insesitive.
filter_notification app:"[ng]?vim" and not body:"(?i)update"
# filter out all notifications
filter_notification all
The field :code:`app` is the name of the application sending the notification and :code:`type`
is the type of the notification. Not all applications will send these fields, so you can also
match on the title and body of the notification text. More sophisticated programmatic filtering
and custom actions on notifications can be done by creating a notifications.py file in the
kitty config directory (:file:`~/.config/kitty`). An annotated sample is
:link:`available <https://github.com/kovidgoyal/kitty/blob/master/docs/notifications.py>`.
''')
opt('+watcher', '',
option_type='store_multiple',
add_to_default=False,

23
kitty/options/parse.py generated
View File

@@ -12,15 +12,15 @@ from kitty.options.utils import (
config_or_absolute_path, copy_on_select, cursor_blink_interval, cursor_text_color,
deprecated_adjust_line_height, deprecated_hide_window_decorations_aliases,
deprecated_macos_show_window_title_in_menubar_alias, deprecated_send_text, disable_ligatures,
edge_width, env, font_features, hide_window_decorations, macos_option_as_alt, macos_titlebar_color,
menu_map, modify_font, narrow_symbols, notify_on_cmd_finish, optional_edge_width, parse_font_spec,
parse_map, parse_mouse_map, paste_actions, remote_control_password, resize_debounce_time,
scrollback_lines, scrollback_pager_history_size, shell_integration, store_multiple, symbol_map,
tab_activity_symbol, tab_bar_edge, tab_bar_margin_height, tab_bar_min_tabs, tab_fade,
tab_font_style, tab_separator, tab_title_template, titlebar_color, to_cursor_shape,
to_cursor_unfocused_shape, to_font_size, to_layout_names, to_modifiers, url_prefixes, url_style,
visual_bell_duration, visual_window_select_characters, window_border_width, window_logo_scale,
window_size
edge_width, env, filter_notification, font_features, hide_window_decorations, macos_option_as_alt,
macos_titlebar_color, menu_map, modify_font, narrow_symbols, notify_on_cmd_finish,
optional_edge_width, parse_font_spec, parse_map, parse_mouse_map, paste_actions,
remote_control_password, resize_debounce_time, scrollback_lines, scrollback_pager_history_size,
shell_integration, store_multiple, symbol_map, tab_activity_symbol, tab_bar_edge,
tab_bar_margin_height, tab_bar_min_tabs, tab_fade, tab_font_style, tab_separator,
tab_title_template, titlebar_color, to_cursor_shape, to_cursor_unfocused_shape, to_font_size,
to_layout_names, to_modifiers, url_prefixes, url_style, visual_bell_duration,
visual_window_select_characters, window_border_width, window_logo_scale, window_size
)
@@ -976,6 +976,10 @@ class Parser:
def file_transfer_confirmation_bypass(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
ans['file_transfer_confirmation_bypass'] = str(val)
def filter_notification(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
for k, v in filter_notification(val, ans["filter_notification"]):
ans["filter_notification"][k] = v
def focus_follows_mouse(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
ans['focus_follows_mouse'] = to_bool(val)
@@ -1448,6 +1452,7 @@ def create_result_dict() -> typing.Dict[str, typing.Any]:
'action_alias': {},
'env': {},
'exe_search_path': {},
'filter_notification': {},
'font_features': {},
'kitten_alias': {},
'menu_map': {},

View File

@@ -347,6 +347,7 @@ option_names = ( # {{{
'env',
'exe_search_path',
'file_transfer_confirmation_bypass',
'filter_notification',
'focus_follows_mouse',
'font_family',
'font_features',
@@ -633,6 +634,7 @@ class Options:
action_alias: typing.Dict[str, str] = {}
env: typing.Dict[str, str] = {}
exe_search_path: typing.Dict[str, str] = {}
filter_notification: typing.Dict[str, str] = {}
font_features: typing.Dict[str, typing.Tuple[kitty.fast_data_types.ParsedFontFeature, ...]] = {}
kitten_alias: typing.Dict[str, str] = {}
menu_map: typing.Dict[typing.Tuple[str, ...], str] = {}
@@ -755,6 +757,7 @@ defaults = Options()
defaults.action_alias = {}
defaults.env = {}
defaults.exe_search_path = {}
defaults.filter_notification = {}
defaults.font_features = {}
defaults.kitten_alias = {}
defaults.menu_map = {}

View File

@@ -787,6 +787,10 @@ def config_or_absolute_path(x: str, env: Optional[Dict[str, str]] = None) -> Opt
return resolve_abs_or_config_path(x, env)
def filter_notification(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, str]]:
yield val, ''
def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, Sequence[str]]]:
val = val.strip()
if val:

137
kitty/xdg.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
import os
import re
from contextlib import suppress
from kitty.types import run_once
@run_once
def xdg_data_dirs() -> tuple[str, ...]:
return tuple(os.environ.get('XDG_DATA_DIRS', '/usr/local/share/:/usr/share/').split(os.pathsep))
@run_once
def icon_dirs() -> list[str]:
ans = []
def a(x: str) -> None:
if os.path.isdir(x):
ans.append(x)
a(os.path.expanduser('~/.icons'))
for x in xdg_data_dirs():
a(os.path.join(x, 'icons'))
return ans
class XDGIconCache:
def __init__(self) -> None:
self.existing_icon_names: set[str] = set()
self.scanned = False
def find_inherited_themes(self, basedir: str, seen_indexes: set[str], themes_to_search: set[str]) -> bool:
if basedir not in seen_indexes:
seen_indexes.add(basedir)
with suppress(OSError), open(os.path.join(basedir, 'index.theme')) as f:
raw = f.read()
if m := re.search(r'^Inherits\s*=\s*(.+?)$', raw, re.MULTILINE):
for x in m.group(1).split(','):
themes_to_search.add(x.strip())
return True
return False
def scan(self) -> None:
themes_to_search: set[str] = set()
self.scanned = True
seen_indexes: set[str] = set()
for icdir in icon_dirs():
if self.find_inherited_themes(os.path.join(icdir, 'default'), seen_indexes, themes_to_search):
break
themes_to_search.add('hicolor')
while True:
before = len(themes_to_search)
for icdir in icon_dirs():
for theme in tuple(themes_to_search):
self.find_inherited_themes(os.path.join(icdir, theme), seen_indexes, themes_to_search)
if len(themes_to_search) == before:
break
for icdir in icon_dirs():
for theme in themes_to_search:
self.scan_theme_dir(os.path.join(icdir, theme))
self.scan_theme_dir('/usr/share/pixmaps')
def scan_theme_dir(self, base: str) -> None:
with suppress(OSError):
for (dirpath, dirnames, filenames) in os.walk(base):
for q in filenames:
icon_name, sep, ext = q.lower().rpartition('.')
if sep == '.' and ext in ('svg', 'png', 'xpm'):
self.existing_icon_names.add(icon_name)
def icon_exists(self, name: str) -> bool:
if not self.scanned:
self.scan()
return name.lower() in self.existing_icon_names
xdg_icon_cache = XDGIconCache()
icon_exists = xdg_icon_cache.icon_exists
class AppIconCache:
def __init__(self) -> None:
self.scanned = False
self.lcase_app_name_to_path: dict[str, str] = {}
self.lcase_full_name_to_path: dict[str, str] = {}
self.icon_name_cache: dict[str, str] = {}
def scan(self) -> None:
self.scanned = True
for d in xdg_data_dirs():
d = os.path.join(d, 'applications')
with suppress(OSError):
for (dirpath, dirnames, filenames) in os.walk(d):
for fname in filenames:
if fname.endswith('.desktop'):
path = os.path.join(dirpath, fname)
self.process_desktop_file(path, os.path.relpath(path, d))
def process_desktop_file(self, path: str, relpath: str) -> None:
# file_id = relpath.replace('/', '-')
bname = os.path.basename(relpath)
parts = bname.split('.')[:-1]
appname = parts[-1]
self.lcase_app_name_to_path[appname.lower()] = path
self.lcase_full_name_to_path['.'.join(parts).lower()] = path
def icon_for_appname(self, appname: str) -> str:
if not self.scanned:
self.scan()
q = appname.lower()
if not appname or q in ('kitty', 'kitten', 'kitten-notify'):
return ''
path = self.lcase_full_name_to_path.get(q) or self.lcase_app_name_to_path.get(q)
if not path:
return ''
ans = self.icon_name_cache.get(path)
if ans is None:
try:
ans = self.icon_name_cache[path] = self.icon_name_from_desktop_file(path)
except OSError:
ans = self.icon_name_cache[path] = ''
return ans
def icon_name_from_desktop_file(self, path: str) -> str:
with open(path) as f:
raw = f.read()
if m := re.search(r'^Icon\s*=\s*(.+?)\s*?$', raw, re.MULTILINE):
return m.group(1)
return ''
app_icon_cache = AppIconCache()
icon_for_appname = app_icon_cache.icon_for_appname

View File

@@ -10,14 +10,17 @@ from . import BaseTest
class TestClipboard(BaseTest):
def test_clipboard_write_request(self):
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29yaw')
self.ae(bytes(wr.current_leftover_bytes), b'aw')
wr.flush_base64_data()
self.ae(wr.data_for(), b'light work')
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29yaw==')
self.ae(wr.data_for(), b'light work')
def t(data, expected):
wr = WriteRequest(max_size=64)
wr.add_base64_data(data)
self.ae(wr.data_for(), expected)
t('dGl0bGU=', b'title')
t('dGl0bGU', b'title')
t('dGl0bG', b'titl')
t('dGl0bG==', b'titl')
t('dGl0b', b'tit')
t('bGlnaHQgd29yaw', b'light work')
t('bGlnaHQgd29yaw==', b'light work')
wr = WriteRequest(max_size=64)
wr.add_base64_data('bGlnaHQgd29')
for x in b'y', b'a', b'y', b'4', b'=':

View File

@@ -6,15 +6,20 @@ import os
import re
import tempfile
from base64 import standard_b64encode
from typing import Optional
from kitty.notifications import Channel, DesktopIntegration, IconDataCache, NotificationManager, UIState, Urgency
from . import BaseTest
def n(title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_name='', icon_path=''):
return {'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_name': icon_name, 'icon_path': icon_path}
def n(
title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_names=(), icon_path='',
application_name='', notification_types=(), timeout=-1,
):
return {
'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_names': icon_names, 'icon_path': icon_path,
'application_name': application_name, 'notification_types': notification_types, 'timeout': timeout
}
class DesktopIntegration(DesktopIntegration):
@@ -28,6 +33,10 @@ class DesktopIntegration(DesktopIntegration):
self.close_succeeds = True
self.counter = 0
def query_live_notifications(self, channel_id, client_id):
ids = [n['id'] for n in self.notifications]
self.notification_manager.send_live_response(channel_id, client_id, tuple(ids))
def on_new_version_notification_activation(self, cmd) -> None:
self.new_version_activated = True
@@ -37,18 +46,14 @@ class DesktopIntegration(DesktopIntegration):
self.notification_manager.notification_closed(desktop_notification_id)
return self.close_succeeds
def notify(self,
title: str,
body: str,
timeout: int = -1,
application: str = 'kitty',
icon_name: str = '', icon_path: str = '',
subtitle: Optional[str] = None,
urgency: Urgency = Urgency.Normal,
) -> int:
self.counter += 1
ans = n(title, body, urgency, self.counter, icon_name)
ans['icon_path'] = os.path.basename(icon_path)
def notify(self, cmd, existing_desktop_notification_id) -> int:
if existing_desktop_notification_id:
did = existing_desktop_notification_id
else:
self.counter += 1
did = self.counter
title, body, urgency = cmd.title, cmd.body, (Urgency.Normal if cmd.urgency is None else cmd.urgency)
ans = n(title, body, urgency, did, cmd.icon_names, os.path.basename(cmd.icon_path), cmd.application_name, cmd.notification_types, timeout=cmd.timeout)
self.notifications.append(ans)
return self.counter
@@ -75,10 +80,17 @@ class Channel(Channel):
self.responses.append(osc_escape_code)
class NotificationManager(NotificationManager):
@property
def filter_rules(self):
yield from ('title:filterme',)
def do_test(self: 'TestNotifications', tdir: str) -> None:
di = DesktopIntegration(None)
ch = Channel()
nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir)
nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir, cleanup_at_exit=False)
di.notification_manager = nm
def reset():
@@ -97,7 +109,7 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
n = di.notifications[which]
di.close_notification(n['id'])
def assert_events(focus=True, close=0, report='', close_response=''):
def assert_events(focus=True, close=0, report='', close_response='', live=''):
self.ae(ch.focus_events, [''] if focus else [])
if report:
self.assertIn(f'99;i={report};', ch.responses)
@@ -111,6 +123,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
for r in ch.responses:
m = re.match(r'99;i=[a-z0-9]+:p=close;', r)
self.assertIsNone(m, f'Unexpectedly found close response: {r}')
if live:
self.assertIn(f'99;i=live:p=alive;{live}', ch.responses)
else:
for r in ch.responses:
m = re.match(r'99;i=[a-z0-9]+:p=alive;', r)
self.assertIsNone(m, f'Unexpectedly found alive response: {r}')
self.ae(di.close_events, [close] if close else [])
h('test it', osc_code=9)
@@ -168,6 +186,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
self.ae(di.notifications, [n()])
reset()
# test filtering
h(';title')
h(';filterme please')
self.ae(di.notifications, [n()])
reset()
# test closing interactions with reporting and activation
h('i=c;title')
self.ae(di.notifications, [n()])
@@ -199,6 +223,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
assert_events(focus=True, report='c', close=True, close_response='c')
reset()
h('i=a;title')
h('i=b;title')
h('i=live:p=alive;')
assert_events(focus=False, live='a,b')
reset()
h(';title')
self.ae(di.notifications, [n()])
activate()
@@ -208,7 +238,7 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
# Test querying
h('i=xyz:p=?')
self.assertFalse(di.notifications)
qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon:c=1'
qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon,alive:c=1:w=1'
self.ae(ch.responses, [f'99;i=xyz:p=?;{qr}'])
reset()
h('p=?')
@@ -230,6 +260,20 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
self.ae(di.notifications, [n(text, text)])
reset()
# Test application name and notification type
def e(x):
return standard_b64encode(x.encode()).decode()
h(f'i=t:d=0:f={e("app")}:t={e("1")};title')
h(f'i=t:t={e("test")}')
self.ae(di.notifications, [n(application_name='app', notification_types=('1', 'test',))])
reset()
# Test timeout
h('w=3;title')
self.ae(di.notifications, [n(timeout=3)])
reset()
# Test Disk Cache
dc = IconDataCache(base_cache_dir=tdir, max_cache_size=4)
cache_dir = dc._ensure_state()
@@ -243,15 +287,16 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
def send_with_icon(data='', n='', g=''):
m = ''
if n:
m += f'n={n}:'
for x in n.split(','):
m += f'n={standard_b64encode(x.encode()).decode()}:'
if g:
m += f'g={g}:'
h(f'i=9:d=0:{m};title')
h(f'i=9:p=icon;{data}')
dc = nm.icon_data_cache
send_with_icon(n='mycon')
self.ae(di.notifications, [n(icon_name='mycon')])
send_with_icon(n='mycon,ic2')
self.ae(di.notifications, [n(icon_names=('mycon', 'ic2'))])
reset()
send_with_icon(g='gid')
self.ae(di.notifications, [n()])
@@ -259,9 +304,11 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
send_with_icon(g='gid', data='1')
self.ae(di.notifications, [n(icon_path=dc.hash(b'1'))])
send_with_icon(g='gid', n='moose')
self.ae(di.notifications[-1], n(icon_name='moose', icon_path=dc.hash(b'1'), desktop_notification_id=len(di.notifications)))
self.ae(di.notifications[-1], n(icon_names=('moose',), icon_path=dc.hash(b'1')))
send_with_icon(g='gid2', data='2')
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2'), desktop_notification_id=len(di.notifications)))
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2')))
send_with_icon(data='3')
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'3')))
reset()

View File

@@ -78,10 +78,10 @@ def run_build(args: Any) -> None:
needs_retry = 'arm64' in cmd or building_nightly
if not needs_retry:
raise
print('Build failed, retrying in a few seconds...', file=sys.stderr)
print('Build failed, retrying in a minute seconds...', file=sys.stderr)
if 'macos' in cmd:
call('python ../bypy macos shutdown')
time.sleep(25)
time.sleep(60)
call(cmd, echo=True)
for x in ('64', 'arm64'):

View File

@@ -12,6 +12,7 @@ import (
"kitty/kittens/hints"
"kitty/kittens/hyperlinked_grep"
"kitty/kittens/icat"
"kitty/kittens/notify"
"kitty/kittens/query_terminal"
"kitty/kittens/show_key"
"kitty/kittens/ssh"
@@ -70,8 +71,10 @@ func KittyToolEntryPoints(root *cli.Command) {
ask.EntryPoint(root)
// hints
hints.EntryPoint(root)
// hints
// diff
diff.EntryPoint(root)
// notify
notify.EntryPoint(root)
// themes
themes.EntryPoint(root)
themes.ParseEntryPoint(root)