4#ifdef USE_SOCKET_IMPL_LWIP_TCP
// Wake flag for the socket delay: it is reset to false right before esp_delay()
// is called and is read by the lambda handed to esp_delay(). The code that sets
// it to true is not visible in this excerpt (presumably socket_wake() — TODO confirm).
26static volatile bool s_socket_woke =
false;
32 s_socket_woke =
false;
33 esp_delay(ms, []() {
return !s_socket_woke; });
// Log tag used by the LWIP_LOG macro when it forwards to ESP_LOGVV.
42static const char *
const TAG =
"socket.lwip";
// Logging variant: forwards to ESP_LOGVV with TAG and prefixes every message with
// the address of the calling object, so it can only be used where `this` is valid.
46#define LWIP_LOG(msg, ...) ESP_LOGVV(TAG, "socket %p: " msg, this, ##__VA_ARGS__)
// Silent variant: expands to nothing. The preprocessor condition that selects
// between the two definitions is not visible in this excerpt.
48#define LWIP_LOG(msg, ...)
51class LWIPRawImpl :
public Socket {
// Wraps an lwIP TCP protocol control block in a Socket implementation.
// family: address family, stored in family_ (other methods of this class compare
//         it against AF_INET / AF_INET6).
// pcb:    stored in pcb_ as-is; no null check is performed here.
53 LWIPRawImpl(
sa_family_t family,
struct tcp_pcb *pcb) : pcb_(pcb), family_(family) {}
54 ~LWIPRawImpl()
override {
55 if (pcb_ !=
nullptr) {
56 LWIP_LOG(
"tcp_abort(%p)", pcb_);
63 LWIP_LOG(
"init(%p)", pcb_);
65 tcp_recv(pcb_, LWIPRawImpl::s_recv_fn);
66 tcp_err(pcb_, LWIPRawImpl::s_err_fn);
69 std::unique_ptr<Socket> accept(
struct sockaddr *addr,
socklen_t *addrlen)
override {
75 if (pcb_ ==
nullptr) {
79 if (name ==
nullptr) {
86 if (family_ == AF_INET) {
91 auto *addr4 =
reinterpret_cast<const sockaddr_in *
>(name);
92 port = ntohs(addr4->sin_port);
93 ip.type = IPADDR_TYPE_V4;
94 ip.u_addr.ip4.addr = addr4->sin_addr.s_addr;
95 LWIP_LOG(
"tcp_bind(%p ip=%s port=%u)", pcb_, ip4addr_ntoa(&ip.u_addr.ip4), port);
96 }
else if (family_ == AF_INET6) {
101 auto *addr6 =
reinterpret_cast<const sockaddr_in6 *
>(name);
102 port = ntohs(addr6->sin6_port);
103 ip.type = IPADDR_TYPE_ANY;
104 memcpy(&ip.u_addr.ip6.addr, &addr6->sin6_addr.un.u8_addr, 16);
105 LWIP_LOG(
"tcp_bind(%p ip=%s port=%u)", pcb_, ip6addr_ntoa(&ip.u_addr.ip6), port);
111 if (family_ != AF_INET) {
115 auto *addr4 =
reinterpret_cast<const sockaddr_in *
>(name);
116 port = ntohs(addr4->sin_port);
117 ip.addr = addr4->sin_addr.s_addr;
118 LWIP_LOG(
"tcp_bind(%p ip=%u port=%u)", pcb_, ip.addr, port);
120 err_t err = tcp_bind(pcb_, &ip, port);
121 if (err == ERR_USE) {
122 LWIP_LOG(
" -> err ERR_USE");
126 if (err == ERR_VAL) {
127 LWIP_LOG(
" -> err ERR_VAL");
132 LWIP_LOG(
" -> err %d", err);
138 int close()
override {
139 if (pcb_ ==
nullptr) {
143 LWIP_LOG(
"tcp_close(%p)", pcb_);
144 err_t err = tcp_close(pcb_);
146 LWIP_LOG(
" -> err %d", err);
149 errno = err == ERR_MEM ? ENOMEM : EIO;
155 int shutdown(
int how)
override {
156 if (pcb_ ==
nullptr) {
160 bool shut_rx =
false, shut_tx =
false;
161 if (how == SHUT_RD) {
163 }
else if (how == SHUT_WR) {
165 }
else if (how == SHUT_RDWR) {
166 shut_rx = shut_tx =
true;
171 LWIP_LOG(
"tcp_shutdown(%p shut_rx=%d shut_tx=%d)", pcb_, shut_rx ? 1 : 0, shut_tx ? 1 : 0);
172 err_t err = tcp_shutdown(pcb_, shut_rx, shut_tx);
174 LWIP_LOG(
" -> err %d", err);
175 errno = err == ERR_MEM ? ENOMEM : EIO;
182 if (pcb_ ==
nullptr) {
186 if (name ==
nullptr || addrlen ==
nullptr) {
190 return this->ip2sockaddr_(&pcb_->remote_ip, pcb_->remote_port, name, addrlen);
192 std::string getpeername()
override {
193 if (pcb_ ==
nullptr) {
197 return this->format_ip_address_(pcb_->remote_ip);
200 if (pcb_ ==
nullptr) {
204 if (name ==
nullptr || addrlen ==
nullptr) {
208 return this->ip2sockaddr_(&pcb_->local_ip, pcb_->local_port, name, addrlen);
210 std::string getsockname()
override {
211 if (pcb_ ==
nullptr) {
215 return this->format_ip_address_(pcb_->local_ip);
217 int getsockopt(
int level,
int optname,
void *optval,
socklen_t *optlen)
override {
218 if (pcb_ ==
nullptr) {
222 if (optlen ==
nullptr || optval ==
nullptr) {
226 if (level == SOL_SOCKET && optname == SO_REUSEADDR) {
234 *
reinterpret_cast<int *
>(optval) = 1;
238 if (level == IPPROTO_TCP && optname == TCP_NODELAY) {
243 *
reinterpret_cast<int *
>(optval) = nodelay_;
251 int setsockopt(
int level,
int optname,
const void *optval,
socklen_t optlen)
override {
252 if (pcb_ ==
nullptr) {
256 if (level == SOL_SOCKET && optname == SO_REUSEADDR) {
266 if (level == IPPROTO_TCP && optname == TCP_NODELAY) {
271 int val = *
reinterpret_cast<const int *
>(optval);
279 int listen(
int backlog)
override {
285 ssize_t read(
void *buf,
size_t len)
override {
286 if (pcb_ ==
nullptr) {
290 if (rx_closed_ && rx_buf_ ==
nullptr) {
296 if (rx_buf_ ==
nullptr) {
302 uint8_t *buf8 =
reinterpret_cast<uint8_t *
>(buf);
303 while (
len && rx_buf_ !=
nullptr) {
304 size_t pb_len = rx_buf_->len;
305 size_t pb_left = pb_len - rx_buf_offset_;
308 size_t copysize = std::min(
len, pb_left);
309 memcpy(buf8,
reinterpret_cast<uint8_t *
>(rx_buf_->payload) + rx_buf_offset_, copysize);
311 if (pb_left == copysize) {
313 if (rx_buf_->next ==
nullptr) {
319 auto *old_buf = rx_buf_;
320 rx_buf_ = rx_buf_->next;
326 rx_buf_offset_ += copysize;
328 LWIP_LOG(
"tcp_recved(%p %u)", pcb_, copysize);
329 tcp_recved(pcb_, copysize);
343 ssize_t readv(
const struct iovec *iov,
int iovcnt)
override {
345 for (
int i = 0; i < iovcnt; i++) {
346 ssize_t err = read(
reinterpret_cast<uint8_t *
>(iov[i].iov_base), iov[i].iov_len);
355 if ((
size_t) err != iov[i].iov_len)
366 ssize_t internal_write(
const void *buf,
size_t len) {
367 if (pcb_ ==
nullptr) {
373 if (buf ==
nullptr) {
377 auto space = tcp_sndbuf(pcb_);
382 size_t to_send = std::min((
size_t) space,
len);
383 LWIP_LOG(
"tcp_write(%p buf=%p %u)", pcb_, buf, to_send);
384 err_t err = tcp_write(pcb_, buf, to_send, TCP_WRITE_FLAG_COPY);
385 if (err == ERR_MEM) {
386 LWIP_LOG(
" -> err ERR_MEM");
391 LWIP_LOG(
" -> err %d", err);
397 int internal_output() {
398 LWIP_LOG(
"tcp_output(%p)", pcb_);
399 err_t err = tcp_output(pcb_);
400 if (err == ERR_ABRT) {
401 LWIP_LOG(
" -> err ERR_ABRT");
409 LWIP_LOG(
" -> err %d", err);
415 ssize_t write(
const void *buf,
size_t len)
override {
424 int err = internal_output();
430 ssize_t writev(
const struct iovec *iov,
int iovcnt)
override {
432 for (
int i = 0; i < iovcnt; i++) {
433 ssize_t err = internal_write(
reinterpret_cast<uint8_t *
>(iov[i].iov_base), iov[i].iov_len);
442 if ((
size_t) err != iov[i].iov_len)
450 int err = internal_output();
461 int setblocking(
bool blocking)
override {
462 if (pcb_ ==
nullptr) {
474 void err_fn(err_t err) {
475 LWIP_LOG(
"err(err=%d)", err);
483 err_t recv_fn(
struct pbuf *pb, err_t err) {
484 LWIP_LOG(
"recv(pb=%p err=%d)", pb, err);
495 if (rx_buf_ ==
nullptr) {
500 pbuf_cat(rx_buf_, pb);
509 static void s_err_fn(
void *arg, err_t err) {
510 LWIPRawImpl *arg_this =
reinterpret_cast<LWIPRawImpl *
>(arg);
511 arg_this->err_fn(err);
514 static err_t s_recv_fn(
void *arg,
struct tcp_pcb *pcb,
struct pbuf *pb, err_t err) {
515 LWIPRawImpl *arg_this =
reinterpret_cast<LWIPRawImpl *
>(arg);
516 return arg_this->recv_fn(pb, err);
520 std::string format_ip_address_(
const ip_addr_t &ip) {
521 char buffer[50] = {};
522 if (IP_IS_V4_VAL(ip)) {
523 inet_ntoa_r(ip, buffer,
sizeof(buffer));
526 else if (IP_IS_V6_VAL(ip)) {
527 inet6_ntoa_r(ip, buffer,
sizeof(buffer));
530 return std::string(buffer);
534 if (family_ == AF_INET) {
544 inet_addr_from_ip4addr(&addr->
sin_addr, ip_2_ip4(ip));
548 else if (family_ == AF_INET6) {
562 ip4_2_ipv4_mapped_ipv6(ip_2_ip6(&mapped), ip_2_ip4(ip));
563 inet6_addr_from_ip6addr(&addr->
sin6_addr, ip_2_ip6(&mapped));
565 inet6_addr_from_ip6addr(&addr->
sin6_addr, ip_2_ip6(ip));
// lwIP connection handle; the methods of this class compare it against nullptr
// before using it.
575 struct tcp_pcb *pcb_;
// Received data not yet consumed by read(); recv_fn() appends further pbufs to
// an existing chain with pbuf_cat().
576 pbuf *rx_buf_ =
nullptr;
// Number of bytes of the first pbuf in rx_buf_ that read() has already copied out.
577 size_t rx_buf_offset_ = 0;
// Checked by read() together with an empty rx_buf_. Where it is set is not visible
// in this excerpt (presumably when the peer closes the connection — TODO confirm).
578 bool rx_closed_ =
false;
// Value reported by getsockopt() for IPPROTO_TCP / TCP_NODELAY.
581 bool nodelay_ =
false;
587class LWIPRawListenImpl :
public LWIPRawImpl {
// Forwards the address family and the PCB unchanged to the LWIPRawImpl
// constructor; this class adds no initialisation of its own here.
589 LWIPRawListenImpl(
sa_family_t family,
struct tcp_pcb *pcb) : LWIPRawImpl(family, pcb) {}
592 LWIP_LOG(
"init(%p)", pcb_);
594 tcp_accept(pcb_, LWIPRawListenImpl::s_accept_fn);
595 tcp_err(pcb_, LWIPRawImpl::s_err_fn);
598 std::unique_ptr<Socket> accept(
struct sockaddr *addr,
socklen_t *addrlen)
override {
599 if (pcb_ ==
nullptr) {
603 if (accepted_socket_count_ == 0) {
608 std::unique_ptr<LWIPRawImpl> sock = std::move(accepted_sockets_[0]);
610 for (uint8_t i = 1; i < accepted_socket_count_; i++) {
611 accepted_sockets_[i - 1] = std::move(accepted_sockets_[i]);
613 accepted_socket_count_--;
614 LWIP_LOG(
"Connection accepted by application, queue size: %d", accepted_socket_count_);
615 if (addr !=
nullptr) {
616 sock->getpeername(addr, addrlen);
618 LWIP_LOG(
"accept(%p)", sock.get());
619 return std::unique_ptr<Socket>(std::move(sock));
622 int listen(
int backlog)
override {
623 if (pcb_ ==
nullptr) {
627 LWIP_LOG(
"tcp_listen_with_backlog(%p backlog=%d)", pcb_, backlog);
628 struct tcp_pcb *listen_pcb = tcp_listen_with_backlog(pcb_, backlog);
629 if (listen_pcb ==
nullptr) {
638 LWIP_LOG(
"tcp_arg(%p)", pcb_);
640 tcp_accept(pcb_, LWIPRawListenImpl::s_accept_fn);
645 err_t accept_fn_(
struct tcp_pcb *newpcb, err_t err) {
646 LWIP_LOG(
"accept(newpcb=%p err=%d)", newpcb, err);
647 if (err != ERR_OK || newpcb ==
nullptr) {
655 if (accepted_socket_count_ >= MAX_ACCEPTED_SOCKETS) {
656 LWIP_LOG(
"Rejecting connection, queue full (%d)", accepted_socket_count_);
662 auto sock = make_unique<LWIPRawImpl>(family_, newpcb);
664 accepted_sockets_[accepted_socket_count_++] = std::move(sock);
665 LWIP_LOG(
"Accepted connection, queue size: %d", accepted_socket_count_);
673 static err_t s_accept_fn(
void *arg,
struct tcp_pcb *newpcb, err_t err) {
674 LWIPRawListenImpl *arg_this =
reinterpret_cast<LWIPRawListenImpl *
>(arg);
675 return arg_this->accept_fn_(newpcb, err);
// Capacity of the pending-connection queue; accept_fn_() takes its
// "Rejecting connection, queue full" path once the count reaches this value.
692 static constexpr size_t MAX_ACCEPTED_SOCKETS = 3;
// Connections queued by accept_fn_() until accept() takes the one at index 0
// and shifts the remaining entries down by one slot.
693 std::array<std::unique_ptr<LWIPRawImpl>, MAX_ACCEPTED_SOCKETS> accepted_sockets_;
// Number of occupied slots at the front of accepted_sockets_.
694 uint8_t accepted_socket_count_ = 0;
697std::unique_ptr<Socket>
socket(
int domain,
int type,
int protocol) {
698 auto *pcb = tcp_new();
703 auto *sock =
new LWIPRawListenImpl((
sa_family_t) domain, pcb);
705 return std::unique_ptr<Socket>{sock};
void socket_wake()
Called by lwIP callbacks to signal socket activity and wake the delay early.
std::unique_ptr< Socket > socket(int domain, int type, int protocol)
Create a socket of the given domain, type and protocol.
std::unique_ptr< Socket > socket_loop_monitored(int domain, int type, int protocol)
Create a socket and monitor it for data in the main loop.
void socket_delay(uint32_t ms)
Delay that can be woken early by socket activity.
struct in6_addr sin6_addr