diff --git a/kitty/child-monitor.c b/kitty/child-monitor.c index d73abdc3b..e70790bd1 100644 --- a/kitty/child-monitor.c +++ b/kitty/child-monitor.c @@ -983,9 +983,19 @@ typedef struct { static PeerReadData empty_prd = {.fd = -1, 0}; typedef struct { - size_t num_listen_fds, num_talk_fds, num_reads; + char *data; + size_t sz, pos; + int fd; + bool finished; +} PeerWriteData; +static PeerWriteData empty_pwd = {.fd = -1, 0}; + +typedef struct { + size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes; struct pollfd fds[MAX_PEERS + MAX_LISTENERS + 1]; PeerReadData reads[MAX_LISTENERS]; + PeerWriteData writes[MAX_LISTENERS]; + PeerWriteData queued_writes[MAX_LISTENERS]; int wakeup_fds[2]; pthread_mutex_t peer_lock; } TalkData; @@ -1001,14 +1011,12 @@ accept_peer(int listen_fd, bool shutting_down) { if (!shutting_down) perror("accept() on talk socket failed!"); return false; } - peer_mutex(lock); size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; if (fd_idx < arraysz(talk_data.fds) && talk_data.num_reads < arraysz(talk_data.reads)) { talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN; talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer; talk_data.num_talk_fds++; } else nuke_socket(peer); - peer_mutex(unlock); return true; } @@ -1047,9 +1055,31 @@ read_from_peer(ChildMonitor *self, int s) { return read_finished; } +static inline bool +write_to_peer(int fd) { + bool write_finished = false; + for (size_t i = 0; i < talk_data.num_writes; i++) { + PeerWriteData *wd = talk_data.writes + i; +#define failed(msg) { write_finished = true; fprintf(stderr, "%s\n", msg); wd->finished = true; break; } + if (wd->fd == fd) { + ssize_t n = send(fd, wd->data + wd->pos, wd->sz - wd->pos, MSG_NOSIGNAL); + if (n == 0) { failed("send() to peer failed to send any data"); } + else if (n < 0) { + if (errno != EINTR) { perror("write() to peer socket failed with error"); failed(""); } + } else { + wd->pos += n; + if (wd->pos >= wd->sz) { write_finished = true; wd->finished = true; } + } + break; + } + + } +#undef failed + return write_finished; +} + static inline void remove_poll_fd(int fd) { - peer_mutex(lock); size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds; for (size_t i = talk_data.num_listen_fds; i < count; i++) { struct pollfd *pfd = talk_data.fds + i; @@ -1060,7 +1090,6 @@ remove_poll_fd(int fd) { break; } } - peer_mutex(unlock); } static inline void @@ -1081,6 +1110,23 @@ prune_finished_reads() { } } +static inline void +prune_finished_writes() { + if (!talk_data.num_writes) return; + for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) { + PeerWriteData *wd = talk_data.writes + i; + if (wd->finished) { + remove_poll_fd(wd->fd); + shutdown(wd->fd, SHUT_WR); close(wd->fd); + free(wd->data); + ssize_t num_to_right = talk_data.num_writes - 1 - i; + if (num_to_right > 0) memmove(talk_data.writes + i, talk_data.writes + i + 1, num_to_right * sizeof(PeerWriteData)); + else talk_data.writes[i] = empty_pwd; + talk_data.num_writes--; + } + } +} + static void wakeup_talk_loop(bool in_signal_handler) { if (talk_data.wakeup_fds[1] <= 0) return; @@ -1094,6 +1140,23 @@ wakeup_talk_loop(bool in_signal_handler) { } } +static inline void +move_queued_writes() { + while (talk_data.num_queued_writes) { + PeerWriteData *src = talk_data.queued_writes + talk_data.num_queued_writes--; + size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds; + if (fd_idx < arraysz(talk_data.fds) && talk_data.num_writes < arraysz(talk_data.writes)) { + talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT; + talk_data.writes[talk_data.num_writes++] = *src; + talk_data.num_talk_fds++; + } else { + fprintf(stderr, "Cannot send response to peer, too many peers\n"); + free(src->data); nuke_socket(src->fd); + } + *src = empty_pwd; + } +} + static void* talk_loop(void *data) { // The talk thread loop @@ -1114,15 +1177,20 @@ talk_loop(void *data) { for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; } int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1); if (ret > 0) { - bool has_finished_reads = false; + bool has_finished_reads = false, has_finished_writes = false; for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) { if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; } } if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) { if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; } + if (talk_data.fds[i].revents & POLLOUT) { if (write_to_peer(talk_data.fds[i].fd)) has_finished_writes = true; } } if (has_finished_reads) prune_finished_reads(); + if (has_finished_writes) prune_finished_writes(); + peer_mutex(lock); + if (talk_data.num_queued_writes) move_queued_writes(); + peer_mutex(unlock); } else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); } } end: @@ -1130,11 +1198,29 @@ end: return 0; } +static inline bool +add_peer_writer(int fd, const char* msg, size_t msg_sz) { + bool ok = false; + peer_mutex(lock); + if (talk_data.num_queued_writes < arraysz(talk_data.queued_writes)) { + talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd; + talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz); + if (talk_data.queued_writes[talk_data.num_queued_writes].data) { + memcpy(talk_data.queued_writes[talk_data.num_queued_writes].data, msg, msg_sz); + talk_data.queued_writes[talk_data.num_queued_writes].sz = msg_sz; + talk_data.queued_writes[talk_data.num_queued_writes++].fd = fd; + ok = true; + } + } else fprintf(stderr, "Cannot send response to peer, too many peers\n"); + peer_mutex(unlock); + return ok; +} + static void send_response(int fd, const char *msg, size_t msg_sz) { if (msg == NULL) { shutdown(fd, SHUT_WR); close(fd); return; } - (void)msg_sz; // TODO: Implement this - shutdown(fd, SHUT_WR); close(fd); + if (!add_peer_writer(fd, msg, msg_sz)) { shutdown(fd, SHUT_WR); close(fd); } + else wakeup_talk_loop(false); } // }}}