Implement sending responses to peers

This commit is contained in:
Kovid Goyal
2018-03-02 12:35:32 +05:30
parent bfbbe3db14
commit e8286c50b2

View File

@@ -983,9 +983,19 @@ typedef struct {
static PeerReadData empty_prd = {.fd = -1, 0};
typedef struct {
size_t num_listen_fds, num_talk_fds, num_reads;
char *data;
size_t sz, pos;
int fd;
bool finished;
} PeerWriteData;
static PeerWriteData empty_pwd = {.fd = -1, 0};
typedef struct {
size_t num_listen_fds, num_talk_fds, num_reads, num_writes, num_queued_writes;
struct pollfd fds[MAX_PEERS + MAX_LISTENERS + 1];
PeerReadData reads[MAX_LISTENERS];
PeerWriteData writes[MAX_LISTENERS];
PeerWriteData queued_writes[MAX_LISTENERS];
int wakeup_fds[2];
pthread_mutex_t peer_lock;
} TalkData;
@@ -1001,14 +1011,12 @@ accept_peer(int listen_fd, bool shutting_down) {
if (!shutting_down) perror("accept() on talk socket failed!");
return false;
}
peer_mutex(lock);
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < arraysz(talk_data.fds) && talk_data.num_reads < arraysz(talk_data.reads)) {
talk_data.fds[fd_idx].fd = peer; talk_data.fds[fd_idx].events = POLLIN;
talk_data.reads[talk_data.num_reads] = empty_prd; talk_data.reads[talk_data.num_reads++].fd = peer;
talk_data.num_talk_fds++;
} else nuke_socket(peer);
peer_mutex(unlock);
return true;
}
@@ -1047,9 +1055,31 @@ read_from_peer(ChildMonitor *self, int s) {
return read_finished;
}
static inline bool
write_to_peer(int fd) {
bool write_finished = false;
for (size_t i = 0; i < talk_data.num_writes; i++) {
PeerWriteData *wd = talk_data.writes + i;
#define failed(msg) { write_finished = true; fprintf(stderr, "%s\n", msg); wd->finished = true; break; }
if (wd->fd == fd) {
ssize_t n = send(fd, wd->data + wd->pos, wd->sz - wd->pos, MSG_NOSIGNAL);
if (n == 0) { failed("send() to peer failed to send any data"); }
else if (n < 0) {
if (errno != EINTR) { perror("write() to peer socket failed with error"); failed(""); }
} else {
wd->pos += n;
if (wd->pos >= wd->sz) { write_finished = true; wd->finished = true; }
}
break;
}
}
#undef failed
return write_finished;
}
static inline void
remove_poll_fd(int fd) {
peer_mutex(lock);
size_t count = talk_data.num_talk_fds + talk_data.num_listen_fds;
for (size_t i = talk_data.num_listen_fds; i < count; i++) {
struct pollfd *pfd = talk_data.fds + i;
@@ -1060,7 +1090,6 @@ remove_poll_fd(int fd) {
break;
}
}
peer_mutex(unlock);
}
static inline void
@@ -1081,6 +1110,23 @@ prune_finished_reads() {
}
}
static inline void
prune_finished_writes() {
if (!talk_data.num_writes) return;
for (ssize_t i = talk_data.num_writes - 1; i >= 0; i--) {
PeerWriteData *wd = talk_data.writes + i;
if (wd->finished) {
remove_poll_fd(wd->fd);
shutdown(wd->fd, SHUT_WR); close(wd->fd);
free(wd->data);
ssize_t num_to_right = talk_data.num_writes - 1 - i;
if (num_to_right > 0) memmove(talk_data.writes + i, talk_data.writes + i + 1, num_to_right * sizeof(PeerWriteData));
else talk_data.writes[i] = empty_pwd;
talk_data.num_writes--;
}
}
}
static void
wakeup_talk_loop(bool in_signal_handler) {
if (talk_data.wakeup_fds[1] <= 0) return;
@@ -1094,6 +1140,23 @@ wakeup_talk_loop(bool in_signal_handler) {
}
}
static inline void
move_queued_writes() {
while (talk_data.num_queued_writes) {
PeerWriteData *src = talk_data.queued_writes + talk_data.num_queued_writes--;
size_t fd_idx = talk_data.num_listen_fds + talk_data.num_talk_fds;
if (fd_idx < arraysz(talk_data.fds) && talk_data.num_writes < arraysz(talk_data.writes)) {
talk_data.fds[fd_idx].fd = src->fd; talk_data.fds[fd_idx].events = POLLOUT;
talk_data.writes[talk_data.num_writes++] = *src;
talk_data.num_talk_fds++;
} else {
fprintf(stderr, "Cannot send response to peer, too many peers\n");
free(src->data); nuke_socket(src->fd);
}
*src = empty_pwd;
}
}
static void*
talk_loop(void *data) {
// The talk thread loop
@@ -1114,15 +1177,20 @@ talk_loop(void *data) {
for (size_t i = 0; i < talk_data.num_listen_fds + talk_data.num_talk_fds; i++) { talk_data.fds[i].revents = 0; }
int ret = poll(talk_data.fds, talk_data.num_listen_fds + talk_data.num_talk_fds, -1);
if (ret > 0) {
bool has_finished_reads = false;
bool has_finished_reads = false, has_finished_writes = false;
for (size_t i = 0; i < talk_data.num_listen_fds - 1; i++) {
if (talk_data.fds[i].revents & POLLIN) {if (!accept_peer(talk_data.fds[i].fd, self->shutting_down)) goto end; }
}
if (talk_data.fds[talk_data.num_listen_fds - 1].revents & POLLIN) drain_fd(talk_data.fds[talk_data.num_listen_fds - 1].fd); // wakeup
for (size_t i = talk_data.num_listen_fds; i < talk_data.num_talk_fds + talk_data.num_listen_fds; i++) {
if (talk_data.fds[i].revents & (POLLIN | POLLHUP)) { if (read_from_peer(self, talk_data.fds[i].fd)) has_finished_reads = true; }
if (talk_data.fds[i].revents & POLLOUT) { if (write_to_peer(talk_data.fds[i].fd)) has_finished_writes = true; }
}
if (has_finished_reads) prune_finished_reads();
if (has_finished_writes) prune_finished_writes();
peer_mutex(lock);
if (talk_data.num_queued_writes) move_queued_writes();
peer_mutex(unlock);
} else if (ret < 0) { if (errno != EAGAIN && errno != EINTR) perror("poll() on talk fds failed"); }
}
end:
@@ -1130,11 +1198,29 @@ end:
return 0;
}
static inline bool
add_peer_writer(int fd, const char* msg, size_t msg_sz) {
bool ok = false;
peer_mutex(lock);
if (talk_data.num_queued_writes < arraysz(talk_data.queued_writes)) {
talk_data.queued_writes[talk_data.num_queued_writes] = empty_pwd;
talk_data.queued_writes[talk_data.num_queued_writes].data = malloc(msg_sz);
if (talk_data.queued_writes[talk_data.num_queued_writes].data) {
memcpy(talk_data.queued_writes[talk_data.num_queued_writes].data, msg, msg_sz);
talk_data.queued_writes[talk_data.num_queued_writes].sz = msg_sz;
talk_data.queued_writes[talk_data.num_queued_writes++].fd = fd;
ok = true;
}
} else fprintf(stderr, "Cannot send response to peer, too many peers\n");
peer_mutex(unlock);
return ok;
}
static void
send_response(int fd, const char *msg, size_t msg_sz) {
if (msg == NULL) { shutdown(fd, SHUT_WR); close(fd); return; }
(void)msg_sz; // TODO: Implement this
shutdown(fd, SHUT_WR); close(fd);
if (!add_peer_writer(fd, msg, msg_sz)) { shutdown(fd, SHUT_WR); close(fd); }
else wakeup_talk_loop(false);
}
// }}}