mirror of
https://github.com/kovidgoyal/kitty
synced 2026-06-14 04:28:00 +02:00
Compare commits
34 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b54d19326 | ||
|
|
b52275e0b5 | ||
|
|
212d7accfc | ||
|
|
4bc532a2d0 | ||
|
|
9047df5080 | ||
|
|
4ba9fcaf37 | ||
|
|
eb1bb493a7 | ||
|
|
7023e1506b | ||
|
|
8cfe1d7a16 | ||
|
|
10d62a9596 | ||
|
|
8b8e752ece | ||
|
|
ad48ecad40 | ||
|
|
6e35289f8e | ||
|
|
a38153c890 | ||
|
|
896833a4f7 | ||
|
|
9bd155ae50 | ||
|
|
f6a24af229 | ||
|
|
2e829c1349 | ||
|
|
1b1f5656ee | ||
|
|
3bdbf19bb0 | ||
|
|
00737ebf30 | ||
|
|
a19ce66e62 | ||
|
|
674432f886 | ||
|
|
5c1af0fcb1 | ||
|
|
eca487d15f | ||
|
|
d1a8772ac8 | ||
|
|
083f158bbd | ||
|
|
02bc104091 | ||
|
|
2bffea2bdc | ||
|
|
59c175f312 | ||
|
|
67410c317f | ||
|
|
de21e5e488 | ||
|
|
c59ab759a1 | ||
|
|
45a3a655a7 |
@@ -82,6 +82,8 @@ Detailed list of changes
|
||||
|
||||
- Desktop notifications protocol: Add support for closing notifications and querying if the terminal emulator supports the protocol (:iss:`7658`, :iss:`7659`)
|
||||
|
||||
- A new option :opt:`filter_notification` to filter out or perform arbitrary actions on desktop notifications based on sophisticated criteria (:iss:`7670`)
|
||||
|
||||
- A new protocol to allow terminal applications to change colors in the terminal more robustly than with the legacy XTerm protocol (:ref:`color_control`)
|
||||
|
||||
- Sessions: A new command ``focus_matching_window`` to shift focus to a specific window, useful when creating complex layouts with splits (:disc:`7635`)
|
||||
|
||||
@@ -33,7 +33,7 @@ notification from a shell script::
|
||||
To show a message with a title and a body::
|
||||
|
||||
printf '\x1b]99;i=1:d=0;Hello world\x1b\\'
|
||||
printf '\x1b]99;i=1:d=1:p=body;This is cool\x1b\\'
|
||||
printf '\x1b]99;i=1:p=body;This is cool\x1b\\'
|
||||
|
||||
The most important key in the metadata is the ``p`` key, it controls how the
|
||||
payload is interpreted. A value of ``title`` means the payload is setting the
|
||||
@@ -46,17 +46,46 @@ code can be. Chunking is accomplished by the ``i`` and ``d`` keys. The ``i``
|
||||
key is the *notification id* which can be any string containing the characters
|
||||
``[a-zA-Z0-9_-+.]``. The ``d`` key stands for *done* and can only take the
|
||||
values ``0`` and ``1``. A value of ``0`` means the notification is not yet done
|
||||
and the terminal emulator should hold off displaying it. A value of ``1`` means
|
||||
and the terminal emulator should hold off displaying it. A non-zero value means
|
||||
the notification is done, and should be displayed. You can specify the title or
|
||||
body multiple times and the terminal emulator will concatenate them, thereby
|
||||
allowing arbitrarily long text (terminal emulators are free to impose a sensible
|
||||
limit to avoid Denial-of-Service attacks). The size of the payload must be no
|
||||
longer than ``2048`` bytes, *before being encoded*.
|
||||
longer than ``2048`` bytes, *before being encoded* or ``4096`` encoded bytes.
|
||||
|
||||
Both the ``title`` and ``body`` payloads must be either :ref:`safe_utf8` text ,
|
||||
or UTF-8 text that is :ref:`base64` encoded, in which case there must be an
|
||||
``e=1`` key in the metadata to indicate the payload is :ref:`base64`
|
||||
encoded. No HTML or other markup in the plain text is allowed. It is strictly
|
||||
plain text, to be interpreted as such.
|
||||
|
||||
Allowing users to filter notifications
|
||||
-------------------------------------------------------
|
||||
|
||||
Well behaved applications should identify themselves to the terminal
|
||||
by means of two keys ``f`` which is the application name and ``t``
|
||||
which is the notification type. These are free form keys, they can contain
|
||||
any values, their purpose is to allow users to easily filter out
|
||||
notifications they do not want. Both keys must have :ref:`base64`
|
||||
encoded UTF-8 text as their values. The ``t`` key can be specified multiple
|
||||
times, as notifications can have more than one type. See the `freedesktop.org
|
||||
spec
|
||||
<https://specifications.freedesktop.org/notification-spec/notification-spec-latest.html#categories>`__
|
||||
for examples of notification types.
|
||||
|
||||
.. note::
|
||||
The application name should generally be set to the filename of the
|
||||
applications `desktop file
|
||||
<https://specifications.freedesktop.org/desktop-entry-spec/desktop-entry-spec-latest.html#file-naming>`__
|
||||
(without the ``.desktop`` part) or the bundle identifier for a macOS
|
||||
application. While not strictly necessary, this allows the terminal
|
||||
emulator to deduce an icon for the notification when one is not specified.
|
||||
|
||||
.. note::
|
||||
|
||||
|kitty| has sophisticated notification filtering and management
|
||||
capabilities via :opt:`filter_notification`.
|
||||
|
||||
Both the ``title`` and ``body`` payloads must be either UTF-8 encoded plain
|
||||
text with no embedded escape codes, or UTF-8 text that is :rfc:`base64 <4648>`
|
||||
encoded, in which case there must be an ``e=1`` key in the metadata to indicate
|
||||
the payload is :rfc:`base64 <4648>` encoded.
|
||||
|
||||
Being informed when user activates the notification
|
||||
-------------------------------------------------------
|
||||
@@ -80,21 +109,13 @@ off, so for example if you do not want any action, turn off the default
|
||||
|
||||
a=-focus
|
||||
|
||||
Complete specification of all the metadata keys is in the table below. If a
|
||||
terminal emulator encounters a key in the metadata it does not understand,
|
||||
Complete specification of all the metadata keys is in the :ref:`table below <keys_in_notificatons_protocol>`.
|
||||
If a terminal emulator encounters a key in the metadata it does not understand,
|
||||
the key *must* be ignored, to allow for future extensibility of this escape
|
||||
code. Similarly if values for known keys are unknown, the terminal emulator
|
||||
*should* either ignore the entire escape code or perform a best guess effort
|
||||
to display it based on what it does understand.
|
||||
*should* either ignore the entire escape code or perform a best guess effort to
|
||||
display it based on what it does understand.
|
||||
|
||||
.. note::
|
||||
It is possible to extend this escape code to allow specifying an icon for
|
||||
the notification, however, given that some platforms, such as legacy versions
|
||||
of macOS, don't allow displaying custom images on a notification, it was
|
||||
decided to leave it out of the spec for the time being.
|
||||
|
||||
Similarly, features such as scheduled notifications could be added in future
|
||||
revisions.
|
||||
|
||||
Being informed when a notification is closed
|
||||
------------------------------------------------
|
||||
@@ -114,21 +135,42 @@ escape code to inform when the notification is closed::
|
||||
|
||||
If no notification id was specified ``i=0`` will be used.
|
||||
If ``a=report`` is specified and the notification is activated/clicked on
|
||||
then both the activation report and close notification are sent.
|
||||
then both the activation report and close notification are sent. If the notification
|
||||
is updated then the close event is not sent unless the updated notification
|
||||
also requests a close notification.
|
||||
|
||||
.. note:: On macOS the OS does not supply notification
|
||||
closed events to applications. As such close events must be implemented
|
||||
via polling. It is up to the terminal emulator to decide a reasonable
|
||||
time limit for how long to poll, before giving up. kitty polls for 60
|
||||
seconds. Therefore, terminal applications should not rely on close events
|
||||
being authoritative.
|
||||
Note that on some platforms, such as macOS, the OS does not inform applications
|
||||
when notifications are closed, on such platforms, terminals reply with::
|
||||
|
||||
<OSC> 99 ; i=mynotification : p=close ; untracked <terminator>
|
||||
|
||||
This means that the terminal has no way of knowing when the notification is
|
||||
closed. Instead, applications can poll the terminal to determine which
|
||||
notifications are still alive (not closed), with::
|
||||
|
||||
<OSC> 99 ; i=myid : p=alive ; <terminator>
|
||||
|
||||
The terminal will reply with::
|
||||
|
||||
<OSC> 99 ; i=myid : p=alive ; id1,id2,id3 <terminator>
|
||||
|
||||
Here, ``myid`` is present for multiplexer support. The response from the terminal
|
||||
contains a comma separated list of ids that are still alive.
|
||||
|
||||
|
||||
Closing an existing notification
|
||||
----------------------------------
|
||||
Updating or closing an existing notification
|
||||
----------------------------------------------
|
||||
|
||||
.. versionadded:: 0.36.0
|
||||
The ability to close a previous notification was added in kitty 0.36.0
|
||||
The ability to update and close a previous notification was added in kitty 0.36.0
|
||||
|
||||
To update a previous notification simply send a new notification with the same
|
||||
*notification id* (``i`` key) as the one you want to update. If the original
|
||||
notification is still displayed it will be replaced, otherwise a new
|
||||
notification is displayed. This can be used, for example, to show progress of
|
||||
an operation. Note that how smoothly the existing notification is replaced
|
||||
depends on the underlying OS, for example, on Linux the replacement is usually flicker
|
||||
free, on macOS it isn't, because of Apple's design choices.
|
||||
|
||||
To close a previous notification, send::
|
||||
|
||||
@@ -138,6 +180,22 @@ This will close a previous notification with the specified id. If no such
|
||||
notification exists (perhaps because it was already closed or it was activated)
|
||||
then the request is ignored.
|
||||
|
||||
|
||||
Automatically expiring notifications
|
||||
-------------------------------------
|
||||
|
||||
A notification can be marked as expiring (being closed) automatically after
|
||||
a specified number of milliseconds using the ``w`` key. The default if
|
||||
unspecified is ``-1`` which means to use whatever expiry policy the OS has for
|
||||
notifications. A value of ``0`` means the notification should never expire.
|
||||
Values greater than zero specify the number of milliseconds after which the
|
||||
notification should be auto-closed. Note that the value of ``0``
|
||||
is best effort, some platforms honor it and some do not. Positive values
|
||||
are robust, since they can be implemented by the terminal emulator itself,
|
||||
by manually closing the notification after the expiry time. The notification
|
||||
could still be closed before the expiry time by user interaction or OS policy,
|
||||
but it is guaranteed to be closed once the expiry time has passed.
|
||||
|
||||
.. _notifications_query:
|
||||
|
||||
Querying for support
|
||||
@@ -168,24 +226,26 @@ Key Value
|
||||
implements. If no actions are supported, the ``a`` key must be absent from the
|
||||
query response.
|
||||
|
||||
``o`` Comma separated list of occassions from the ``o`` key that the
|
||||
terminal implements. If no occassions are supported, the value
|
||||
``o=always`` must be sent in the query response.
|
||||
``c`` ``c=1`` if the terminal supports close events, otherwise the ``c``
|
||||
must be omitted.
|
||||
|
||||
``u`` Comma separated list of urgency values that the terminal implements.
|
||||
If urgency is not supported, the ``u`` key must be absent from the
|
||||
query response.
|
||||
``o`` Comma separated list of occassions from the ``o`` key that the
|
||||
terminal implements. If no occasions are supported, the value
|
||||
``o=always`` must be sent in the query response.
|
||||
|
||||
``p`` Comma spearated list of supported payload types (i.e. values of the
|
||||
``p`` key that the terminal implements). These must contain at least
|
||||
``title`` and ``body``.
|
||||
|
||||
``c`` ``c=1`` if the terminal supports close events, otherwise the ``c``
|
||||
must be omitted.
|
||||
``u`` Comma separated list of urgency values that the terminal implements.
|
||||
If urgency is not supported, the ``u`` key must be absent from the
|
||||
query response.
|
||||
|
||||
``w`` ``w=1`` if the terminal supports auto expiring of notifications.
|
||||
======= ================================================================================
|
||||
|
||||
In the future, if this protocol expands, more keys might be added. Clients must
|
||||
ignore keys they dont understand in the query response.
|
||||
ignore keys they do not understand in the query response.
|
||||
|
||||
To check if a terminal emulator supports this notifications protocol the best way is to
|
||||
send the above *query action* followed by a request for the `primary device
|
||||
@@ -193,6 +253,8 @@ attributes <https://vt100.net/docs/vt510-rm/DA1.html>`_. If you get back an
|
||||
answer for the device attributes without getting back an answer for the *query
|
||||
action* the terminal emulator does not support this notifications protocol.
|
||||
|
||||
.. _keys_in_notificatons_protocol:
|
||||
|
||||
Specification of all keys used in the protocol
|
||||
--------------------------------------------------
|
||||
|
||||
@@ -205,23 +267,23 @@ Key Value Default Description
|
||||
optional leading
|
||||
``-``
|
||||
|
||||
``c`` ``0`` or ``1`` ``0`` When non-zero an escape code is sent to the application when the notification is closed.
|
||||
|
||||
``d`` ``0`` or ``1`` ``1`` Indicates if the notification is
|
||||
complete or not. A non-zero value
|
||||
means it is complete.
|
||||
|
||||
``e`` ``0`` or ``1`` ``0`` If set to ``1`` means the payload is :rfc:`base64 <4648>` encoded UTF-8,
|
||||
``e`` ``0`` or ``1`` ``0`` If set to ``1`` means the payload is :ref:`base64` encoded UTF-8,
|
||||
otherwise it is plain UTF-8 text with no C0 control codes in it
|
||||
|
||||
``f`` :ref:`base64` ``unset`` The name of the application sending the notification. Can be used to filter out notifications.
|
||||
encoded UTF-8
|
||||
application name
|
||||
|
||||
``i`` ``[a-zA-Z0-9-_+.]`` ``0`` Identifier for the notification. Make these globally unqiue,
|
||||
like an UUID, so that termial multiplxers can
|
||||
like an UUID, so that terminal multiplexers can
|
||||
direct responses to the correct window.
|
||||
|
||||
``p`` One of ``title``, ``title`` Whether the payload is the notification title or body or query. If a
|
||||
``body``, notification has no title, the body will be used as title. Terminal
|
||||
``close``, emulators should ignore payloads of unknown type to allow for future
|
||||
``?`` expansion of this protocol.
|
||||
|
||||
|
||||
``o`` One of ``always``, ``always`` When to honor the notification request. ``unfocused`` means when the window
|
||||
``unfocused`` or the notification is sent on does not have keyboard focus. ``invisible``
|
||||
``invisible`` means the window both is unfocused
|
||||
@@ -229,10 +291,20 @@ Key Value Default Description
|
||||
its OS window is not currently active.
|
||||
``always`` is the default and always honors the request.
|
||||
|
||||
``p`` One of ``title``, ``title`` Whether the payload is the notification title or body or query. If a
|
||||
``body``, notification has no title, the body will be used as title. Terminal
|
||||
``close``, emulators should ignore payloads of unknown type to allow for future
|
||||
``?``, ``alive`` expansion of this protocol.
|
||||
|
||||
``t`` :ref:`base64` ``unset`` The type of the notification. Can be used to filter out notifications.
|
||||
encoded UTF-8
|
||||
notification type
|
||||
|
||||
``u`` ``0, 1 or 2`` ``unset`` The *urgency* of the notification. ``0`` is low, ``1`` is normal and ``2`` is critical.
|
||||
If not specified normal is used.
|
||||
|
||||
``c`` ``0`` or ``1`` ``0`` When non-zero an escape code is sent to the application when the notification is closed.
|
||||
|
||||
``w`` ``>=-1`` ``-1`` The number of milliseconds to auto-close the notification after.
|
||||
======= ==================== ========== =================
|
||||
|
||||
|
||||
@@ -247,3 +319,31 @@ Key Value Default Description
|
||||
|kitty| also supports the `legacy OSC 9 protocol developed by iTerm2
|
||||
<https://iterm2.com/documentation-escape-codes.html>`__ for desktop
|
||||
notifications.
|
||||
|
||||
|
||||
.. _base64:
|
||||
|
||||
Base64
|
||||
---------------
|
||||
|
||||
The base64 encoding used in the this specification is the one defined in
|
||||
:rfc:`4648`. When a base64 payload is chunked, either the chunking should be
|
||||
done before encoding or after. When the chunking is done before encoding, no
|
||||
more than 2048 bytes of data should be encoded per chunk and the encoded data
|
||||
**must** include the base64 padding bytes, if any. When the chunking is done
|
||||
after encoding, each encoded chunk must be no more than 4096 bytes in size.
|
||||
There may or may not be padding bytes at the end of the last chunk, terminals
|
||||
must handle either case.
|
||||
|
||||
|
||||
.. _safe_utf8:
|
||||
|
||||
Escape code safe UTF-8
|
||||
--------------------------
|
||||
|
||||
This must be valid UTF-8 as per the spec in :rfc:`3629`. In addition, in order
|
||||
to make it safe for transmission embedded inside an escape code, it must
|
||||
contain none of the C0 and C1 control characters, that is, the unicode
|
||||
characters: U+0000 (NUL) - U+1F (Unit separator), U+7F (DEL) and U+80 (PAD) - U+9F
|
||||
(APC). Note that in particular, this means that no newlines, carriage returns,
|
||||
tabs, etc. are allowed.
|
||||
|
||||
@@ -116,14 +116,14 @@ this could be achieved using the ssh kitten with :program:`zsh` and
|
||||
hostname myserver-*
|
||||
|
||||
# Setup zsh to read its files from my-conf/zsh
|
||||
env ZDOTDIR $HOME/my-conf/zsh
|
||||
env ZDOTDIR=$HOME/my-conf/zsh
|
||||
copy --dest my-conf/zsh/.zshrc .zshrc
|
||||
copy --dest my-conf/zsh/.zshenv .zshenv
|
||||
# If you use other zsh init files add them in a similar manner
|
||||
|
||||
# Setup vim to read its config from my-conf/vim
|
||||
env VIMINIT $HOME/my-conf/vim/vimrc
|
||||
env VIMRUNTIME $HOME/my-conf/vim
|
||||
env VIMINIT=$HOME/my-conf/vim/vimrc
|
||||
env VIMRUNTIME=$HOME/my-conf/vim
|
||||
copy --dest my-conf/vim .vim
|
||||
copy --dest my-conf/vim/vimrc .vimrc
|
||||
|
||||
|
||||
58
docs/notifications.py
Normal file
58
docs/notifications.py
Normal file
@@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# A sample script to process notifications. Save it as
|
||||
# ~/.config/kitty/notifications.py
|
||||
|
||||
import subprocess
|
||||
|
||||
from kitty.notifications import NotificationCommand, Urgency
|
||||
|
||||
|
||||
def log_notification(nc: NotificationCommand) -> None:
|
||||
# Log notifications to /tmp/notifications-log.txt
|
||||
with open('/tmp/notifications-log.txt', 'a') as log:
|
||||
print(f'title: {nc.title}', file=log)
|
||||
print(f'body: {nc.body}', file=log)
|
||||
print(f'app: {nc.application_name}', file=log)
|
||||
print(f'types: {nc.notification_types}', file=log)
|
||||
print('\n', file=log)
|
||||
|
||||
|
||||
def on_notification_activated(nc: NotificationCommand) -> None:
|
||||
# do something when this notification is activated (clicked on)
|
||||
# remember to assign this to the on_activation field in main()
|
||||
pass
|
||||
|
||||
|
||||
def main(nc: NotificationCommand) -> bool:
|
||||
'''
|
||||
This function should return True to filter out the notification
|
||||
'''
|
||||
log_notification(nc)
|
||||
|
||||
# filter out notifications with 'unwanted' in their titles
|
||||
if 'unwanted' in nc.title.lower():
|
||||
return True
|
||||
|
||||
# filter out notifications from the application badapp
|
||||
if nc.application_name == 'badapp':
|
||||
return True
|
||||
|
||||
# filter out low urgency notifications
|
||||
if nc.urgency is Urgency.Low:
|
||||
return True
|
||||
|
||||
# replace some bad text in the notification body
|
||||
nc.body = nc.body.replace('bad text', 'good text')
|
||||
|
||||
# run a script if this notification is from myapp and has
|
||||
# type foo, passing in the title and body as command line args
|
||||
# to the script.
|
||||
if nc.application_name == 'myapp' and 'foo' in nc.notification_types:
|
||||
subprocess.Popen(['/path/to/my/script', nc.title, nc.body])
|
||||
|
||||
# do some arbitrary actions when this notification is activated
|
||||
nc.on_activation = on_notification_activated
|
||||
|
||||
# dont filter out this notification
|
||||
return False
|
||||
@@ -464,9 +464,20 @@ def generate_extra_cli_parser(name: str, spec: str) -> None:
|
||||
print('}')
|
||||
|
||||
|
||||
def kittens_needing_cli_parsers() -> Iterator[str]:
|
||||
for d in os.scandir('kittens'):
|
||||
if not d.is_dir(follow_symlinks=False):
|
||||
continue
|
||||
if os.path.exists(os.path.join(d.path, 'main.py')) and os.path.exists(os.path.join(d.path, 'main.go')):
|
||||
with open(os.path.join(d.path, 'main.py')) as f:
|
||||
raw = f.read()
|
||||
if 'options' in raw:
|
||||
yield d.name
|
||||
|
||||
|
||||
def kitten_clis() -> None:
|
||||
from kittens.runner import get_kitten_conf_docs, get_kitten_extra_cli_parsers
|
||||
for kitten in wrapped_kittens() + ('pager',):
|
||||
for kitten in kittens_needing_cli_parsers():
|
||||
defn = get_kitten_conf_docs(kitten)
|
||||
if defn is not None:
|
||||
generate_conf_parser(kitten, defn)
|
||||
|
||||
2
glfw/glfw3.h
vendored
2
glfw/glfw3.h
vendored
@@ -1317,7 +1317,7 @@ typedef struct GLFWLayerShellConfig {
|
||||
|
||||
typedef struct GLFWDBUSNotificationData {
|
||||
const char *app_name, *icon, *summary, *body, *action_name;
|
||||
int32_t timeout; uint8_t urgency;
|
||||
int32_t timeout; uint8_t urgency; uint32_t replaces;
|
||||
} GLFWDBUSNotificationData;
|
||||
|
||||
/*! @brief The function pointer type for error callbacks.
|
||||
|
||||
3
glfw/linux_notify.c
vendored
3
glfw/linux_notify.c
vendored
@@ -111,7 +111,6 @@ glfw_dbus_send_user_notification(const GLFWDBUSNotificationData *n, GLFWDBusnoti
|
||||
data->next_id = ++notification_id;
|
||||
data->callback = callback; data->data = user_data;
|
||||
if (!data->next_id) data->next_id = ++notification_id;
|
||||
uint32_t replaces_id = 0;
|
||||
|
||||
RAII_MSG(msg, dbus_message_new_method_call(NOTIFICATIONS_SERVICE, NOTIFICATIONS_PATH, NOTIFICATIONS_IFACE, "Notify"));
|
||||
if (!msg) { return 0; }
|
||||
@@ -120,7 +119,7 @@ glfw_dbus_send_user_notification(const GLFWDBUSNotificationData *n, GLFWDBusnoti
|
||||
#define check_call(func, ...) if (!func(__VA_ARGS__)) { _glfwInputError(GLFW_PLATFORM_ERROR, "%s", "Out of memory allocating DBUS message for notification\n"); return 0; }
|
||||
#define APPEND(to, type, val) check_call(dbus_message_iter_append_basic, &to, type, &val);
|
||||
APPEND(args, DBUS_TYPE_STRING, n->app_name)
|
||||
APPEND(args, DBUS_TYPE_UINT32, replaces_id)
|
||||
APPEND(args, DBUS_TYPE_UINT32, n->replaces)
|
||||
APPEND(args, DBUS_TYPE_STRING, n->icon)
|
||||
APPEND(args, DBUS_TYPE_STRING, n->summary)
|
||||
APPEND(args, DBUS_TYPE_STRING, n->body)
|
||||
|
||||
@@ -427,6 +427,28 @@ func specialize_command(hg *cli.Command) {
|
||||
hg.ArgCompleter = cli.CompletionForWrapper("rg")
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
}
|
||||
|
||||
func create_cmd(root *cli.Command, run_func func(*cli.Command, *Options, []string) (int, error)) {
|
||||
ans := root.AddSubCommand(&cli.Command{
|
||||
Name: "hyperlinked_grep",
|
||||
Run: func(cmd *cli.Command, args []string) (int, error) {
|
||||
opts := Options{}
|
||||
err := cmd.GetOptionValues(&opts)
|
||||
if err != nil {
|
||||
return 1, err
|
||||
}
|
||||
return run_func(cmd, &opts, args)
|
||||
},
|
||||
Hidden: true,
|
||||
})
|
||||
specialize_command(ans)
|
||||
clone := root.AddClone(ans.Group, ans)
|
||||
clone.Hidden = false
|
||||
clone.Name = "hyperlinked-grep"
|
||||
}
|
||||
|
||||
func EntryPoint(parent *cli.Command) {
|
||||
create_cmd(parent, main)
|
||||
}
|
||||
|
||||
0
kittens/notify/__init__.py
Normal file
0
kittens/notify/__init__.py
Normal file
321
kittens/notify/main.go
Normal file
321
kittens/notify/main.go
Normal file
@@ -0,0 +1,321 @@
|
||||
package notify
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"image"
|
||||
"io"
|
||||
"os"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"kitty/tools/cli"
|
||||
"kitty/tools/tty"
|
||||
"kitty/tools/tui/loop"
|
||||
"kitty/tools/utils"
|
||||
)
|
||||
|
||||
var _ = fmt.Print
|
||||
|
||||
const ESC_CODE_PREFIX = "\x1b]99;"
|
||||
const ESC_CODE_SUFFIX = "\x1b\\"
|
||||
const CHUNK_SIZE = 4096
|
||||
|
||||
func b64encode(x string) string {
|
||||
return base64.RawStdEncoding.EncodeToString(utils.UnsafeStringToBytes(x))
|
||||
}
|
||||
|
||||
func check_id_valid(x string) bool {
|
||||
pat := utils.MustCompile(`[^a-zA-Z0-9_+.-]`)
|
||||
return pat.ReplaceAllString(x, "") == x
|
||||
}
|
||||
|
||||
type parsed_data struct {
|
||||
opts *Options
|
||||
wait_till_closed bool
|
||||
expire_time time.Duration
|
||||
title, body, identifier string
|
||||
image_data []byte
|
||||
}
|
||||
|
||||
func (p *parsed_data) create_metadata() string {
|
||||
ans := []string{}
|
||||
if p.opts.AppName != "" {
|
||||
ans = append(ans, "f="+b64encode(p.opts.AppName))
|
||||
}
|
||||
switch p.opts.Urgency {
|
||||
case "low":
|
||||
ans = append(ans, "u=0")
|
||||
case "critical":
|
||||
ans = append(ans, "u=2")
|
||||
}
|
||||
if p.expire_time >= 0 {
|
||||
ans = append(ans, "w="+strconv.FormatInt(p.expire_time.Milliseconds(), 10))
|
||||
}
|
||||
if p.opts.Type != "" {
|
||||
ans = append(ans, "t="+b64encode(p.opts.Type))
|
||||
}
|
||||
if p.wait_till_closed {
|
||||
ans = append(ans, "c=1:a=report")
|
||||
}
|
||||
for _, x := range p.opts.Icon {
|
||||
ans = append(ans, "n="+b64encode(x))
|
||||
}
|
||||
if p.opts.IconCacheId != "" {
|
||||
ans = append(ans, "g="+p.opts.IconCacheId)
|
||||
}
|
||||
m := strings.Join(ans, ":")
|
||||
if m != "" {
|
||||
m = ":" + m
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
var debugprintln = tty.DebugPrintln
|
||||
|
||||
func (p *parsed_data) generate_chunks(callback func(string)) {
|
||||
prefix := ESC_CODE_PREFIX + "i=" + p.identifier
|
||||
write_chunk := func(middle string) {
|
||||
callback(prefix + middle + ESC_CODE_SUFFIX)
|
||||
}
|
||||
|
||||
add_payload := func(payload_type, payload string) {
|
||||
if payload == "" {
|
||||
return
|
||||
}
|
||||
p := utils.IfElse(payload_type == "title", "", ":p="+payload_type)
|
||||
payload = b64encode(payload)
|
||||
for len(payload) > 0 {
|
||||
chunk := payload[:min(CHUNK_SIZE, len(payload))]
|
||||
payload = utils.IfElse(len(payload) > len(chunk), payload[len(chunk):], "")
|
||||
write_chunk(":d=0:e=1" + p + ";" + chunk)
|
||||
}
|
||||
}
|
||||
metadata := p.create_metadata()
|
||||
write_chunk(":d=0" + metadata + ";")
|
||||
add_payload("title", p.title)
|
||||
add_payload("body", p.body)
|
||||
if len(p.image_data) > 0 {
|
||||
add_payload("icon", utils.UnsafeBytesToString(p.image_data))
|
||||
}
|
||||
write_chunk(";")
|
||||
}
|
||||
|
||||
func (p *parsed_data) run_loop() (err error) {
|
||||
lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors, loop.NoMouseTracking)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
activated := ""
|
||||
prefix := ESC_CODE_PREFIX + "i=" + p.identifier
|
||||
|
||||
poll_for_close := func() {
|
||||
lp.AddTimer(time.Millisecond*50, false, func(_ loop.IdType) error {
|
||||
lp.QueueWriteString(prefix + ":p=alive;" + ESC_CODE_SUFFIX)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
lp.OnInitialize = func() (string, error) {
|
||||
p.generate_chunks(func(x string) { lp.QueueWriteString(x) })
|
||||
return "", nil
|
||||
}
|
||||
lp.OnEscapeCode = func(ect loop.EscapeCodeType, data []byte) error {
|
||||
if ect == loop.OSC && bytes.HasPrefix(data, []byte(ESC_CODE_PREFIX[2:])) {
|
||||
raw := utils.UnsafeBytesToString(data[len(ESC_CODE_PREFIX[2:]):])
|
||||
metadata, payload, _ := strings.Cut(raw, ";")
|
||||
sent_identifier, payload_type := "", ""
|
||||
for _, x := range strings.Split(metadata, ":") {
|
||||
key, val, _ := strings.Cut(x, "=")
|
||||
switch key {
|
||||
case "i":
|
||||
sent_identifier = val
|
||||
case "p":
|
||||
payload_type = val
|
||||
}
|
||||
}
|
||||
if sent_identifier == p.identifier {
|
||||
switch payload_type {
|
||||
case "close":
|
||||
if payload == "untracked" {
|
||||
poll_for_close()
|
||||
} else {
|
||||
lp.Quit(0)
|
||||
}
|
||||
case "alive":
|
||||
live_ids := strings.Split(payload, ",")
|
||||
if slices.Contains(live_ids, p.identifier) {
|
||||
poll_for_close()
|
||||
} else {
|
||||
lp.Quit(0)
|
||||
}
|
||||
case "":
|
||||
activated = utils.IfElse(payload == "", "activated", payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
close_requested := 0
|
||||
lp.OnKeyEvent = func(event *loop.KeyEvent) error {
|
||||
if event.MatchesPressOrRepeat("ctrl+c") || event.MatchesPressOrRepeat("esc") {
|
||||
event.Handled = true
|
||||
switch close_requested {
|
||||
case 0:
|
||||
lp.QueueWriteString(prefix + ":p=close;" + ESC_CODE_SUFFIX)
|
||||
lp.Println("Closing notification, please wait...")
|
||||
close_requested++
|
||||
case 1:
|
||||
key := "Esc"
|
||||
if event.MatchesPressOrRepeat("ctrl+c") {
|
||||
key = "Ctrl+C"
|
||||
}
|
||||
lp.Println(fmt.Sprintf("Waiting for response from terminal, press the %s key again to abort. Note that this might result in garbage being printed to the terminal.", key))
|
||||
close_requested++
|
||||
default:
|
||||
return fmt.Errorf("Aborted by user!")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err = lp.Run()
|
||||
ds := lp.DeathSignalName()
|
||||
if ds != "" {
|
||||
fmt.Println("Killed by signal: ", ds)
|
||||
lp.KillIfSignalled()
|
||||
return
|
||||
}
|
||||
if activated != "" && err == nil {
|
||||
fmt.Println(activated)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func random_ident() (string, error) {
|
||||
return utils.HumanUUID4()
|
||||
}
|
||||
|
||||
func parse_duration(x string) (ans time.Duration, err error) {
|
||||
switch x {
|
||||
case "never":
|
||||
return 0, nil
|
||||
case "":
|
||||
return -1, nil
|
||||
}
|
||||
trailer := x[len(x)-1]
|
||||
multipler := time.Second
|
||||
switch trailer {
|
||||
case 's':
|
||||
x = x[:len(x)-1]
|
||||
case 'm':
|
||||
x = x[:len(x)-1]
|
||||
multipler = time.Minute
|
||||
case 'h':
|
||||
x = x[:len(x)-1]
|
||||
multipler = time.Hour
|
||||
case 'd':
|
||||
x = x[:len(x)-1]
|
||||
multipler = time.Hour * 24
|
||||
}
|
||||
val, err := strconv.ParseFloat(x, 64)
|
||||
if err != nil {
|
||||
return ans, err
|
||||
}
|
||||
ans = time.Duration(float64(multipler) * val)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *parsed_data) load_image_data() (err error) {
|
||||
if p.opts.IconPath == "" {
|
||||
return nil
|
||||
}
|
||||
f, err := os.Open(p.opts.IconPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
_, imgfmt, err := image.DecodeConfig(f)
|
||||
if _, err = f.Seek(0, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
if err == nil && imgfmt != "" && strings.Contains("jpeg jpg gif png", strings.ToLower(imgfmt)) {
|
||||
p.image_data, err = io.ReadAll(f)
|
||||
return
|
||||
}
|
||||
return fmt.Errorf("The icon must be in PNG, JPEG or GIF formats")
|
||||
}
|
||||
|
||||
func main(_ *cli.Command, opts *Options, args []string) (rc int, err error) {
|
||||
if len(args) == 0 {
|
||||
return 1, fmt.Errorf("Must specify a TITLE for the notification")
|
||||
}
|
||||
var p parsed_data
|
||||
p.opts = opts
|
||||
p.title = args[0]
|
||||
if len(p.title) == 0 {
|
||||
return 1, fmt.Errorf("Must specify a non-empty TITLE for the notification")
|
||||
}
|
||||
if len(args) > 1 {
|
||||
p.body = strings.Join(args[1:], " ")
|
||||
}
|
||||
ident := opts.Identifier
|
||||
if ident == "" {
|
||||
if ident, err = random_ident(); err != nil {
|
||||
return 1, fmt.Errorf("Failed to generate a random identifier with error: %w", err)
|
||||
}
|
||||
}
|
||||
bad_ident := func(which string) error {
|
||||
return fmt.Errorf("Invalid identifier: %s must be only English letters, numbers, hyphens and underscores.", which)
|
||||
}
|
||||
if !check_id_valid(ident) {
|
||||
return 1, bad_ident(ident)
|
||||
}
|
||||
p.identifier = ident
|
||||
if !check_id_valid(opts.IconCacheId) {
|
||||
return 1, bad_ident(opts.IconCacheId)
|
||||
}
|
||||
if p.expire_time, err = parse_duration(opts.ExpireTime); err != nil {
|
||||
return 1, fmt.Errorf("Invalid expire time: %s with error: %w", opts.ExpireTime, err)
|
||||
}
|
||||
p.wait_till_closed = opts.WaitTillClosed
|
||||
if err = p.load_image_data(); err != nil {
|
||||
return 1, fmt.Errorf("Failed to load image data from %s with error %w", opts.IconPath, err)
|
||||
}
|
||||
if opts.OnlyPrintEscapeCode {
|
||||
p.generate_chunks(func(x string) {
|
||||
if err == nil {
|
||||
_, err = os.Stdout.WriteString(x)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
if opts.PrintIdentifier {
|
||||
fmt.Println(ident)
|
||||
}
|
||||
if p.wait_till_closed {
|
||||
err = p.run_loop()
|
||||
} else {
|
||||
var term *tty.Term
|
||||
if term, err = tty.OpenControllingTerm(); err != nil {
|
||||
return 1, fmt.Errorf("Failed to open controlling terminal with error: %w", err)
|
||||
}
|
||||
p.generate_chunks(func(x string) {
|
||||
if err == nil {
|
||||
_, err = term.WriteString(x)
|
||||
}
|
||||
})
|
||||
term.RestoreAndClose()
|
||||
}
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
rc = 1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func EntryPoint(parent *cli.Command) {
|
||||
create_cmd(parent, main)
|
||||
}
|
||||
96
kittens/notify/main.py
Normal file
96
kittens/notify/main.py
Normal file
@@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import sys
|
||||
|
||||
|
||||
def OPTIONS() -> str:
|
||||
from kitty.constants import standard_icon_names
|
||||
return f'''
|
||||
--icon -i
|
||||
type=list
|
||||
The name of the icon to use for the notification. An icon with this name
|
||||
will be searched for on the computer running the terminal emulator. Can
|
||||
be specified multiple times, the first name that is found will be used.
|
||||
Standard names: {', '.join(sorted(standard_icon_names))}
|
||||
|
||||
|
||||
--icon-path -I
|
||||
Path to an image file in PNG/JPEG/GIF formats to use as the icon. If both
|
||||
name and path are specified then first the name will be looked for and if not found
|
||||
then the path will be used.
|
||||
|
||||
|
||||
--app-name -a
|
||||
default=kitten-notify
|
||||
The application name for the notification.
|
||||
|
||||
|
||||
--urgency -u
|
||||
default=normal
|
||||
choices=normal,low,critical
|
||||
The urgency of the notification.
|
||||
|
||||
|
||||
--expire-time -t
|
||||
The duration, for the notification to appear on screen. The default is to
|
||||
use the policy of the OS notification service. A value of :code:`never` means the notification should
|
||||
never expire, however, this may or may not work depending on the policies of the OS notification
|
||||
service. Time is specified in the form NUMBER[SUFFIX] where SUFFIX can be :code:`s` for seconds, :code:`m` for minutes,
|
||||
:code:`h` for hours or :code:`d` for days. Non-integer numbers are allowed.
|
||||
If not specified, seconds is assumed. The notification is guaranteed to be closed automatically
|
||||
after the specified time has elapsed. The notification could be closed before by user
|
||||
action or OS policy.
|
||||
|
||||
|
||||
--type -c
|
||||
The notification type. Can be any string, it is used by users to create filter rules
|
||||
for notifications, so choose something descriptive of the notifications, purpose.
|
||||
|
||||
|
||||
--identifier
|
||||
The identifier of this notification. If a notification with the same identifier
|
||||
is already displayed, it is replaced/updated.
|
||||
|
||||
|
||||
--print-identifier -p
|
||||
type=bool-set
|
||||
Print the identifier for the notification to STDOUT. Useful when not specifying
|
||||
your own identifier via the --identifier option.
|
||||
|
||||
|
||||
--wait-till-closed -w
|
||||
type=bool-set
|
||||
Wait until the notification is closed. If the user activates the notification,
|
||||
"activated" is printed to STDOUT before quitting. Press the Esc or Ctrl+C keys
|
||||
to close the notification manually.
|
||||
|
||||
|
||||
--only-print-escape-code
|
||||
type=bool-set
|
||||
Only print the escape code to STDOUT. Useful if using this kitten as part
|
||||
of a larger application. If this is specified, the --wait-till-closed option
|
||||
will be used for escape code generation, but no actual waiting will be done.
|
||||
|
||||
|
||||
--icon-cache-id -g
|
||||
Identifier to use when caching icons in the terminal emulator. Using an identifier means
|
||||
that icon data needs to be transmitted only once using --icon-path. Subsequent invocations
|
||||
will use the cached icon data, at least until the terminal instance is restarted. This is useful
|
||||
if this kitten is being used inside a larger application, with --only-print-escape-code.
|
||||
'''
|
||||
|
||||
help_text = '''\
|
||||
Send notifications to the user that are displayed to them via the
|
||||
desktop environment's notifications service. Works over SSH as well.
|
||||
'''
|
||||
|
||||
usage = 'TITLE [BODY ...]'
|
||||
if __name__ == '__main__':
|
||||
raise SystemExit('This should be run as kitten clipboard')
|
||||
elif __name__ == '__doc__':
|
||||
cd = sys.cli_docs # type: ignore
|
||||
cd['usage'] = usage
|
||||
cd['options'] = OPTIONS
|
||||
cd['help_text'] = help_text
|
||||
cd['short_desc'] = 'Send notifications to the user'
|
||||
@@ -1145,7 +1145,7 @@ static bool has_cocoa_pending_actions = false;
|
||||
typedef struct cocoa_list { char **items; size_t count, capacity; } cocoa_list;
|
||||
typedef struct {
|
||||
char* wd;
|
||||
cocoa_list open_urls, closed_notifications;
|
||||
cocoa_list open_urls, untracked_notifications;
|
||||
} CocoaPendingActionsData;
|
||||
static CocoaPendingActionsData cocoa_pending_actions_data = {0};
|
||||
|
||||
@@ -1165,7 +1165,7 @@ static void
|
||||
cocoa_free_actions_data(void) {
|
||||
if (cocoa_pending_actions_data.wd) { free(cocoa_pending_actions_data.wd); cocoa_pending_actions_data.wd = NULL; }
|
||||
cocoa_free_pending_list(&cocoa_pending_actions_data.open_urls);
|
||||
cocoa_free_pending_list(&cocoa_pending_actions_data.closed_notifications);
|
||||
cocoa_free_pending_list(&cocoa_pending_actions_data.untracked_notifications);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -1174,8 +1174,8 @@ set_cocoa_pending_action(CocoaPendingAction action, const char *data) {
|
||||
switch(action) {
|
||||
case LAUNCH_URLS:
|
||||
cocoa_append_to_pending_list(&cocoa_pending_actions_data.open_urls, data); break;
|
||||
case COCOA_NOTIFICATION_CLOSED:
|
||||
cocoa_append_to_pending_list(&cocoa_pending_actions_data.closed_notifications, data); break;
|
||||
case COCOA_NOTIFICATION_UNTRACKED:
|
||||
cocoa_append_to_pending_list(&cocoa_pending_actions_data.untracked_notifications, data); break;
|
||||
default:
|
||||
if (cocoa_pending_actions_data.wd) free(cocoa_pending_actions_data.wd);
|
||||
cocoa_pending_actions_data.wd = strdup(data);
|
||||
@@ -1226,14 +1226,17 @@ process_cocoa_pending_actions(void) {
|
||||
}
|
||||
}
|
||||
cocoa_pending_actions_data.open_urls.count = 0;
|
||||
for (unsigned cpa = 0; cpa < cocoa_pending_actions_data.closed_notifications.count; cpa++) {
|
||||
if (cocoa_pending_actions_data.closed_notifications.items[cpa]) {
|
||||
cocoa_report_closed_notification(cocoa_pending_actions_data.closed_notifications.items[cpa]);
|
||||
free(cocoa_pending_actions_data.closed_notifications.items[cpa]);
|
||||
cocoa_pending_actions_data.closed_notifications.items[cpa] = NULL;
|
||||
|
||||
for (unsigned cpa = 0; cpa < cocoa_pending_actions_data.untracked_notifications.count; cpa++) {
|
||||
if (cocoa_pending_actions_data.untracked_notifications.items[cpa]) {
|
||||
cocoa_report_live_notifications(cocoa_pending_actions_data.untracked_notifications.items[cpa]);
|
||||
free(cocoa_pending_actions_data.untracked_notifications.items[cpa]);
|
||||
cocoa_pending_actions_data.untracked_notifications.items[cpa] = NULL;
|
||||
}
|
||||
}
|
||||
cocoa_pending_actions_data.closed_notifications.count = 0;
|
||||
cocoa_pending_actions_data.untracked_notifications.count = 0;
|
||||
|
||||
|
||||
memset(cocoa_pending_actions, 0, sizeof(cocoa_pending_actions));
|
||||
has_cocoa_pending_actions = false;
|
||||
|
||||
|
||||
@@ -237,7 +237,7 @@ class WriteRequest:
|
||||
self, is_primary_selection: bool = False, protocol_type: ProtocolType = ProtocolType.osc_52, id: str = '',
|
||||
rollover_size: int = 16 * 1024 * 1024, max_size: int = -1,
|
||||
) -> None:
|
||||
self.decoder = StreamingBase64Decoder(8 * 1024)
|
||||
self.decoder = StreamingBase64Decoder()
|
||||
self.id = id
|
||||
self.is_primary_selection = is_primary_selection
|
||||
self.protocol_type = protocol_type
|
||||
@@ -279,24 +279,18 @@ class WriteRequest:
|
||||
self.currently_writing_mime = mime
|
||||
self.write_base64_data(data)
|
||||
|
||||
@property
|
||||
def current_leftover_bytes(self) -> memoryview:
|
||||
return self.decoder.leftover_bytes()
|
||||
|
||||
def flush_base64_data(self) -> None:
|
||||
if self.currently_writing_mime:
|
||||
self.decoder.flush()
|
||||
if len(self.decoder):
|
||||
self.write_base64_data(b'')
|
||||
self.decoder.reset()
|
||||
start = self.mime_map[self.currently_writing_mime][0]
|
||||
self.mime_map[self.currently_writing_mime] = MimePos(start, self.tempfile.tell() - start)
|
||||
self.currently_writing_mime = ''
|
||||
|
||||
def write_base64_data(self, b: bytes) -> None:
|
||||
if not self.max_size_exceeded:
|
||||
self.decoder.add(b)
|
||||
if len(self.decoder):
|
||||
self.tempfile.write(self.decoder.take_output())
|
||||
decoded = self.decoder.decode(b)
|
||||
if decoded:
|
||||
self.tempfile.write(decoded)
|
||||
if self.max_size > 0 and self.tempfile.tell() > (self.max_size * 1024 * 1024):
|
||||
log_error(f'Clipboard write request has more data than allowed by clipboard_max_size ({self.max_size}), truncating')
|
||||
self.max_size_exceeded = True
|
||||
|
||||
@@ -34,7 +34,7 @@ typedef enum {
|
||||
MINIMIZE,
|
||||
QUIT,
|
||||
USER_MENU_ACTION,
|
||||
COCOA_NOTIFICATION_CLOSED,
|
||||
COCOA_NOTIFICATION_UNTRACKED,
|
||||
|
||||
NUM_COCOA_PENDING_ACTIONS
|
||||
} CocoaPendingAction;
|
||||
@@ -59,4 +59,4 @@ bool cocoa_render_line_of_text(const char *text, const color_type fg, const colo
|
||||
extern uint8_t* render_single_ascii_char_as_mask(const char ch, size_t *result_width, size_t *result_height);
|
||||
void get_cocoa_key_equivalent(uint32_t, int, char *key, size_t key_sz, int*);
|
||||
void set_cocoa_pending_action(CocoaPendingAction action, const char*);
|
||||
void cocoa_report_closed_notification(const char* ident);
|
||||
void cocoa_report_live_notifications(const char* ident);
|
||||
|
||||
@@ -435,7 +435,7 @@ cocoa_send_notification(PyObject *self UNUSED, PyObject *args) {
|
||||
do_notification_callback([[[response notification] request] identifier], "activated");
|
||||
} else if ([response.actionIdentifier isEqualToString:UNNotificationDismissActionIdentifier]) {
|
||||
// this never actually happens on macOS. Bloody Crapple.
|
||||
do_notification_callback([[[response notification] request] identifier], "closed");
|
||||
// do_notification_callback([[[response notification] request] identifier], "closed");
|
||||
}
|
||||
completionHandler();
|
||||
}
|
||||
@@ -457,23 +457,6 @@ get_notification_center_safely(void) {
|
||||
return center;
|
||||
}
|
||||
|
||||
static bool
|
||||
remove_delivered_notification(const char *identifier) {
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (!center) return false;
|
||||
[center removeDeliveredNotificationsWithIdentifiers:@[ @(identifier) ]];
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static NSLock *notifications_polling_lock = NULL;
|
||||
static bool polling_notifications = false;
|
||||
typedef struct tn { char *ident; bool closed; monotonic_t creation_time; } tn;
|
||||
static struct { tn *items; size_t count, capacity; monotonic_t creation_time; } tracked_notifications;
|
||||
static void dispatch_closed_notifications(void);
|
||||
#define CLOSE_POLL_TIME 60.
|
||||
#define CLOSE_POLL_INTERVAL 750
|
||||
|
||||
static bool
|
||||
ident_in_list_of_notifications(NSString *ident, NSArray<UNNotification*> *list) {
|
||||
for (UNNotification *n in list) {
|
||||
@@ -482,81 +465,48 @@ ident_in_list_of_notifications(NSString *ident, NSArray<UNNotification*> *list)
|
||||
return false;
|
||||
}
|
||||
|
||||
static void
|
||||
poll_for_closed_notifications(void) {
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (!center) return;
|
||||
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
|
||||
// NSLog(@"num of delivered but not closed nots: %lu", (unsigned long)[notifications count]);
|
||||
[notifications_polling_lock lock];
|
||||
for (size_t i = 0; i < tracked_notifications.count; i++) {
|
||||
if (!ident_in_list_of_notifications(@(tracked_notifications.items[i].ident), notifications)) tracked_notifications.items[i].closed = true;
|
||||
}
|
||||
[notifications_polling_lock unlock];
|
||||
dispatch_async(dispatch_get_main_queue(), ^{
|
||||
dispatch_closed_notifications();
|
||||
});
|
||||
}];
|
||||
}
|
||||
|
||||
void
|
||||
cocoa_report_closed_notification(const char* ident) {
|
||||
do_notification_callback(@(ident), "closed");
|
||||
cocoa_report_live_notifications(const char* ident) {
|
||||
do_notification_callback(@(ident), "live");
|
||||
}
|
||||
|
||||
static void
|
||||
dispatch_closed_notifications(void) {
|
||||
bool poll = false;
|
||||
[notifications_polling_lock lock];
|
||||
monotonic_t now = monotonic();
|
||||
for (size_t i = tracked_notifications.count; i-- > 0; ) {
|
||||
if (tracked_notifications.items[i].closed) {
|
||||
set_cocoa_pending_action(COCOA_NOTIFICATION_CLOSED, tracked_notifications.items[i].ident);
|
||||
free(tracked_notifications.items[i].ident);
|
||||
remove_i_from_array(tracked_notifications.items, i, tracked_notifications.count);
|
||||
} else if (now - tracked_notifications.items[i].creation_time < s_double_to_monotonic_t(CLOSE_POLL_TIME)) poll = true;
|
||||
}
|
||||
polling_notifications = poll;
|
||||
[notifications_polling_lock unlock];
|
||||
if (poll) {
|
||||
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, CLOSE_POLL_INTERVAL * NSEC_PER_MSEC), dispatch_get_main_queue(), ^{
|
||||
poll_for_closed_notifications();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
track_notification(char *ident) {
|
||||
static bool
|
||||
remove_delivered_notification(const char *identifier) {
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (center) {
|
||||
[notifications_polling_lock lock];
|
||||
bool has_existing = false;
|
||||
for (size_t i = 0; i < tracked_notifications.count; i++) {
|
||||
if (strcmp(tracked_notifications.items[i].ident, ident) == 0) {
|
||||
tracked_notifications.items[i].creation_time = monotonic();
|
||||
has_existing = true;
|
||||
break;
|
||||
}
|
||||
if (!center) return false;
|
||||
char *ident = strdup(identifier);
|
||||
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
|
||||
if (ident_in_list_of_notifications(@(ident), notifications)) {
|
||||
[center removeDeliveredNotificationsWithIdentifiers:@[ @(ident) ]];
|
||||
}
|
||||
if (!has_existing) {
|
||||
ensure_space_for(&tracked_notifications, items, tn, tracked_notifications.count + 1, capacity, 8, false);
|
||||
tracked_notifications.items[tracked_notifications.count++] = (tn){.ident=ident, .creation_time=monotonic()};
|
||||
free(ident);
|
||||
}];
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool
|
||||
live_delivered_notifications(void) {
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (!center) return false;
|
||||
[center getDeliveredNotificationsWithCompletionHandler:^(NSArray<UNNotification *> * notifications) {
|
||||
@autoreleasepool {
|
||||
NSMutableString *buffer = [NSMutableString stringWithCapacity:1024]; // autoreleased
|
||||
for (UNNotification *n in notifications) [buffer appendFormat:@"%@,", [[n request] identifier]];
|
||||
const char *val = [buffer UTF8String];
|
||||
set_cocoa_pending_action(COCOA_NOTIFICATION_UNTRACKED, val ? val : "");
|
||||
}
|
||||
bool needs_poll = !polling_notifications;
|
||||
[notifications_polling_lock unlock];
|
||||
if (needs_poll) poll_for_closed_notifications();
|
||||
}
|
||||
}];
|
||||
return true;
|
||||
}
|
||||
|
||||
static void
|
||||
schedule_notification(const char *identifier, const char *title, const char *body, const char *subtitle, int urgency) {
|
||||
schedule_notification(const char *identifier, const char *title, const char *body, int urgency) {
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (!center) return;
|
||||
// Configure the notification's payload.
|
||||
UNMutableNotificationContent* content = [[UNMutableNotificationContent alloc] init];
|
||||
if (title) content.title = @(title);
|
||||
if (body) content.body = @(body);
|
||||
if (subtitle) content.subtitle = @(subtitle);
|
||||
content.sound = [UNNotificationSound defaultSound];
|
||||
#if __MAC_OS_X_VERSION_MIN_REQUIRED >= 120000
|
||||
switch (urgency) {
|
||||
@@ -584,9 +534,8 @@ schedule_notification(const char *identifier, const char *title, const char *bod
|
||||
if (error != nil) log_error("Failed to show notification: %s", [[error localizedDescription] UTF8String]);
|
||||
bool ok = error == nil;
|
||||
dispatch_async(dispatch_get_main_queue(), ^{
|
||||
do_notification_callback(@(duped_ident), ok ? "created" : "closed");
|
||||
if (ok) track_notification(duped_ident);
|
||||
else free(duped_ident);
|
||||
do_notification_callback(@(duped_ident), ok ? "created" : "creation_failed");
|
||||
free(duped_ident);
|
||||
});
|
||||
}];
|
||||
[content release];
|
||||
@@ -594,7 +543,7 @@ schedule_notification(const char *identifier, const char *title, const char *bod
|
||||
|
||||
|
||||
typedef struct {
|
||||
char *identifier, *title, *body, *subtitle;
|
||||
char *identifier, *title, *body;
|
||||
int urgency;
|
||||
} QueuedNotification;
|
||||
|
||||
@@ -605,13 +554,12 @@ typedef struct {
|
||||
static NotificationQueue notification_queue = {0};
|
||||
|
||||
static void
|
||||
queue_notification(const char *identifier, const char *title, const char* body, const char* subtitle, int urgency) {
|
||||
queue_notification(const char *identifier, const char *title, const char* body, int urgency) {
|
||||
ensure_space_for((¬ification_queue), notifications, QueuedNotification, notification_queue.count + 16, capacity, 16, true);
|
||||
QueuedNotification *n = notification_queue.notifications + notification_queue.count++;
|
||||
n->identifier = identifier ? strdup(identifier) : NULL;
|
||||
n->title = title ? strdup(title) : NULL;
|
||||
n->body = body ? strdup(body) : NULL;
|
||||
n->subtitle = subtitle ? strdup(subtitle) : NULL;
|
||||
n->urgency = urgency;
|
||||
}
|
||||
|
||||
@@ -620,13 +568,13 @@ drain_pending_notifications(BOOL granted) {
|
||||
if (granted) {
|
||||
for (size_t i = 0; i < notification_queue.count; i++) {
|
||||
QueuedNotification *n = notification_queue.notifications + i;
|
||||
schedule_notification(n->identifier, n->title, n->body, n->subtitle, n->urgency);
|
||||
schedule_notification(n->identifier, n->title, n->body, n->urgency);
|
||||
}
|
||||
}
|
||||
while(notification_queue.count) {
|
||||
QueuedNotification *n = notification_queue.notifications + --notification_queue.count;
|
||||
if (!granted) do_notification_callback(@(n->identifier), "closed");
|
||||
free(n->identifier); free(n->title); free(n->body); free(n->subtitle);
|
||||
if (!granted) do_notification_callback(@(n->identifier), "creation_failed");
|
||||
free(n->identifier); free(n->title); free(n->body);
|
||||
memset(n, 0, sizeof(QueuedNotification));
|
||||
}
|
||||
}
|
||||
@@ -638,15 +586,23 @@ cocoa_remove_delivered_notification(PyObject *self UNUSED, PyObject *x) {
|
||||
Py_RETURN_FALSE;
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
cocoa_live_delivered_notifications(PyObject *self UNUSED, PyObject *x UNUSED) {
|
||||
if (live_delivered_notifications()) { Py_RETURN_TRUE; }
|
||||
Py_RETURN_FALSE;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static PyObject*
|
||||
cocoa_send_notification(PyObject *self UNUSED, PyObject *args) {
|
||||
char *identifier = NULL, *title = NULL, *body = NULL, *subtitle = NULL; int urgency = 1;
|
||||
if (!PyArg_ParseTuple(args, "ssz|zi", &identifier, &title, &body, &subtitle, &urgency)) return NULL;
|
||||
char *identifier = NULL, *title = NULL, *body = NULL; int urgency = 1;
|
||||
if (!PyArg_ParseTuple(args, "sss|i", &identifier, &title, &body, &urgency)) return NULL;
|
||||
|
||||
UNUserNotificationCenter *center = get_notification_center_safely();
|
||||
if (!center) Py_RETURN_NONE;
|
||||
if (!center.delegate) center.delegate = [[NotificationDelegate alloc] init];
|
||||
queue_notification(identifier, title, body, subtitle, urgency);
|
||||
queue_notification(identifier, title, body, urgency);
|
||||
|
||||
// The badge permission needs to be requested as well, even though it is not used,
|
||||
// otherwise macOS refuses to show the preference checkbox for enable/disable notification sound.
|
||||
@@ -1116,8 +1072,6 @@ cleanup(void) {
|
||||
dockMenu = nil;
|
||||
if (beep_sound) [beep_sound release];
|
||||
beep_sound = nil;
|
||||
if (notifications_polling_lock) [notifications_polling_lock release];
|
||||
notifications_polling_lock = nil;
|
||||
|
||||
#ifndef KITTY_USE_DEPRECATED_MACOS_NOTIFICATION_API
|
||||
drain_pending_notifications(NO);
|
||||
@@ -1157,6 +1111,7 @@ static PyMethodDef module_methods[] = {
|
||||
{"cocoa_set_global_shortcut", (PyCFunction)cocoa_set_global_shortcut, METH_VARARGS, ""},
|
||||
{"cocoa_send_notification", (PyCFunction)cocoa_send_notification, METH_VARARGS, ""},
|
||||
{"cocoa_remove_delivered_notification", (PyCFunction)cocoa_remove_delivered_notification, METH_O, ""},
|
||||
{"cocoa_live_delivered_notifications", (PyCFunction)cocoa_live_delivered_notifications, METH_NOARGS, ""},
|
||||
{"cocoa_set_notification_activated_callback", (PyCFunction)set_notification_activated_callback, METH_O, ""},
|
||||
{"cocoa_set_url_handler", (PyCFunction)cocoa_set_url_handler, METH_VARARGS, ""},
|
||||
{"cocoa_set_app_icon", (PyCFunction)cocoa_set_app_icon, METH_VARARGS, ""},
|
||||
@@ -1168,7 +1123,6 @@ bool
|
||||
init_cocoa(PyObject *module) {
|
||||
cocoa_clear_global_shortcuts();
|
||||
if (PyModule_AddFunctions(module, module_methods) != 0) return false;
|
||||
notifications_polling_lock = [NSLock new];
|
||||
register_at_exit_cleanup_func(COCOA_CLEANUP_FUNC, cleanup);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -188,6 +188,20 @@ except KeyError:
|
||||
# https://github.com/ansible/ansible/issues/11536#issuecomment-153030743
|
||||
ssh_control_master_template = 'kssh-{kitty_pid}-{ssh_placeholder}'
|
||||
|
||||
# See https://specifications.freedesktop.org/icon-naming-spec/latest/ar01s04.html
|
||||
standard_icon_names = {
|
||||
'error': 'dialog-error',
|
||||
'warning': 'dialog-warning',
|
||||
'warn': 'dialog-warning',
|
||||
'info': 'dialog-information',
|
||||
'question': 'dialog-question',
|
||||
|
||||
'help': 'system-help',
|
||||
'file-manager': 'system-file-manager',
|
||||
'system-monitor': 'utilities-system-monitor',
|
||||
'text-editor': 'utilities-text-editor',
|
||||
}
|
||||
|
||||
|
||||
def glfw_path(module: str) -> str:
|
||||
prefix = 'kitty.' if getattr(sys, 'frozen', False) else ''
|
||||
|
||||
@@ -107,134 +107,51 @@ pybase64_decode(PyObject UNUSED *self, PyObject *args) {
|
||||
|
||||
typedef struct StreamingBase64Decoder {
|
||||
PyObject_HEAD
|
||||
PyObject *output;
|
||||
size_t output_sz, output_capacity, num_leftover_bytes, initial_capacity;
|
||||
unsigned char leftover_bytes[8];
|
||||
struct base64_state state;
|
||||
} StreamingBase64Decoder;
|
||||
|
||||
static int
|
||||
StreamingBase64Decoder_init(PyObject *s, PyObject *args, PyObject *kwds) {
|
||||
static char *kwlist[] = {"initial_capacity", NULL};
|
||||
unsigned long initial_capacity = 8 * 1024;
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|k", kwlist, &initial_capacity)) return -1;
|
||||
StreamingBase64Decoder_init(PyObject *s, PyObject *args, PyObject *kwds UNUSED) {
|
||||
if (PyTuple_GET_SIZE(args)) { PyErr_SetString(PyExc_TypeError, "constructor takes no arguments"); return -1; }
|
||||
StreamingBase64Decoder *self = (StreamingBase64Decoder*)s;
|
||||
self->initial_capacity = initial_capacity;
|
||||
base64_stream_decode_init(&self->state, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
StreamingBase64Decoder_dealloc(PyObject *self) {
|
||||
StreamingBase64Decoder *h = (StreamingBase64Decoder*)self;
|
||||
Py_CLEAR(h->output);
|
||||
Py_TYPE(self)->tp_free(self);
|
||||
}
|
||||
|
||||
static bool
|
||||
write_base64_data(StreamingBase64Decoder *self, const void *data, size_t len) {
|
||||
if (!len) return true;
|
||||
size_t sz = required_buffer_size_for_base64_decode(len);
|
||||
if ((self->output_sz + sz) > self->output_capacity) {
|
||||
size_t cap = MAX(self->output_capacity * 2, self->output_sz + sz + self->initial_capacity);
|
||||
if (self->output) { if (_PyBytes_Resize(&self->output, cap) != 0) return false; }
|
||||
else { self->output = PyBytes_FromStringAndSize(NULL, cap); if (!self->output) return false; }
|
||||
self->output_capacity = cap;
|
||||
}
|
||||
if (!base64_decode8(data, len, (unsigned char*)(PyBytes_AS_STRING(self->output) + self->output_sz), &sz)) {
|
||||
PyErr_SetString(PyExc_ValueError, "Invalid base64 input data");
|
||||
return false;
|
||||
}
|
||||
self->output_sz += sz;
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool
|
||||
write_saving_leftover_bytes(StreamingBase64Decoder *self, const unsigned char *data, size_t len) {
|
||||
size_t extra = len % 4;
|
||||
if (!write_base64_data(self, data, len - extra)) return false;
|
||||
self->num_leftover_bytes = extra;
|
||||
if (extra) memcpy(self->leftover_bytes, data + len - extra, extra);
|
||||
return true;
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_add(StreamingBase64Decoder *self, PyObject *a) {
|
||||
StreamingBase64Decoder_decode(StreamingBase64Decoder *self, PyObject *a) {
|
||||
RAII_PY_BUFFER(data);
|
||||
if (PyObject_GetBuffer(a, &data, PyBUF_SIMPLE) != 0) return NULL;
|
||||
if (!data.buf || !data.len) return PyLong_FromLong(0);
|
||||
unsigned char *d = data.buf; size_t dlen = data.len;
|
||||
size_t before = self->output_sz;
|
||||
if (self->num_leftover_bytes) {
|
||||
size_t extra = 4 - self->num_leftover_bytes;
|
||||
if (dlen >= extra) {
|
||||
memcpy(self->leftover_bytes + self->num_leftover_bytes, d, extra);
|
||||
if (!write_base64_data(self, self->leftover_bytes, self->num_leftover_bytes + extra)) return NULL;
|
||||
self->num_leftover_bytes = 0;
|
||||
d += extra; dlen -= extra;
|
||||
if (!write_saving_leftover_bytes(self, d, dlen)) return NULL;
|
||||
} else {
|
||||
memcpy(self->leftover_bytes + self->num_leftover_bytes, d, dlen);
|
||||
self->num_leftover_bytes += dlen;
|
||||
}
|
||||
} else if (!write_saving_leftover_bytes(self, d, dlen)) return NULL;
|
||||
return PyLong_FromSize_t(self->output_sz - before);
|
||||
}
|
||||
|
||||
static Py_ssize_t
|
||||
StreamingBase64Decoder_len(PyObject *s) { return ((StreamingBase64Decoder*)s)->output_sz; }
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_leftover_bytes(StreamingBase64Decoder *self, PyObject *a UNUSED) {
|
||||
return PyMemoryView_FromMemory((char*)self->leftover_bytes, self->num_leftover_bytes, PyBUF_READ);
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_flush(StreamingBase64Decoder *self, PyObject *args UNUSED) {
|
||||
size_t padding = 4 - self->num_leftover_bytes;
|
||||
switch(padding) {
|
||||
case 1: self->leftover_bytes[self->num_leftover_bytes++] = '='; break;
|
||||
case 2: self->leftover_bytes[self->num_leftover_bytes++] = '='; self->leftover_bytes[self->num_leftover_bytes++] = '='; break;
|
||||
if (!data.buf || !data.len) return PyBytes_FromStringAndSize(NULL, 0);
|
||||
size_t sz = required_buffer_size_for_base64_decode(data.len);
|
||||
RAII_PyObject(ans, PyBytes_FromStringAndSize(NULL, sz));
|
||||
if (!ans) return NULL;
|
||||
if (!base64_stream_decode(&self->state, data.buf, data.len, PyBytes_AS_STRING(ans), &sz)) {
|
||||
PyErr_SetString(PyExc_ValueError, "Invalid base64 input data");
|
||||
return NULL;
|
||||
}
|
||||
write_base64_data(self, self->leftover_bytes, self->num_leftover_bytes);
|
||||
self->num_leftover_bytes = 0;
|
||||
if (_PyBytes_Resize(&ans, sz) != 0) return NULL;
|
||||
return Py_NewRef(ans);
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_reset(StreamingBase64Decoder *self, PyObject *args UNUSED) {
|
||||
base64_stream_decode_init(&self->state, 0);
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_copy_output(StreamingBase64Decoder *self, PyObject *args UNUSED) {
|
||||
return PyBytes_FromStringAndSize(PyBytes_AS_STRING(self->output), self->output_sz);
|
||||
}
|
||||
|
||||
static PyObject*
|
||||
StreamingBase64Decoder_take_output(StreamingBase64Decoder *self, PyObject *args UNUSED) {
|
||||
if (!self->output_sz) return PyBytes_FromStringAndSize(NULL, 0);
|
||||
RAII_PyObject(newbuf, PyBytes_FromStringAndSize(NULL, self->initial_capacity));
|
||||
if (!newbuf) return NULL;
|
||||
if (_PyBytes_Resize(&self->output, self->output_sz) != 0) return NULL;
|
||||
PyObject *ans = self->output;
|
||||
self->output = Py_NewRef(newbuf); self->output_sz = 0; self->output_capacity = self->initial_capacity;
|
||||
return ans;
|
||||
}
|
||||
|
||||
static PyTypeObject StreamingBase64Decoder_Type = {
|
||||
PyVarObject_HEAD_INIT(NULL, 0)
|
||||
.tp_name = "kitty.fast_data_types.StreamingBase64Decoder",
|
||||
.tp_basicsize = sizeof(StreamingBase64Decoder),
|
||||
.tp_dealloc = StreamingBase64Decoder_dealloc,
|
||||
.tp_flags = Py_TPFLAGS_DEFAULT,
|
||||
.tp_doc = "StreamingBase64Decoder",
|
||||
.tp_methods = (PyMethodDef[]){
|
||||
{"add", (PyCFunction)StreamingBase64Decoder_add, METH_O, ""},
|
||||
{"flush", (PyCFunction)StreamingBase64Decoder_flush, METH_NOARGS, ""},
|
||||
{"take_output", (PyCFunction)StreamingBase64Decoder_take_output, METH_NOARGS, ""},
|
||||
{"copy_output", (PyCFunction)StreamingBase64Decoder_copy_output, METH_NOARGS, ""},
|
||||
{"leftover_bytes", (PyCFunction)StreamingBase64Decoder_leftover_bytes, METH_NOARGS, ""},
|
||||
{"decode", (PyCFunction)StreamingBase64Decoder_decode, METH_O, ""},
|
||||
{"reset", (PyCFunction)StreamingBase64Decoder_reset, METH_NOARGS, ""},
|
||||
{NULL, NULL, 0, NULL},
|
||||
},
|
||||
.tp_new = PyType_GenericNew,
|
||||
.tp_init = StreamingBase64Decoder_init,
|
||||
.tp_as_sequence = &(PySequenceMethods){
|
||||
.sq_length = StreamingBase64Decoder_len,
|
||||
},
|
||||
};
|
||||
|
||||
static PyObject*
|
||||
|
||||
@@ -556,6 +556,7 @@ def dbus_send_notification(
|
||||
action_text: str = '',
|
||||
timeout: int = -1,
|
||||
urgency: int = 1,
|
||||
replaces: int = 0,
|
||||
) -> int:
|
||||
pass
|
||||
|
||||
@@ -566,13 +567,13 @@ def dbus_close_notification(dbus_notification_id: int) -> bool: ...
|
||||
def cocoa_send_notification(
|
||||
identifier: str,
|
||||
title: str,
|
||||
body: Optional[str],
|
||||
subtitle: Optional[str],
|
||||
body: str,
|
||||
urgency: int = 1,
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
def cocoa_remove_delivered_notification(identifier: str) -> bool: ...
|
||||
def cocoa_live_delivered_notifications() -> bool: ...
|
||||
|
||||
def create_os_window(
|
||||
get_window_size: Callable[[int, int, int, int, float, float], Tuple[int,
|
||||
@@ -1704,13 +1705,8 @@ def get_mouse_data_for_window(os_window_id: int, tab_id: int, window_id: int) ->
|
||||
|
||||
|
||||
class StreamingBase64Decoder:
|
||||
def __init__(self, initial_capacity: int = 8 *1024) -> None: ... # set the initial output buffer capacity
|
||||
def add(self, data: ReadOnlyBuffer) -> int: ... # add the base64 data
|
||||
def flush(self) -> None: ... # indicate end of base64 data, left over bytes are processed as if they were followed by padding
|
||||
def take_output(self) -> bytes: ... # take the output so far. The decoder no longer references this output
|
||||
def copy_output(self) -> bytes: ... # copy the output so far
|
||||
def __len__(self) -> int: ... # return the length of the current output
|
||||
def leftover_bytes(self) -> memoryview: ... # return the currently leftover bytes that will be consumed by flush()
|
||||
def decode(self, data: ReadOnlyBuffer) -> bytes: ... # decode the specified data
|
||||
def reset(self) -> None: ... # reset the state to empty to start decoding a new stream
|
||||
|
||||
|
||||
class DiskCache:
|
||||
|
||||
3
kitty/glfw-wrapper.h
generated
3
kitty/glfw-wrapper.h
generated
@@ -1055,7 +1055,7 @@ typedef struct GLFWLayerShellConfig {
|
||||
|
||||
typedef struct GLFWDBUSNotificationData {
|
||||
const char *app_name, *icon, *summary, *body, *action_name;
|
||||
int32_t timeout; uint8_t urgency;
|
||||
int32_t timeout; uint8_t urgency; uint32_t replaces;
|
||||
} GLFWDBUSNotificationData;
|
||||
|
||||
/*! @brief The function pointer type for error callbacks.
|
||||
@@ -1705,7 +1705,6 @@ typedef void (* GLFWcocoarenderframefun)(GLFWwindow*);
|
||||
typedef void (*GLFWwaylandframecallbackfunc)(unsigned long long id);
|
||||
typedef void (*GLFWDBusnotificationcreatedfun)(unsigned long long, uint32_t, void*);
|
||||
typedef void (*GLFWDBusnotificationactivatedfun)(uint32_t, int, const char*);
|
||||
|
||||
typedef int (*glfwInit_func)(monotonic_t);
|
||||
GFW_EXTERN glfwInit_func glfwInit_impl;
|
||||
#define glfwInit glfwInit_impl
|
||||
|
||||
@@ -2076,17 +2076,18 @@ dbus_notification_created_callback(unsigned long long notification_id, uint32_t
|
||||
|
||||
static PyObject*
|
||||
dbus_send_notification(PyObject *self UNUSED, PyObject *args, PyObject *kw) {
|
||||
int timeout = -1, urgency = 1;
|
||||
int timeout = -1, urgency = 1; unsigned int replaces = 0;
|
||||
GLFWDBUSNotificationData d = {.action_name=""};
|
||||
static const char* kwlist[] = {"app_name", "app_icon", "title", "body", "action_text", "timeout", "urgency", NULL};
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kw, "ssss|sii", (char**)kwlist,
|
||||
&d.app_name, &d.icon, &d.summary, &d.body, &d.action_name, &timeout, &urgency)) return NULL;
|
||||
static const char* kwlist[] = {"app_name", "app_icon", "title", "body", "action_text", "timeout", "urgency", "replaces", NULL};
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kw, "ssss|siiI", (char**)kwlist,
|
||||
&d.app_name, &d.icon, &d.summary, &d.body, &d.action_name, &timeout, &urgency, &replaces)) return NULL;
|
||||
if (!glfwDBusUserNotify) {
|
||||
PyErr_SetString(PyExc_RuntimeError, "Failed to load glfwDBusUserNotify, did you call glfw_init?");
|
||||
return NULL;
|
||||
}
|
||||
d.timeout = timeout;
|
||||
d.urgency = urgency & 3;
|
||||
d.replaces = replaces;
|
||||
unsigned long long notification_id = glfwDBusUserNotify(&d, dbus_notification_created_callback, NULL);
|
||||
return PyLong_FromUnsignedLongLong(notification_id);
|
||||
}
|
||||
|
||||
@@ -6,12 +6,13 @@ import re
|
||||
from collections import OrderedDict
|
||||
from contextlib import suppress
|
||||
from enum import Enum
|
||||
from functools import partial
|
||||
from itertools import count
|
||||
from typing import Any, Callable, Dict, FrozenSet, Iterator, List, NamedTuple, Optional, Tuple, Union
|
||||
from typing import Any, Callable, Dict, FrozenSet, Iterator, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
|
||||
from weakref import ReferenceType, ref
|
||||
|
||||
from .constants import cache_dir, is_macos, logo_png_file
|
||||
from .fast_data_types import ESC_OSC, StreamingBase64Decoder, current_focused_os_window_id, get_boss
|
||||
from .constants import cache_dir, config_dir, is_macos, logo_png_file, standard_icon_names
|
||||
from .fast_data_types import ESC_OSC, StreamingBase64Decoder, add_timer, base64_decode, current_focused_os_window_id, get_boss, get_options
|
||||
from .types import run_once
|
||||
from .typing import WindowType
|
||||
from .utils import get_custom_window_icon, log_error, sanitize_control_codes
|
||||
@@ -24,7 +25,8 @@ class IconDataCache:
|
||||
|
||||
def __init__(self, base_cache_dir: str = '', max_cache_size: int = 128 * 1024 * 1024):
|
||||
self.max_cache_size = max_cache_size
|
||||
self.key_map: 'OrderedDict[str, str]' = OrderedDict()
|
||||
self.key_map: Dict[str, str] = {}
|
||||
self.hash_map: 'OrderedDict[str, Set[str]]' = OrderedDict()
|
||||
self.base_cache_dir = base_cache_dir
|
||||
self.cache_dir = ''
|
||||
self.total_size = 0
|
||||
@@ -60,31 +62,36 @@ class IconDataCache:
|
||||
with open(path, 'wb') as f:
|
||||
f.write(data)
|
||||
self.total_size += len(data)
|
||||
self.key_map.pop(key, None) # mark this key as being used recently
|
||||
self.key_map[key] = data_hash
|
||||
self.hash_map[data_hash] = self.hash_map.pop(data_hash, set()) | {key} # mark this data as being used recently
|
||||
if key:
|
||||
self.key_map[key] = data_hash
|
||||
self.prune()
|
||||
return path
|
||||
|
||||
def get_icon(self, key: str) -> str:
|
||||
self._ensure_state()
|
||||
data_hash = self.key_map.pop(key, None)
|
||||
data_hash = self.key_map.get(key)
|
||||
if data_hash:
|
||||
self.key_map[key] = data_hash # mark this key as being used recently
|
||||
self.hash_map[data_hash] = self.hash_map.pop(data_hash, set()) | {key} # mark this data as being used recently
|
||||
return os.path.join(self.cache_dir, data_hash)
|
||||
return ''
|
||||
|
||||
def clear(self) -> None:
|
||||
while self.key_map:
|
||||
key, data_hash = self.key_map.popitem(False)
|
||||
self._remove_data_hash(data_hash)
|
||||
while self.hash_map:
|
||||
data_hash, keys = self.hash_map.popitem(False)
|
||||
for key in keys:
|
||||
self.key_map.pop(key, None)
|
||||
self._remove_data_file(data_hash)
|
||||
|
||||
def prune(self) -> None:
|
||||
self._ensure_state()
|
||||
while self.total_size > self.max_cache_size and self.key_map:
|
||||
key, data_hash = self.key_map.popitem(False)
|
||||
self._remove_data_hash(data_hash)
|
||||
while self.total_size > self.max_cache_size and self.hash_map:
|
||||
data_hash, keys = self.hash_map.popitem(False)
|
||||
for key in keys:
|
||||
self.key_map.pop(key, None)
|
||||
self._remove_data_file(data_hash)
|
||||
|
||||
def _remove_data_hash(self, data_hash: str) -> None:
|
||||
def _remove_data_file(self, data_hash: str) -> None:
|
||||
path = os.path.join(self.cache_dir, data_hash)
|
||||
with suppress(FileNotFoundError):
|
||||
sz = os.path.getsize(path)
|
||||
@@ -95,7 +102,9 @@ class IconDataCache:
|
||||
self._ensure_state()
|
||||
data_hash = self.key_map.pop(key, None)
|
||||
if data_hash:
|
||||
self._remove_data_hash(data_hash)
|
||||
for key in self.hash_map.pop(data_hash, set()):
|
||||
self.key_map.pop(key, None)
|
||||
self._remove_data_file(data_hash)
|
||||
|
||||
|
||||
class Urgency(Enum):
|
||||
@@ -111,6 +120,7 @@ class PayloadType(Enum):
|
||||
query = '?'
|
||||
close = 'close'
|
||||
icon = 'icon'
|
||||
alive = 'alive'
|
||||
|
||||
@property
|
||||
def is_text(self) -> bool:
|
||||
@@ -152,7 +162,7 @@ class DataStore:
|
||||
class EncodedDataStore:
|
||||
|
||||
def __init__(self, data_store: DataStore) -> None:
|
||||
self.decoder = StreamingBase64Decoder(initial_capacity=4096)
|
||||
self.decoder = StreamingBase64Decoder()
|
||||
self.data_store = data_store
|
||||
|
||||
@property
|
||||
@@ -168,14 +178,10 @@ class EncodedDataStore:
|
||||
def add_base64_data(self, data: Union[str, bytes]) -> None:
|
||||
if isinstance(data, str):
|
||||
data = data.encode('ascii')
|
||||
self.decoder.add(data)
|
||||
if len(self.decoder) >= self.data_store.max_size:
|
||||
self.data_store(self.decoder.take_output())
|
||||
self.data_store(self.decoder.decode(data))
|
||||
|
||||
def flush_encoded_data(self) -> None:
|
||||
self.decoder.flush()
|
||||
if len(self.decoder):
|
||||
self.data_store(self.decoder.take_output())
|
||||
self.decoder.reset()
|
||||
|
||||
def finalise(self) -> bytes:
|
||||
self.flush_encoded_data()
|
||||
@@ -190,19 +196,30 @@ def limit_size(x: str) -> str:
|
||||
|
||||
class NotificationCommand:
|
||||
|
||||
done: bool = True
|
||||
identifier: str = ''
|
||||
channel_id: int = 0
|
||||
desktop_notification_id: int = -1
|
||||
# data received from client and eventually displayed/processed
|
||||
title: str = ''
|
||||
body: str = ''
|
||||
actions: FrozenSet[Action] = frozenset((Action.focus,))
|
||||
only_when: OnlyWhen = OnlyWhen.unset
|
||||
urgency: Optional[Urgency] = None
|
||||
close_response_requested: Optional[bool] = None
|
||||
icon_data_key: str = ''
|
||||
icon_names: Tuple[str, ...] = ()
|
||||
application_name: str = ''
|
||||
notification_types: tuple[str, ...] = ()
|
||||
timeout: int = -2
|
||||
|
||||
# event callbacks
|
||||
on_activation: Optional[Callable[['NotificationCommand'], None]] = None
|
||||
on_close: Optional[Callable[['NotificationCommand'], None]] = None
|
||||
on_update: Optional[Callable[['NotificationCommand', 'NotificationCommand'], None]] = None
|
||||
|
||||
# metadata
|
||||
identifier: str = ''
|
||||
done: bool = True
|
||||
channel_id: int = 0
|
||||
desktop_notification_id: int = -1
|
||||
close_response_requested: Optional[bool] = None
|
||||
icon_path: str = ''
|
||||
icon_name: str = ''
|
||||
|
||||
# payload handling
|
||||
current_payload_type: PayloadType = PayloadType.title
|
||||
@@ -212,9 +229,6 @@ class NotificationCommand:
|
||||
created_by_desktop: bool = False
|
||||
activation_token: str = ''
|
||||
|
||||
# event callbacks
|
||||
on_activation: Optional[Callable[['NotificationCommand'], None]] = None
|
||||
|
||||
def __init__(self, icon_data_cache: 'ReferenceType[IconDataCache]', log: 'Log') -> None:
|
||||
self.icon_data_cache_ref = icon_data_cache
|
||||
self.log = log
|
||||
@@ -228,9 +242,12 @@ class NotificationCommand:
|
||||
return Action.focus in self.actions
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'NotificationCommand(identifier={self.identifier!r}, title={self.title!r}, body={self.body!r},'
|
||||
f'actions={self.actions}, done={self.done!r}, urgency={self.urgency})')
|
||||
fields = {}
|
||||
for x in ('title', 'body', 'identifier', 'actions', 'urgency', 'done'):
|
||||
val = getattr(self, x)
|
||||
if val:
|
||||
fields[x] = val
|
||||
return f'NotificationCommand{fields}'
|
||||
|
||||
def parse_metadata(self, metadata: str, prev: 'NotificationCommand') -> Tuple[PayloadType, bool]:
|
||||
payload_type = PayloadType.title
|
||||
@@ -275,7 +292,25 @@ class NotificationCommand:
|
||||
elif k == 'g':
|
||||
self.icon_data_key = sanitize_id(v)
|
||||
elif k == 'n':
|
||||
self.icon_name = v
|
||||
try:
|
||||
self.icon_names += (base64_decode(v).decode('utf-8'),)
|
||||
except Exception:
|
||||
self.log('Ignoring invalid icon name in notification: {v!r}')
|
||||
elif k == 'f':
|
||||
try:
|
||||
self.application_name = base64_decode(v).decode('utf-8')
|
||||
except Exception:
|
||||
self.log('Ignoring invalid application_name in notification: {v!r}')
|
||||
elif k == 't':
|
||||
try:
|
||||
self.notification_types += (base64_decode(v).decode('utf-8'),)
|
||||
except Exception:
|
||||
self.log('Ignoring invalid notification type in notification: {v!r}')
|
||||
elif k == 'w':
|
||||
try:
|
||||
self.timeout = max(-1, int(v))
|
||||
except Exception:
|
||||
self.log('Ignoring invalid timeout in notification: {v!r}')
|
||||
if not prev.done and prev.identifier == self.identifier:
|
||||
self.merge_metadata(prev)
|
||||
return payload_type, payload_is_encoded
|
||||
@@ -292,8 +327,14 @@ class NotificationCommand:
|
||||
self.close_response_requested = prev.close_response_requested
|
||||
if not self.icon_data_key:
|
||||
self.icon_data_key = prev.icon_data_key
|
||||
if not self.icon_name:
|
||||
self.icon_name = prev.icon_name
|
||||
if prev.icon_names:
|
||||
self.icon_names = prev.icon_names + self.icon_names
|
||||
if not self.application_name:
|
||||
self.application_name = prev.application_name
|
||||
if prev.notification_types:
|
||||
self.notification_types = prev.notification_types + self.notification_types
|
||||
if self.timeout < -1:
|
||||
self.timeout = prev.timeout
|
||||
self.icon_path = prev.icon_path
|
||||
|
||||
def create_payload_buffer(self, payload_type: PayloadType) -> EncodedDataStore:
|
||||
@@ -332,12 +373,9 @@ class NotificationCommand:
|
||||
if truncated:
|
||||
self.log('Ignoring too long notification icon data')
|
||||
else:
|
||||
if self.icon_data_key:
|
||||
icd = self.icon_data_cache_ref()
|
||||
if icd:
|
||||
self.icon_path = icd.add_icon(self.icon_data_key, data)
|
||||
else:
|
||||
self.log('Ignoring notification icon data because no icon data key specified')
|
||||
icd = self.icon_data_cache_ref()
|
||||
if icd:
|
||||
self.icon_path = icd.add_icon(self.icon_data_key, data)
|
||||
|
||||
def finalise(self) -> None:
|
||||
if self.current_payload_buffer:
|
||||
@@ -347,6 +385,37 @@ class NotificationCommand:
|
||||
icd = self.icon_data_cache_ref()
|
||||
if icd:
|
||||
self.icon_path = icd.get_icon(self.icon_data_key)
|
||||
if self.title:
|
||||
self.title = sanitize_text(self.title)
|
||||
self.body = sanitize_text(self.body)
|
||||
else:
|
||||
self.title = sanitize_text(self.body)
|
||||
self.body = ''
|
||||
self.urgency = Urgency.Normal if self.urgency is None else self.urgency
|
||||
self.close_response_requested = bool(self.close_response_requested)
|
||||
self.timeout = max(-1, self.timeout)
|
||||
|
||||
def matches_rule_item(self, location:str, query:str) -> bool:
|
||||
import re
|
||||
pat = re.compile(query)
|
||||
if location == 'type':
|
||||
for x in self.notification_types:
|
||||
if pat.search(x) is not None:
|
||||
return True
|
||||
val = {'title': self.title, 'body': self.body, 'app': self.application_name}[location]
|
||||
return pat.search(val) is not None
|
||||
|
||||
def matches_rule(self, rule: str) -> bool:
|
||||
if rule == 'all':
|
||||
return True
|
||||
from .search_query_parser import search
|
||||
def get_matches(location: str, query: str, candidates: Set['NotificationCommand']) -> Set['NotificationCommand']:
|
||||
return {x for x in candidates if x.matches_rule_item(location, query)}
|
||||
try:
|
||||
return self in search(rule, ('title', 'body', 'app', 'type'), {self}, get_matches)
|
||||
except Exception as e:
|
||||
self.log(f'Ignoring invalid filter_notification rule: {rule} with error: {e}')
|
||||
return False
|
||||
|
||||
|
||||
class DesktopIntegration:
|
||||
@@ -360,18 +429,13 @@ class DesktopIntegration:
|
||||
def initialize(self) -> None:
|
||||
pass
|
||||
|
||||
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
|
||||
raise NotImplementedError('Implement me in subclass')
|
||||
|
||||
def close_notification(self, desktop_notification_id: int) -> bool:
|
||||
raise NotImplementedError('Implement me in subclass')
|
||||
|
||||
def notify(self,
|
||||
title: str,
|
||||
body: str,
|
||||
timeout: int = -1,
|
||||
application: str = 'kitty',
|
||||
icon_name: str = '', icon_path: str = '',
|
||||
subtitle: Optional[str] = None,
|
||||
urgency: Urgency = Urgency.Normal,
|
||||
) -> int:
|
||||
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
|
||||
raise NotImplementedError('Implement me in subclass')
|
||||
|
||||
def on_new_version_notification_activation(self, cmd: NotificationCommand) -> None:
|
||||
@@ -385,16 +449,26 @@ class DesktopIntegration:
|
||||
i = f'i={identifier or "0"}:'
|
||||
p = ','.join(x.value for x in PayloadType if x.value)
|
||||
c = ':c=1' if self.supports_close_events else ''
|
||||
return f'99;{i}p=?;a={actions}:o={when}:u={urgency}:p={p}{c}'
|
||||
return f'99;{i}p=?;a={actions}:o={when}:u={urgency}:p={p}{c}:w=1'
|
||||
|
||||
|
||||
class MacOSIntegration(DesktopIntegration):
|
||||
|
||||
supports_close_events: bool = False
|
||||
|
||||
def initialize(self) -> None:
|
||||
from .fast_data_types import cocoa_set_notification_activated_callback
|
||||
self.id_counter = count(start=1)
|
||||
self.live_notification_queries: List[Tuple[int, str]] = []
|
||||
cocoa_set_notification_activated_callback(self.notification_activated)
|
||||
|
||||
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
|
||||
from .fast_data_types import cocoa_live_delivered_notifications
|
||||
if not cocoa_live_delivered_notifications():
|
||||
self.notification_manager.send_live_response(channel_id, identifier, ())
|
||||
else:
|
||||
self.live_notification_queries.append((channel_id, identifier))
|
||||
|
||||
def close_notification(self, desktop_notification_id: int) -> bool:
|
||||
from .fast_data_types import cocoa_remove_delivered_notification
|
||||
close_succeeded = cocoa_remove_delivered_notification(str(desktop_notification_id))
|
||||
@@ -402,24 +476,30 @@ class MacOSIntegration(DesktopIntegration):
|
||||
log_error(f'Close request for {desktop_notification_id=} {"succeeded" if close_succeeded else "failed"}')
|
||||
return close_succeeded
|
||||
|
||||
def notify(self,
|
||||
title: str,
|
||||
body: str,
|
||||
timeout: int = -1,
|
||||
application: str = 'kitty',
|
||||
icon_name: str = '', icon_path: str = '',
|
||||
subtitle: Optional[str] = None,
|
||||
urgency: Urgency = Urgency.Normal,
|
||||
) -> int:
|
||||
desktop_notification_id = next(self.id_counter)
|
||||
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
|
||||
desktop_notification_id = existing_desktop_notification_id or next(self.id_counter)
|
||||
from .fast_data_types import cocoa_send_notification
|
||||
# If the body is not set macos makes the title the body and uses
|
||||
# "kitty" as the title. So use a single space for the body in this
|
||||
# case.
|
||||
cocoa_send_notification(str(desktop_notification_id), title, body or ' ', subtitle, urgency.value)
|
||||
# case. Although https://developer.apple.com/documentation/usernotifications/unnotificationcontent/body?language=objc
|
||||
# says printf style strings are stripped this does not actually happen,
|
||||
# so dont double %
|
||||
# for %% escaping.
|
||||
body = (nc.body or ' ')
|
||||
assert nc.urgency is not None
|
||||
cocoa_send_notification(str(desktop_notification_id), nc.title, body, nc.urgency.value)
|
||||
return desktop_notification_id
|
||||
|
||||
def notification_activated(self, event: str, ident: str) -> None:
|
||||
if event == 'live':
|
||||
if debug_desktop_integration:
|
||||
log_error('Got list of live notifications:', ident)
|
||||
live_ids = tuple(int(x) for x in ident.split(',') if x)
|
||||
self.notification_manager.purge_dead_notifications(live_ids)
|
||||
self.live_notification_queries, queries = [], self.live_notification_queries
|
||||
for channel_id, req_id in queries:
|
||||
self.notification_manager.send_live_response(channel_id, req_id, live_ids)
|
||||
return
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Notification {ident} {event=}')
|
||||
try:
|
||||
@@ -429,9 +509,11 @@ class MacOSIntegration(DesktopIntegration):
|
||||
return
|
||||
if event == "created":
|
||||
self.notification_manager.notification_created(desktop_notification_id)
|
||||
from .fast_data_types import cocoa_live_delivered_notifications
|
||||
cocoa_live_delivered_notifications() # so that we purge dead notifications
|
||||
elif event == "activated":
|
||||
self.notification_manager.notification_activated(desktop_notification_id)
|
||||
elif event == "closed":
|
||||
elif event == "creation_failed":
|
||||
self.notification_manager.notification_closed(desktop_notification_id)
|
||||
|
||||
|
||||
@@ -442,7 +524,11 @@ class FreeDesktopIntegration(DesktopIntegration):
|
||||
dbus_set_notification_callback(self.dispatch_event_from_desktop)
|
||||
# map the id returned by the notification daemon to the
|
||||
# desktop_notification_id we use for the notification
|
||||
self.creation_id_map: 'OrderedDict[int, int]' = OrderedDict()
|
||||
self.dbus_to_desktop: 'OrderedDict[int, int]' = OrderedDict()
|
||||
self.desktop_to_dbus: Dict[int, int] = {}
|
||||
|
||||
def query_live_notifications(self, channel_id: int, identifier: str) -> None:
|
||||
self.notification_manager.send_live_response(channel_id, identifier, tuple(self.desktop_to_dbus))
|
||||
|
||||
def close_notification(self, desktop_notification_id: int) -> bool:
|
||||
from .fast_data_types import dbus_close_notification
|
||||
@@ -454,28 +540,32 @@ class FreeDesktopIntegration(DesktopIntegration):
|
||||
return close_succeeded
|
||||
|
||||
def get_desktop_notification_id(self, dbus_notification_id: int, event: str) -> Optional[int]:
|
||||
q = self.creation_id_map.get(dbus_notification_id)
|
||||
q = self.dbus_to_desktop.get(dbus_notification_id)
|
||||
if q is None:
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Could not find desktop_notification_id for {dbus_notification_id=} for event {event}')
|
||||
return q
|
||||
|
||||
def get_dbus_notification_id(self, desktop_notification_id: int, event: str) ->Optional[int]:
|
||||
for dbus_id, q in self.creation_id_map.items():
|
||||
if q == desktop_notification_id:
|
||||
return dbus_id
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Could not find dbus_notification_id for {desktop_notification_id=} for event {event}')
|
||||
return None
|
||||
q = self.desktop_to_dbus.get(desktop_notification_id)
|
||||
if q is None:
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Could not find dbus_notification_id for {desktop_notification_id=} for event {event}')
|
||||
return q
|
||||
|
||||
def created(self, dbus_notification_id: int, desktop_notification_id: int) -> None:
|
||||
self.dbus_to_desktop[desktop_notification_id] = dbus_notification_id
|
||||
self.desktop_to_dbus[dbus_notification_id] = desktop_notification_id
|
||||
if len(self.dbus_to_desktop) > 128:
|
||||
k, v = self.dbus_to_desktop.popitem(False)
|
||||
self.desktop_to_dbus.pop(v, None)
|
||||
self.notification_manager.notification_created(dbus_notification_id)
|
||||
|
||||
def dispatch_event_from_desktop(self, event_type: str, dbus_notification_id: int, extra: Union[int, str]) -> None:
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Got notification event from desktop: {event_type=} {dbus_notification_id=} {extra=}')
|
||||
if event_type == 'created':
|
||||
self.creation_id_map[int(extra)] = dbus_notification_id
|
||||
if len(self.creation_id_map) > 128:
|
||||
self.creation_id_map.popitem(False)
|
||||
self.notification_manager.notification_created(dbus_notification_id)
|
||||
self.created(dbus_notification_id, int(extra))
|
||||
return
|
||||
if desktop_notification_id := self.get_desktop_notification_id(dbus_notification_id, event_type):
|
||||
if event_type == 'activation_token':
|
||||
@@ -485,21 +575,38 @@ class FreeDesktopIntegration(DesktopIntegration):
|
||||
elif event_type == 'closed':
|
||||
self.notification_manager.notification_closed(desktop_notification_id)
|
||||
|
||||
def notify(self,
|
||||
title: str,
|
||||
body: str,
|
||||
timeout: int = -1,
|
||||
application: str = 'kitty',
|
||||
icon_name: str = '', icon_path: str = '',
|
||||
subtitle: Optional[str] = None,
|
||||
urgency: Urgency = Urgency.Normal,
|
||||
) -> int:
|
||||
def notify(self, nc: NotificationCommand, existing_desktop_notification_id: Optional[int]) -> int:
|
||||
from .fast_data_types import dbus_send_notification
|
||||
app_icon = icon_name or icon_path or get_custom_window_icon()[1] or logo_png_file
|
||||
from .xdg import icon_exists, icon_for_appname
|
||||
app_icon = ''
|
||||
if nc.icon_names:
|
||||
for name in nc.icon_names:
|
||||
if sn := standard_icon_names.get(name):
|
||||
app_icon = sn
|
||||
break
|
||||
if icon_exists(name):
|
||||
app_icon = name
|
||||
break
|
||||
if not app_icon:
|
||||
app_icon = nc.icon_path or nc.icon_names[0]
|
||||
else:
|
||||
app_icon = nc.icon_path or icon_for_appname(nc.application_name)
|
||||
if not app_icon:
|
||||
app_icon = get_custom_window_icon()[1] or logo_png_file
|
||||
|
||||
body = nc.body.replace('<', '<\u200c').replace('&', '&\u200c') # prevent HTML markup from being recognized
|
||||
assert nc.urgency is not None
|
||||
replaces_dbus_id = 0
|
||||
if existing_desktop_notification_id:
|
||||
replaces_dbus_id = self.get_dbus_notification_id(existing_desktop_notification_id, 'notify') or 0
|
||||
desktop_notification_id = dbus_send_notification(
|
||||
app_name=application, app_icon=app_icon, title=title, body=body, timeout=timeout, urgency=urgency.value)
|
||||
app_name=nc.application_name or 'kitty', app_icon=app_icon, title=nc.title, body=body, timeout=nc.timeout,
|
||||
urgency=nc.urgency.value, replaces=replaces_dbus_id)
|
||||
if debug_desktop_integration:
|
||||
log_error(f'Created notification with {desktop_notification_id=}')
|
||||
log_error(f'Requested creation of notification with {desktop_notification_id=}')
|
||||
if existing_desktop_notification_id and replaces_dbus_id:
|
||||
self.dbus_to_desktop.pop(replaces_dbus_id, None)
|
||||
self.desktop_to_dbus.pop(existing_desktop_notification_id, None)
|
||||
return desktop_notification_id
|
||||
|
||||
|
||||
@@ -564,7 +671,8 @@ class NotificationManager:
|
||||
channel: Channel = Channel(),
|
||||
log: Log = Log(),
|
||||
debug: bool = False,
|
||||
base_cache_dir: str = ''
|
||||
base_cache_dir: str = '',
|
||||
cleanup_at_exit: bool = True,
|
||||
):
|
||||
global debug_desktop_integration
|
||||
debug_desktop_integration = debug
|
||||
@@ -576,7 +684,19 @@ class NotificationManager:
|
||||
self.base_cache_dir = base_cache_dir
|
||||
self.log = log
|
||||
self.icon_data_cache = IconDataCache(base_cache_dir=self.base_cache_dir)
|
||||
script_path = os.path.join(config_dir, 'notifications.py')
|
||||
self.filter_script: Callable[[NotificationCommand], bool] = lambda nc: False
|
||||
if os.path.exists(script_path):
|
||||
import runpy
|
||||
try:
|
||||
m = runpy.run_path(script_path)
|
||||
self.filter_script = m['main']
|
||||
except Exception as e:
|
||||
self.log(f'Failed to load {script_path} with error: {e}')
|
||||
self.reset()
|
||||
if cleanup_at_exit:
|
||||
import atexit
|
||||
atexit.register(self.cleanup)
|
||||
|
||||
def reset(self) -> None:
|
||||
self.icon_data_cache.clear()
|
||||
@@ -587,6 +707,8 @@ class NotificationManager:
|
||||
def notification_created(self, desktop_notification_id: int) -> None:
|
||||
if n := self.in_progress_notification_commands.get(desktop_notification_id):
|
||||
n.created_by_desktop = True
|
||||
if n.timeout > 0:
|
||||
add_timer(partial(self.expire_notification, desktop_notification_id, id(n)), n.timeout / 1000, False)
|
||||
|
||||
def notification_activation_token_received(self, desktop_notification_id: int, token: str) -> None:
|
||||
if n := self.in_progress_notification_commands.get(desktop_notification_id):
|
||||
@@ -605,13 +727,27 @@ class NotificationManager:
|
||||
try:
|
||||
n.on_activation(n)
|
||||
except Exception as e:
|
||||
self.log(e)
|
||||
self.log('Notification on_activation handler failed with error:', e)
|
||||
|
||||
def notification_replaced(self, old_cmd: NotificationCommand, new_cmd: NotificationCommand) -> None:
|
||||
if old_cmd.desktop_notification_id != new_cmd.desktop_notification_id:
|
||||
self.in_progress_notification_commands.pop(old_cmd.desktop_notification_id, None)
|
||||
if old_cmd.on_update is not None:
|
||||
try:
|
||||
old_cmd.on_update(old_cmd, new_cmd)
|
||||
except Exception as e:
|
||||
self.log('Notification on_update handler failed with error:', e)
|
||||
|
||||
def notification_closed(self, desktop_notification_id: int) -> None:
|
||||
if n := self.in_progress_notification_commands.get(desktop_notification_id):
|
||||
self.purge_notification(n)
|
||||
if n.close_response_requested:
|
||||
self.send_closed_response(n.channel_id, n.identifier)
|
||||
if n.on_close is not None:
|
||||
try:
|
||||
n.on_close(n)
|
||||
except Exception as e:
|
||||
self.log('Notification on_close handler failed with error:', e)
|
||||
|
||||
def create_notification_cmd(self) -> NotificationCommand:
|
||||
return NotificationCommand(ref(self.icon_data_cache), self.log)
|
||||
@@ -634,7 +770,7 @@ class NotificationManager:
|
||||
cmd.on_activation = self.desktop_integration.on_new_version_notification_activation
|
||||
self.notify_with_command(cmd, 0)
|
||||
|
||||
def is_notification_allowed(self, cmd: NotificationCommand, channel_id: int) -> bool:
|
||||
def is_notification_allowed(self, cmd: NotificationCommand, channel_id: int, apply_filter_rules: bool = True) -> bool:
|
||||
if cmd.only_when is not OnlyWhen.always and cmd.only_when is not OnlyWhen.unset:
|
||||
ui_state = self.channel.ui_state(channel_id)
|
||||
if ui_state.has_keyboard_focus:
|
||||
@@ -643,21 +779,42 @@ class NotificationManager:
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def filter_rules(self) -> Iterator[str]:
|
||||
return iter(get_options().filter_notification.keys())
|
||||
|
||||
def is_notification_filtered(self, cmd: NotificationCommand) -> bool:
|
||||
if self.filter_script(cmd):
|
||||
self.log(f'Notification {cmd.title!r} filtered out by script')
|
||||
return True
|
||||
for rule in self.filter_rules:
|
||||
if cmd.matches_rule(rule):
|
||||
self.log(f'Notification {cmd.title!r} filtered out by filter_notification rule: {rule}')
|
||||
return True
|
||||
return False
|
||||
|
||||
def notify_with_command(self, cmd: NotificationCommand, channel_id: int) -> Optional[int]:
|
||||
cmd.channel_id = channel_id
|
||||
cmd.finalise()
|
||||
title = cmd.title or cmd.body
|
||||
body = cmd.body if cmd.title else ''
|
||||
if not title or not self.is_notification_allowed(cmd, channel_id):
|
||||
if not cmd.title or not self.is_notification_allowed(cmd, channel_id) or self.is_notification_filtered(cmd):
|
||||
return None
|
||||
urgency = Urgency.Normal if cmd.urgency is None else cmd.urgency
|
||||
desktop_notification_id = self.desktop_integration.notify(
|
||||
title=sanitize_text(title), body=sanitize_text(body), urgency=urgency,
|
||||
icon_name=cmd.icon_name, icon_path=cmd.icon_path,
|
||||
)
|
||||
existing_desktop_notification_id: Optional[int] = None
|
||||
existing_cmd = self.in_progress_notification_commands_by_client_id.get(cmd.identifier) if cmd.identifier else None
|
||||
if existing_cmd:
|
||||
existing_desktop_notification_id = existing_cmd.desktop_notification_id
|
||||
desktop_notification_id = self.desktop_integration.notify(cmd, existing_desktop_notification_id)
|
||||
self.register_in_progress_notification(cmd, desktop_notification_id)
|
||||
if existing_cmd:
|
||||
self.notification_replaced(existing_cmd, cmd)
|
||||
if not self.desktop_integration.supports_close_events and cmd.close_response_requested:
|
||||
self.send_closed_response(channel_id, cmd.identifier, untracked=True)
|
||||
return desktop_notification_id
|
||||
|
||||
def expire_notification(self, desktop_notification_id: int, command_id: int, timer_id: int) -> None:
|
||||
if n := self.in_progress_notification_commands.get(desktop_notification_id):
|
||||
if id(n) == command_id:
|
||||
self.desktop_integration.close_notification(desktop_notification_id)
|
||||
|
||||
def register_in_progress_notification(self, cmd: NotificationCommand, desktop_notification_id: int) -> None:
|
||||
cmd.desktop_notification_id = desktop_notification_id
|
||||
self.in_progress_notification_commands[desktop_notification_id] = cmd
|
||||
@@ -680,6 +837,10 @@ class NotificationManager:
|
||||
if payload_type is PayloadType.query:
|
||||
self.channel.send(channel_id, self.desktop_integration.query_response(cmd.identifier))
|
||||
return None
|
||||
if payload_type is PayloadType.alive:
|
||||
if cmd.identifier:
|
||||
self.desktop_integration.query_live_notifications(channel_id, cmd.identifier)
|
||||
return None
|
||||
if payload_type is PayloadType.close:
|
||||
if cmd.identifier:
|
||||
to_close = self.in_progress_notification_commands_by_client_id.get(cmd.identifier)
|
||||
@@ -697,8 +858,21 @@ class NotificationManager:
|
||||
cmd.set_payload(payload_type, payload_is_encoded, payload, prev_cmd)
|
||||
return cmd
|
||||
|
||||
def send_closed_response(self, channel_id: int, client_id: str) -> None:
|
||||
self.channel.send(channel_id, f'99;i={client_id}:p=close;')
|
||||
def send_closed_response(self, channel_id: int, client_id: str, untracked: bool = False) -> None:
|
||||
payload = 'untracked' if untracked else ''
|
||||
self.channel.send(channel_id, f'99;i={client_id}:p={PayloadType.close.value};{payload}')
|
||||
|
||||
def send_live_response(self, channel_id: int, client_id: str, live_desktop_ids: Sequence[int]) -> None:
|
||||
ids = []
|
||||
for desktop_notification_id in live_desktop_ids:
|
||||
if n := self.in_progress_notification_commands.get(desktop_notification_id):
|
||||
if n.identifier and n.channel_id == channel_id:
|
||||
ids.append(n.identifier)
|
||||
self.channel.send(channel_id, f'99;i={client_id}:p={PayloadType.alive.value};{",".join(ids)}')
|
||||
|
||||
def purge_dead_notifications(self, live_desktop_ids: Sequence[int]) -> None:
|
||||
for d in set(self.in_progress_notification_commands) - set(live_desktop_ids):
|
||||
self.purge_notification(self.in_progress_notification_commands[d])
|
||||
|
||||
def purge_notification(self, cmd: NotificationCommand) -> None:
|
||||
self.in_progress_notification_commands_by_client_id.pop(cmd.identifier, None)
|
||||
@@ -722,3 +896,6 @@ class NotificationManager:
|
||||
parts = raw.split(';', 1)
|
||||
n.title, n.body = parts[0], (parts[1] if len(parts) > 1 else '')
|
||||
self.notify_with_command(n, channel_id)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
del self.icon_data_cache
|
||||
|
||||
@@ -3035,6 +3035,29 @@ The value of :code:`VAR2` will be :code:`<path to home directory>/a/b`.
|
||||
'''
|
||||
)
|
||||
|
||||
opt('+filter_notification', '', option_type='filter_notification', add_to_default=False, long_text='''
|
||||
Specify rules to filter out notifications sent by applications running in kitty.
|
||||
Can be specified multiple times to create multiple filter rules. A rule specification
|
||||
is of the form :code:`field:regexp`. A filter rule
|
||||
can match on any of the fields: :code:`title`, :code:`body`, :code:`app`, :code:`type`.
|
||||
The special value of :code:`all` filters out all notifications. Rules can be combined
|
||||
using Boolean operators. Some examples::
|
||||
|
||||
filter_notification title:hello or body:"abc.*def"
|
||||
# filter out notification from vim except for ones about updates, (?i)
|
||||
# makes matching case insesitive.
|
||||
filter_notification app:"[ng]?vim" and not body:"(?i)update"
|
||||
# filter out all notifications
|
||||
filter_notification all
|
||||
|
||||
The field :code:`app` is the name of the application sending the notification and :code:`type`
|
||||
is the type of the notification. Not all applications will send these fields, so you can also
|
||||
match on the title and body of the notification text. More sophisticated programmatic filtering
|
||||
and custom actions on notifications can be done by creating a notifications.py file in the
|
||||
kitty config directory (:file:`~/.config/kitty`). An annotated sample is
|
||||
:link:`available <https://github.com/kovidgoyal/kitty/blob/master/docs/notifications.py>`.
|
||||
''')
|
||||
|
||||
opt('+watcher', '',
|
||||
option_type='store_multiple',
|
||||
add_to_default=False,
|
||||
|
||||
23
kitty/options/parse.py
generated
23
kitty/options/parse.py
generated
@@ -12,15 +12,15 @@ from kitty.options.utils import (
|
||||
config_or_absolute_path, copy_on_select, cursor_blink_interval, cursor_text_color,
|
||||
deprecated_adjust_line_height, deprecated_hide_window_decorations_aliases,
|
||||
deprecated_macos_show_window_title_in_menubar_alias, deprecated_send_text, disable_ligatures,
|
||||
edge_width, env, font_features, hide_window_decorations, macos_option_as_alt, macos_titlebar_color,
|
||||
menu_map, modify_font, narrow_symbols, notify_on_cmd_finish, optional_edge_width, parse_font_spec,
|
||||
parse_map, parse_mouse_map, paste_actions, remote_control_password, resize_debounce_time,
|
||||
scrollback_lines, scrollback_pager_history_size, shell_integration, store_multiple, symbol_map,
|
||||
tab_activity_symbol, tab_bar_edge, tab_bar_margin_height, tab_bar_min_tabs, tab_fade,
|
||||
tab_font_style, tab_separator, tab_title_template, titlebar_color, to_cursor_shape,
|
||||
to_cursor_unfocused_shape, to_font_size, to_layout_names, to_modifiers, url_prefixes, url_style,
|
||||
visual_bell_duration, visual_window_select_characters, window_border_width, window_logo_scale,
|
||||
window_size
|
||||
edge_width, env, filter_notification, font_features, hide_window_decorations, macos_option_as_alt,
|
||||
macos_titlebar_color, menu_map, modify_font, narrow_symbols, notify_on_cmd_finish,
|
||||
optional_edge_width, parse_font_spec, parse_map, parse_mouse_map, paste_actions,
|
||||
remote_control_password, resize_debounce_time, scrollback_lines, scrollback_pager_history_size,
|
||||
shell_integration, store_multiple, symbol_map, tab_activity_symbol, tab_bar_edge,
|
||||
tab_bar_margin_height, tab_bar_min_tabs, tab_fade, tab_font_style, tab_separator,
|
||||
tab_title_template, titlebar_color, to_cursor_shape, to_cursor_unfocused_shape, to_font_size,
|
||||
to_layout_names, to_modifiers, url_prefixes, url_style, visual_bell_duration,
|
||||
visual_window_select_characters, window_border_width, window_logo_scale, window_size
|
||||
)
|
||||
|
||||
|
||||
@@ -976,6 +976,10 @@ class Parser:
|
||||
def file_transfer_confirmation_bypass(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
|
||||
ans['file_transfer_confirmation_bypass'] = str(val)
|
||||
|
||||
def filter_notification(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
|
||||
for k, v in filter_notification(val, ans["filter_notification"]):
|
||||
ans["filter_notification"][k] = v
|
||||
|
||||
def focus_follows_mouse(self, val: str, ans: typing.Dict[str, typing.Any]) -> None:
|
||||
ans['focus_follows_mouse'] = to_bool(val)
|
||||
|
||||
@@ -1448,6 +1452,7 @@ def create_result_dict() -> typing.Dict[str, typing.Any]:
|
||||
'action_alias': {},
|
||||
'env': {},
|
||||
'exe_search_path': {},
|
||||
'filter_notification': {},
|
||||
'font_features': {},
|
||||
'kitten_alias': {},
|
||||
'menu_map': {},
|
||||
|
||||
3
kitty/options/types.py
generated
3
kitty/options/types.py
generated
@@ -347,6 +347,7 @@ option_names = ( # {{{
|
||||
'env',
|
||||
'exe_search_path',
|
||||
'file_transfer_confirmation_bypass',
|
||||
'filter_notification',
|
||||
'focus_follows_mouse',
|
||||
'font_family',
|
||||
'font_features',
|
||||
@@ -633,6 +634,7 @@ class Options:
|
||||
action_alias: typing.Dict[str, str] = {}
|
||||
env: typing.Dict[str, str] = {}
|
||||
exe_search_path: typing.Dict[str, str] = {}
|
||||
filter_notification: typing.Dict[str, str] = {}
|
||||
font_features: typing.Dict[str, typing.Tuple[kitty.fast_data_types.ParsedFontFeature, ...]] = {}
|
||||
kitten_alias: typing.Dict[str, str] = {}
|
||||
menu_map: typing.Dict[typing.Tuple[str, ...], str] = {}
|
||||
@@ -755,6 +757,7 @@ defaults = Options()
|
||||
defaults.action_alias = {}
|
||||
defaults.env = {}
|
||||
defaults.exe_search_path = {}
|
||||
defaults.filter_notification = {}
|
||||
defaults.font_features = {}
|
||||
defaults.kitten_alias = {}
|
||||
defaults.menu_map = {}
|
||||
|
||||
@@ -787,6 +787,10 @@ def config_or_absolute_path(x: str, env: Optional[Dict[str, str]] = None) -> Opt
|
||||
return resolve_abs_or_config_path(x, env)
|
||||
|
||||
|
||||
def filter_notification(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, str]]:
|
||||
yield val, ''
|
||||
|
||||
|
||||
def remote_control_password(val: str, current_val: Dict[str, str]) -> Iterable[Tuple[str, Sequence[str]]]:
|
||||
val = val.strip()
|
||||
if val:
|
||||
|
||||
137
kitty/xdg.py
Normal file
137
kitty/xdg.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import os
|
||||
import re
|
||||
from contextlib import suppress
|
||||
|
||||
from kitty.types import run_once
|
||||
|
||||
|
||||
@run_once
|
||||
def xdg_data_dirs() -> tuple[str, ...]:
|
||||
return tuple(os.environ.get('XDG_DATA_DIRS', '/usr/local/share/:/usr/share/').split(os.pathsep))
|
||||
|
||||
|
||||
@run_once
|
||||
def icon_dirs() -> list[str]:
|
||||
ans = []
|
||||
def a(x: str) -> None:
|
||||
if os.path.isdir(x):
|
||||
ans.append(x)
|
||||
|
||||
a(os.path.expanduser('~/.icons'))
|
||||
for x in xdg_data_dirs():
|
||||
a(os.path.join(x, 'icons'))
|
||||
return ans
|
||||
|
||||
|
||||
class XDGIconCache:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.existing_icon_names: set[str] = set()
|
||||
self.scanned = False
|
||||
|
||||
def find_inherited_themes(self, basedir: str, seen_indexes: set[str], themes_to_search: set[str]) -> bool:
|
||||
if basedir not in seen_indexes:
|
||||
seen_indexes.add(basedir)
|
||||
with suppress(OSError), open(os.path.join(basedir, 'index.theme')) as f:
|
||||
raw = f.read()
|
||||
if m := re.search(r'^Inherits\s*=\s*(.+?)$', raw, re.MULTILINE):
|
||||
for x in m.group(1).split(','):
|
||||
themes_to_search.add(x.strip())
|
||||
return True
|
||||
return False
|
||||
|
||||
def scan(self) -> None:
|
||||
themes_to_search: set[str] = set()
|
||||
self.scanned = True
|
||||
seen_indexes: set[str] = set()
|
||||
for icdir in icon_dirs():
|
||||
if self.find_inherited_themes(os.path.join(icdir, 'default'), seen_indexes, themes_to_search):
|
||||
break
|
||||
themes_to_search.add('hicolor')
|
||||
while True:
|
||||
before = len(themes_to_search)
|
||||
for icdir in icon_dirs():
|
||||
for theme in tuple(themes_to_search):
|
||||
self.find_inherited_themes(os.path.join(icdir, theme), seen_indexes, themes_to_search)
|
||||
if len(themes_to_search) == before:
|
||||
break
|
||||
for icdir in icon_dirs():
|
||||
for theme in themes_to_search:
|
||||
self.scan_theme_dir(os.path.join(icdir, theme))
|
||||
self.scan_theme_dir('/usr/share/pixmaps')
|
||||
|
||||
def scan_theme_dir(self, base: str) -> None:
|
||||
with suppress(OSError):
|
||||
for (dirpath, dirnames, filenames) in os.walk(base):
|
||||
for q in filenames:
|
||||
icon_name, sep, ext = q.lower().rpartition('.')
|
||||
if sep == '.' and ext in ('svg', 'png', 'xpm'):
|
||||
self.existing_icon_names.add(icon_name)
|
||||
|
||||
def icon_exists(self, name: str) -> bool:
|
||||
if not self.scanned:
|
||||
self.scan()
|
||||
return name.lower() in self.existing_icon_names
|
||||
|
||||
|
||||
xdg_icon_cache = XDGIconCache()
|
||||
icon_exists = xdg_icon_cache.icon_exists
|
||||
|
||||
|
||||
class AppIconCache:
|
||||
def __init__(self) -> None:
|
||||
self.scanned = False
|
||||
self.lcase_app_name_to_path: dict[str, str] = {}
|
||||
self.lcase_full_name_to_path: dict[str, str] = {}
|
||||
self.icon_name_cache: dict[str, str] = {}
|
||||
|
||||
def scan(self) -> None:
|
||||
self.scanned = True
|
||||
for d in xdg_data_dirs():
|
||||
d = os.path.join(d, 'applications')
|
||||
with suppress(OSError):
|
||||
for (dirpath, dirnames, filenames) in os.walk(d):
|
||||
for fname in filenames:
|
||||
if fname.endswith('.desktop'):
|
||||
path = os.path.join(dirpath, fname)
|
||||
self.process_desktop_file(path, os.path.relpath(path, d))
|
||||
|
||||
def process_desktop_file(self, path: str, relpath: str) -> None:
|
||||
# file_id = relpath.replace('/', '-')
|
||||
bname = os.path.basename(relpath)
|
||||
parts = bname.split('.')[:-1]
|
||||
appname = parts[-1]
|
||||
self.lcase_app_name_to_path[appname.lower()] = path
|
||||
self.lcase_full_name_to_path['.'.join(parts).lower()] = path
|
||||
|
||||
def icon_for_appname(self, appname: str) -> str:
|
||||
if not self.scanned:
|
||||
self.scan()
|
||||
q = appname.lower()
|
||||
if not appname or q in ('kitty', 'kitten', 'kitten-notify'):
|
||||
return ''
|
||||
path = self.lcase_full_name_to_path.get(q) or self.lcase_app_name_to_path.get(q)
|
||||
if not path:
|
||||
return ''
|
||||
ans = self.icon_name_cache.get(path)
|
||||
if ans is None:
|
||||
try:
|
||||
ans = self.icon_name_cache[path] = self.icon_name_from_desktop_file(path)
|
||||
except OSError:
|
||||
ans = self.icon_name_cache[path] = ''
|
||||
|
||||
return ans
|
||||
|
||||
def icon_name_from_desktop_file(self, path: str) -> str:
|
||||
with open(path) as f:
|
||||
raw = f.read()
|
||||
if m := re.search(r'^Icon\s*=\s*(.+?)\s*?$', raw, re.MULTILINE):
|
||||
return m.group(1)
|
||||
return ''
|
||||
|
||||
|
||||
app_icon_cache = AppIconCache()
|
||||
icon_for_appname = app_icon_cache.icon_for_appname
|
||||
@@ -10,14 +10,17 @@ from . import BaseTest
|
||||
class TestClipboard(BaseTest):
|
||||
|
||||
def test_clipboard_write_request(self):
|
||||
wr = WriteRequest(max_size=64)
|
||||
wr.add_base64_data('bGlnaHQgd29yaw')
|
||||
self.ae(bytes(wr.current_leftover_bytes), b'aw')
|
||||
wr.flush_base64_data()
|
||||
self.ae(wr.data_for(), b'light work')
|
||||
wr = WriteRequest(max_size=64)
|
||||
wr.add_base64_data('bGlnaHQgd29yaw==')
|
||||
self.ae(wr.data_for(), b'light work')
|
||||
def t(data, expected):
|
||||
wr = WriteRequest(max_size=64)
|
||||
wr.add_base64_data(data)
|
||||
self.ae(wr.data_for(), expected)
|
||||
t('dGl0bGU=', b'title')
|
||||
t('dGl0bGU', b'title')
|
||||
t('dGl0bG', b'titl')
|
||||
t('dGl0bG==', b'titl')
|
||||
t('dGl0b', b'tit')
|
||||
t('bGlnaHQgd29yaw', b'light work')
|
||||
t('bGlnaHQgd29yaw==', b'light work')
|
||||
wr = WriteRequest(max_size=64)
|
||||
wr.add_base64_data('bGlnaHQgd29')
|
||||
for x in b'y', b'a', b'y', b'4', b'=':
|
||||
|
||||
@@ -6,15 +6,20 @@ import os
|
||||
import re
|
||||
import tempfile
|
||||
from base64 import standard_b64encode
|
||||
from typing import Optional
|
||||
|
||||
from kitty.notifications import Channel, DesktopIntegration, IconDataCache, NotificationManager, UIState, Urgency
|
||||
|
||||
from . import BaseTest
|
||||
|
||||
|
||||
def n(title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_name='', icon_path=''):
|
||||
return {'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_name': icon_name, 'icon_path': icon_path}
|
||||
def n(
|
||||
title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_names=(), icon_path='',
|
||||
application_name='', notification_types=(), timeout=-1,
|
||||
):
|
||||
return {
|
||||
'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_names': icon_names, 'icon_path': icon_path,
|
||||
'application_name': application_name, 'notification_types': notification_types, 'timeout': timeout
|
||||
}
|
||||
|
||||
|
||||
class DesktopIntegration(DesktopIntegration):
|
||||
@@ -28,6 +33,10 @@ class DesktopIntegration(DesktopIntegration):
|
||||
self.close_succeeds = True
|
||||
self.counter = 0
|
||||
|
||||
def query_live_notifications(self, channel_id, client_id):
|
||||
ids = [n['id'] for n in self.notifications]
|
||||
self.notification_manager.send_live_response(channel_id, client_id, tuple(ids))
|
||||
|
||||
def on_new_version_notification_activation(self, cmd) -> None:
|
||||
self.new_version_activated = True
|
||||
|
||||
@@ -37,18 +46,14 @@ class DesktopIntegration(DesktopIntegration):
|
||||
self.notification_manager.notification_closed(desktop_notification_id)
|
||||
return self.close_succeeds
|
||||
|
||||
def notify(self,
|
||||
title: str,
|
||||
body: str,
|
||||
timeout: int = -1,
|
||||
application: str = 'kitty',
|
||||
icon_name: str = '', icon_path: str = '',
|
||||
subtitle: Optional[str] = None,
|
||||
urgency: Urgency = Urgency.Normal,
|
||||
) -> int:
|
||||
self.counter += 1
|
||||
ans = n(title, body, urgency, self.counter, icon_name)
|
||||
ans['icon_path'] = os.path.basename(icon_path)
|
||||
def notify(self, cmd, existing_desktop_notification_id) -> int:
|
||||
if existing_desktop_notification_id:
|
||||
did = existing_desktop_notification_id
|
||||
else:
|
||||
self.counter += 1
|
||||
did = self.counter
|
||||
title, body, urgency = cmd.title, cmd.body, (Urgency.Normal if cmd.urgency is None else cmd.urgency)
|
||||
ans = n(title, body, urgency, did, cmd.icon_names, os.path.basename(cmd.icon_path), cmd.application_name, cmd.notification_types, timeout=cmd.timeout)
|
||||
self.notifications.append(ans)
|
||||
return self.counter
|
||||
|
||||
@@ -75,10 +80,17 @@ class Channel(Channel):
|
||||
self.responses.append(osc_escape_code)
|
||||
|
||||
|
||||
class NotificationManager(NotificationManager):
|
||||
|
||||
@property
|
||||
def filter_rules(self):
|
||||
yield from ('title:filterme',)
|
||||
|
||||
|
||||
def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
di = DesktopIntegration(None)
|
||||
ch = Channel()
|
||||
nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir)
|
||||
nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir, cleanup_at_exit=False)
|
||||
di.notification_manager = nm
|
||||
|
||||
def reset():
|
||||
@@ -97,7 +109,7 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
n = di.notifications[which]
|
||||
di.close_notification(n['id'])
|
||||
|
||||
def assert_events(focus=True, close=0, report='', close_response=''):
|
||||
def assert_events(focus=True, close=0, report='', close_response='', live=''):
|
||||
self.ae(ch.focus_events, [''] if focus else [])
|
||||
if report:
|
||||
self.assertIn(f'99;i={report};', ch.responses)
|
||||
@@ -111,6 +123,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
for r in ch.responses:
|
||||
m = re.match(r'99;i=[a-z0-9]+:p=close;', r)
|
||||
self.assertIsNone(m, f'Unexpectedly found close response: {r}')
|
||||
if live:
|
||||
self.assertIn(f'99;i=live:p=alive;{live}', ch.responses)
|
||||
else:
|
||||
for r in ch.responses:
|
||||
m = re.match(r'99;i=[a-z0-9]+:p=alive;', r)
|
||||
self.assertIsNone(m, f'Unexpectedly found alive response: {r}')
|
||||
self.ae(di.close_events, [close] if close else [])
|
||||
|
||||
h('test it', osc_code=9)
|
||||
@@ -168,6 +186,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
self.ae(di.notifications, [n()])
|
||||
reset()
|
||||
|
||||
# test filtering
|
||||
h(';title')
|
||||
h(';filterme please')
|
||||
self.ae(di.notifications, [n()])
|
||||
reset()
|
||||
|
||||
# test closing interactions with reporting and activation
|
||||
h('i=c;title')
|
||||
self.ae(di.notifications, [n()])
|
||||
@@ -199,6 +223,12 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
assert_events(focus=True, report='c', close=True, close_response='c')
|
||||
reset()
|
||||
|
||||
h('i=a;title')
|
||||
h('i=b;title')
|
||||
h('i=live:p=alive;')
|
||||
assert_events(focus=False, live='a,b')
|
||||
reset()
|
||||
|
||||
h(';title')
|
||||
self.ae(di.notifications, [n()])
|
||||
activate()
|
||||
@@ -208,7 +238,7 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
# Test querying
|
||||
h('i=xyz:p=?')
|
||||
self.assertFalse(di.notifications)
|
||||
qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon:c=1'
|
||||
qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon,alive:c=1:w=1'
|
||||
self.ae(ch.responses, [f'99;i=xyz:p=?;{qr}'])
|
||||
reset()
|
||||
h('p=?')
|
||||
@@ -230,6 +260,20 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
self.ae(di.notifications, [n(text, text)])
|
||||
reset()
|
||||
|
||||
# Test application name and notification type
|
||||
def e(x):
|
||||
return standard_b64encode(x.encode()).decode()
|
||||
|
||||
h(f'i=t:d=0:f={e("app")}:t={e("1")};title')
|
||||
h(f'i=t:t={e("test")}')
|
||||
self.ae(di.notifications, [n(application_name='app', notification_types=('1', 'test',))])
|
||||
reset()
|
||||
|
||||
# Test timeout
|
||||
h('w=3;title')
|
||||
self.ae(di.notifications, [n(timeout=3)])
|
||||
reset()
|
||||
|
||||
# Test Disk Cache
|
||||
dc = IconDataCache(base_cache_dir=tdir, max_cache_size=4)
|
||||
cache_dir = dc._ensure_state()
|
||||
@@ -243,15 +287,16 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
def send_with_icon(data='', n='', g=''):
|
||||
m = ''
|
||||
if n:
|
||||
m += f'n={n}:'
|
||||
for x in n.split(','):
|
||||
m += f'n={standard_b64encode(x.encode()).decode()}:'
|
||||
if g:
|
||||
m += f'g={g}:'
|
||||
h(f'i=9:d=0:{m};title')
|
||||
h(f'i=9:p=icon;{data}')
|
||||
|
||||
dc = nm.icon_data_cache
|
||||
send_with_icon(n='mycon')
|
||||
self.ae(di.notifications, [n(icon_name='mycon')])
|
||||
send_with_icon(n='mycon,ic2')
|
||||
self.ae(di.notifications, [n(icon_names=('mycon', 'ic2'))])
|
||||
reset()
|
||||
send_with_icon(g='gid')
|
||||
self.ae(di.notifications, [n()])
|
||||
@@ -259,9 +304,11 @@ def do_test(self: 'TestNotifications', tdir: str) -> None:
|
||||
send_with_icon(g='gid', data='1')
|
||||
self.ae(di.notifications, [n(icon_path=dc.hash(b'1'))])
|
||||
send_with_icon(g='gid', n='moose')
|
||||
self.ae(di.notifications[-1], n(icon_name='moose', icon_path=dc.hash(b'1'), desktop_notification_id=len(di.notifications)))
|
||||
self.ae(di.notifications[-1], n(icon_names=('moose',), icon_path=dc.hash(b'1')))
|
||||
send_with_icon(g='gid2', data='2')
|
||||
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2'), desktop_notification_id=len(di.notifications)))
|
||||
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2')))
|
||||
send_with_icon(data='3')
|
||||
self.ae(di.notifications[-1], n(icon_path=dc.hash(b'3')))
|
||||
reset()
|
||||
|
||||
|
||||
|
||||
@@ -78,10 +78,10 @@ def run_build(args: Any) -> None:
|
||||
needs_retry = 'arm64' in cmd or building_nightly
|
||||
if not needs_retry:
|
||||
raise
|
||||
print('Build failed, retrying in a few seconds...', file=sys.stderr)
|
||||
print('Build failed, retrying in a minute seconds...', file=sys.stderr)
|
||||
if 'macos' in cmd:
|
||||
call('python ../bypy macos shutdown')
|
||||
time.sleep(25)
|
||||
time.sleep(60)
|
||||
call(cmd, echo=True)
|
||||
|
||||
for x in ('64', 'arm64'):
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"kitty/kittens/hints"
|
||||
"kitty/kittens/hyperlinked_grep"
|
||||
"kitty/kittens/icat"
|
||||
"kitty/kittens/notify"
|
||||
"kitty/kittens/query_terminal"
|
||||
"kitty/kittens/show_key"
|
||||
"kitty/kittens/ssh"
|
||||
@@ -70,8 +71,10 @@ func KittyToolEntryPoints(root *cli.Command) {
|
||||
ask.EntryPoint(root)
|
||||
// hints
|
||||
hints.EntryPoint(root)
|
||||
// hints
|
||||
// diff
|
||||
diff.EntryPoint(root)
|
||||
// notify
|
||||
notify.EntryPoint(root)
|
||||
// themes
|
||||
themes.EntryPoint(root)
|
||||
themes.ParseEntryPoint(root)
|
||||
|
||||
Reference in New Issue
Block a user