Skip to main content

compio_net/
tcp.rs

1use std::{
2    future::Future,
3    io,
4    net::SocketAddr,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12    BufferRef, SharedFd, impl_raw_fd,
13    op::{RecvFlags, RecvMsgMultiResult, SendFlags, SendMsgZc, SendVectoredZc, SendZc},
14};
15use compio_io::{
16    AsyncRead, AsyncReadManaged, AsyncReadMulti, AsyncWrite, AsyncWriteZerocopy,
17    ancillary::{
18        AsyncReadAncillary, AsyncReadAncillaryManaged, AsyncReadAncillaryMulti,
19        AsyncWriteAncillary, AsyncWriteAncillaryZerocopy,
20    },
21    util::Splittable,
22};
23use compio_runtime::fd::PollFd;
24use futures_util::{Stream, StreamExt, stream::FusedStream};
25use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
26
27use crate::{
28    Extract, Incoming, MSG_NOSIGNAL, ReadHalf, Socket, ToSocketAddrsAsync, WriteHalf, Zerocopy,
29};
30
31/// A TCP socket server, listening for connections.
32///
33/// You can accept a new connection by using the
34/// [`accept`](`TcpListener::accept`) method.
35///
36/// # Examples
37///
38/// ```
39/// use std::net::SocketAddr;
40///
41/// use compio_io::{AsyncReadExt, AsyncWriteExt};
42/// use compio_net::{TcpListener, TcpStream};
43/// use socket2::SockAddr;
44///
45/// # compio_runtime::Runtime::new().unwrap().block_on(async move {
46/// let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
47///
48/// let addr = listener.local_addr().unwrap();
49///
50/// let tx_fut = TcpStream::connect(&addr);
51///
52/// let rx_fut = listener.accept();
53///
54/// let (mut tx, (mut rx, _)) = futures_util::try_join!(tx_fut, rx_fut).unwrap();
55///
56/// tx.write_all("test").await.0.unwrap();
57///
58/// let (_, buf) = rx.read_exact(Vec::with_capacity(4)).await.unwrap();
59///
60/// assert_eq!(buf, b"test");
61/// # });
62/// ```
63#[derive(Debug, Clone)]
64pub struct TcpListener {
65    inner: Socket,
66}
67
68impl TcpListener {
69    /// Creates a new `TcpListener`, which will be bound to the specified
70    /// address.
71    ///
72    /// The returned listener is ready for accepting connections.
73    ///
74    /// Binding with a port number of 0 will request that the OS assigns a port
75    /// to this listener.
76    ///
77    /// It enables the `SO_REUSEADDR` option by default.
78    ///
79    /// To configure the socket before binding, you can use the [`TcpSocket`]
80    /// type.
81    pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
82        super::each_addr(addr, |addr| async move {
83            let sa = SockAddr::from(addr);
84            let socket = Socket::new(sa.domain(), Type::STREAM, Some(Protocol::TCP)).await?;
85            socket.socket.set_reuse_address(true)?;
86            socket.bind(&sa).await?;
87            socket.listen(128).await?;
88            Ok(Self { inner: socket })
89        })
90        .await
91    }
92
93    /// Creates new TcpListener from a [`std::net::TcpListener`].
94    pub fn from_std(stream: std::net::TcpListener) -> io::Result<Self> {
95        Ok(Self {
96            inner: Socket::from_socket2(Socket2::from(stream))?,
97        })
98    }
99
100    /// Close the socket. If the returned future is dropped before polling, the
101    /// socket won't be closed.
102    ///
103    /// See [`TcpStream::close`] for more details.
104    ///
105    /// [`TcpStream::close`]: crate::tcp::TcpStream::close
106    pub fn close(self) -> impl Future<Output = io::Result<()>> {
107        self.inner.close()
108    }
109
110    /// Accepts a new incoming connection from this listener.
111    ///
112    /// This function will yield once a new TCP connection is established. When
113    /// established, the corresponding [`TcpStream`] and the remote peer's
114    /// address will be returned.
115    pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
116        let (socket, addr) = self.inner.accept().await?;
117        let stream = TcpStream { inner: socket };
118        Ok((stream, addr.as_socket().expect("should be SocketAddr")))
119    }
120
121    /// Accepts a new incoming connection from this listener using the provided
122    /// socket.
123    #[cfg(windows)]
124    pub async fn accept_with(&self, sock: TcpSocket) -> io::Result<(TcpStream, SocketAddr)> {
125        let (socket, addr) = self.inner.accept_with(sock.inner).await?;
126        let stream = TcpStream { inner: socket };
127        Ok((stream, addr.as_socket().expect("should be SocketAddr")))
128    }
129
130    /// Returns a stream of incoming connections to this listener.
131    pub fn incoming(&self) -> TcpIncoming<'_> {
132        TcpIncoming {
133            inner: self.inner.incoming(),
134        }
135    }
136
137    /// Returns the local address that this listener is bound to.
138    ///
139    /// This can be useful, for example, when binding to port 0 to
140    /// figure out which port was actually bound.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
146    ///
147    /// use compio_net::TcpListener;
148    /// use socket2::SockAddr;
149    ///
150    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
151    /// let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
152    ///
153    /// let addr = listener.local_addr().expect("Couldn't get local address");
154    /// assert_eq!(
155    ///     addr,
156    ///     SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))
157    /// );
158    /// # });
159    /// ```
160    pub fn local_addr(&self) -> io::Result<SocketAddr> {
161        self.inner
162            .local_addr()
163            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
164    }
165
166    /// Returns the value of the `SO_ERROR` option.
167    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
168        self.inner.socket.take_error()
169    }
170
171    /// Gets the value of the `IP_TTL` option for this socket.
172    ///
173    /// For more information about this option, see [`set_ttl_v4`].
174    ///
175    /// [`set_ttl_v4`]: method@Self::set_ttl_v4
176    pub fn ttl_v4(&self) -> io::Result<u32> {
177        self.inner.socket.ttl_v4()
178    }
179
180    /// Sets the value for the `IP_TTL` option on this socket.
181    ///
182    /// This value sets the time-to-live field that is used in every packet sent
183    /// from this socket.
184    pub fn set_ttl_v4(&self, ttl: u32) -> io::Result<()> {
185        self.inner.socket.set_ttl_v4(ttl)
186    }
187}
188
189impl_raw_fd!(TcpListener, socket2::Socket, inner, socket);
190
191/// A stream of incoming TCP connections.
192pub struct TcpIncoming<'a> {
193    inner: Incoming<'a>,
194}
195
196impl Stream for TcpIncoming<'_> {
197    type Item = io::Result<TcpStream>;
198
199    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200        let this = self.get_mut();
201        this.inner.poll_next_unpin(cx).map(|res| {
202            res.map(|res| {
203                let socket = res?;
204                Ok(TcpStream { inner: socket })
205            })
206        })
207    }
208}
209
210impl FusedStream for TcpIncoming<'_> {
211    fn is_terminated(&self) -> bool {
212        self.inner.is_terminated()
213    }
214}
215
216/// A TCP stream between a local and a remote socket.
217///
218/// A TCP stream can either be created by connecting to an endpoint, via the
219/// `connect` method, or by accepting a connection from a listener.
220///
221/// # Examples
222///
223/// ```no_run
224/// use std::net::SocketAddr;
225///
226/// use compio_io::AsyncWrite;
227/// use compio_net::TcpStream;
228///
229/// # compio_runtime::Runtime::new().unwrap().block_on(async {
230/// // Connect to a peer
231/// let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
232///
233/// // Write some data.
234/// stream.write("hello world!").await.unwrap();
235/// # })
236/// ```
237#[derive(Debug, Clone)]
238pub struct TcpStream {
239    inner: Socket,
240}
241
242impl TcpStream {
243    /// Opens a TCP connection to a remote host.
244    ///
245    /// To configure the socket before connecting, you can use the [`TcpSocket`]
246    /// type.
247    pub async fn connect(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
248        use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
249
250        super::each_addr(addr, |addr| async move {
251            let addr2 = SockAddr::from(addr);
252            let socket = Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?;
253            if cfg!(windows) {
254                let bind_addr = if addr.is_ipv4() {
255                    SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
256                } else if addr.is_ipv6() {
257                    SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
258                } else {
259                    return Err(io::Error::new(
260                        io::ErrorKind::AddrNotAvailable,
261                        "Unsupported address domain.",
262                    ));
263                };
264                socket.bind(&bind_addr).await?;
265            };
266            socket.connect_async(&addr2).await?;
267            Ok(Self { inner: socket })
268        })
269        .await
270    }
271
272    /// Creates new TcpStream from a [`std::net::TcpStream`].
273    pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
274        Ok(Self {
275            inner: Socket::from_socket2(Socket2::from(stream))?,
276        })
277    }
278
279    /// Close the socket. If the returned future is dropped before polling, the
280    /// socket won't be closed.
281    ///
282    /// As the socket is clonable, users can call `close` on a clone, but the
283    /// future will never complete until all clones are dropped. Some
284    /// operations may keep a strong reference to the socket, so the future
285    /// may never complete if there are pending operations.
286    ///
287    /// It's OK to drop the socket directly without calling `close`, but the
288    /// socket may not be closed immediately.
289    pub fn close(self) -> impl Future<Output = io::Result<()>> {
290        self.inner.close()
291    }
292
293    /// Returns the socket address of the remote peer of this TCP connection.
294    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
295        self.inner
296            .peer_addr()
297            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
298    }
299
300    /// Returns the socket address of the local half of this TCP connection.
301    pub fn local_addr(&self) -> io::Result<SocketAddr> {
302        self.inner
303            .local_addr()
304            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
305    }
306
307    /// Returns the value of the `SO_ERROR` option.
308    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
309        self.inner.socket.take_error()
310    }
311
312    /// Splits a [`TcpStream`] into a read half and a write half, which can be
313    /// used to read and write the stream concurrently.
314    ///
315    /// This method is more efficient than
316    /// [`into_split`](TcpStream::into_split), but the halves cannot
317    /// be moved into independently spawned tasks.
318    pub fn split(&self) -> (ReadHalf<'_, Self>, WriteHalf<'_, Self>) {
319        crate::split(self)
320    }
321
322    /// Splits a [`TcpStream`] into a read half and a write half, which can be
323    /// used to read and write the stream concurrently.
324    ///
325    /// Unlike [`split`](TcpStream::split), the owned halves can be moved to
326    /// separate tasks.
327    pub fn into_split(self) -> (Self, Self) {
328        (self.clone(), self)
329    }
330
331    /// Create [`PollFd`] from inner socket.
332    pub fn to_poll_fd(&self) -> io::Result<PollFd<Socket2>> {
333        self.inner.to_poll_fd()
334    }
335
336    /// Create [`PollFd`] from inner socket.
337    pub fn into_poll_fd(self) -> io::Result<PollFd<Socket2>> {
338        self.inner.into_poll_fd()
339    }
340
341    /// Close the connection of the socket, and reuse it to create a new
342    /// connection. This method is useful when the socket is created by
343    /// [`TcpListener::accept`], and will be reused in
344    /// [`TcpListener::accept_with`] to accept a new connection.
345    #[cfg(windows)]
346    pub async fn disconnect(self) -> io::Result<TcpSocket> {
347        self.inner.disconnect().await?;
348        Ok(TcpSocket { inner: self.inner })
349    }
350
351    /// Gets the value of the `TCP_NODELAY` option on this socket.
352    ///
353    /// For more information about this option, see
354    /// [`TcpStream::set_nodelay`].
355    pub fn nodelay(&self) -> io::Result<bool> {
356        self.inner.socket.tcp_nodelay()
357    }
358
359    /// Sets the value of the TCP_NODELAY option on this socket.
360    ///
361    /// If set, this option disables the Nagle algorithm. This means
362    /// that segments are always sent as soon as possible, even if
363    /// there is only a small amount of data. When not set, data is
364    /// buffered until there is a sufficient amount to send out,
365    /// thereby avoiding the frequent sending of small packets.
366    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
367        self.inner.socket.set_tcp_nodelay(nodelay)
368    }
369
370    /// Gets the value of the `TCP_QUICKACK` option on this socket.
371    ///
372    /// For more information about this option, see [`TcpStream::set_quickack`].
373    #[cfg(any(
374        target_os = "linux",
375        target_os = "android",
376        target_os = "fuchsia",
377        target_os = "cygwin",
378    ))]
379    pub fn quickack(&self) -> io::Result<bool> {
380        self.inner.socket.tcp_quickack()
381    }
382
383    /// Enable or disable `TCP_QUICKACK`.
384    ///
385    /// This flag causes Linux to eagerly send `ACK`s rather than delaying them.
386    /// Linux may reset this flag after further operations on the socket.
387    #[cfg(any(
388        target_os = "linux",
389        target_os = "android",
390        target_os = "fuchsia",
391        target_os = "cygwin",
392    ))]
393    pub fn set_quickack(&self, quickack: bool) -> io::Result<()> {
394        self.inner.socket.set_tcp_quickack(quickack)
395    }
396
397    /// Reads the linger duration for this socket by getting the `SO_LINGER`
398    /// option.
399    pub fn linger(&self) -> io::Result<Option<Duration>> {
400        self.inner.socket.linger()
401    }
402
403    /// Sets a linger duration of zero on this socket by setting the `SO_LINGER`
404    /// option.
405    pub fn set_zero_linger(&self) -> io::Result<()> {
406        self.inner.socket.set_linger(Some(Duration::ZERO))
407    }
408
409    /// Gets the value of the `IP_TTL` option for this socket.
410    ///
411    /// For more information about this option, see [`set_ttl_v4`].
412    ///
413    /// [`set_ttl_v4`]: TcpStream::set_ttl_v4
414    pub fn ttl_v4(&self) -> io::Result<u32> {
415        self.inner.socket.ttl_v4()
416    }
417
418    /// Sets the value for the `IP_TTL` option on this socket.
419    ///
420    /// This value sets the time-to-live field that is used in every packet sent
421    /// from this socket.
422    pub fn set_ttl_v4(&self, ttl: u32) -> io::Result<()> {
423        self.inner.socket.set_ttl_v4(ttl)
424    }
425
426    /// Sends out-of-band data on this socket.
427    ///
428    /// Out-of-band data is sent with the `MSG_OOB` flag.
429    pub async fn send_out_of_band<T: IoBuf>(&self, buf: T) -> BufResult<usize, T> {
430        #[cfg(unix)]
431        use libc::MSG_OOB;
432        #[cfg(windows)]
433        use windows_sys::Win32::Networking::WinSock::MSG_OOB;
434
435        self.inner
436            .send(
437                buf,
438                SendFlags::from_bits_retain(MSG_OOB as _) | MSG_NOSIGNAL,
439            )
440            .await
441    }
442}
443
444impl AsyncRead for TcpStream {
445    #[inline]
446    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
447        (&*self).read(buf).await
448    }
449
450    #[inline]
451    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
452        (&*self).read_vectored(buf).await
453    }
454}
455
456impl AsyncRead for &TcpStream {
457    #[inline]
458    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
459        self.inner.recv(buf, RecvFlags::empty()).await
460    }
461
462    #[inline]
463    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
464        self.inner.recv_vectored(buf, RecvFlags::empty()).await
465    }
466}
467
468impl AsyncReadManaged for TcpStream {
469    type Buffer = BufferRef;
470
471    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
472        (&*self).read_managed(len).await
473    }
474}
475
476impl AsyncReadManaged for &TcpStream {
477    type Buffer = BufferRef;
478
479    async fn read_managed(&mut self, len: usize) -> io::Result<Option<Self::Buffer>> {
480        self.inner.recv_managed(len, RecvFlags::empty()).await
481    }
482}
483
484impl AsyncReadMulti for TcpStream {
485    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
486        self.inner.recv_multi(len, RecvFlags::empty())
487    }
488}
489
490impl AsyncReadMulti for &TcpStream {
491    fn read_multi(&mut self, len: usize) -> impl Stream<Item = io::Result<Self::Buffer>> {
492        self.inner.recv_multi(len, RecvFlags::empty())
493    }
494}
495
496impl AsyncReadAncillary for TcpStream {
497    #[inline]
498    async fn read_with_ancillary<T: IoBufMut, C: IoBufMut>(
499        &mut self,
500        buffer: T,
501        control: C,
502    ) -> BufResult<(usize, usize), (T, C)> {
503        (&*self).read_with_ancillary(buffer, control).await
504    }
505
506    #[inline]
507    async fn read_vectored_with_ancillary<T: IoVectoredBufMut, C: IoBufMut>(
508        &mut self,
509        buffer: T,
510        control: C,
511    ) -> BufResult<(usize, usize), (T, C)> {
512        (&*self).read_vectored_with_ancillary(buffer, control).await
513    }
514}
515
516impl AsyncReadAncillary for &TcpStream {
517    #[inline]
518    async fn read_with_ancillary<T: IoBufMut, C: IoBufMut>(
519        &mut self,
520        buffer: T,
521        control: C,
522    ) -> BufResult<(usize, usize), (T, C)> {
523        self.inner
524            .recv_msg(buffer, control, RecvFlags::empty())
525            .await
526            .map_res(|(res, len, _addr)| (res, len))
527    }
528
529    #[inline]
530    async fn read_vectored_with_ancillary<T: IoVectoredBufMut, C: IoBufMut>(
531        &mut self,
532        buffer: T,
533        control: C,
534    ) -> BufResult<(usize, usize), (T, C)> {
535        self.inner
536            .recv_msg_vectored(buffer, control, RecvFlags::empty())
537            .await
538            .map_res(|(res, len, _addr)| (res, len))
539    }
540}
541
542impl AsyncReadAncillaryManaged for TcpStream {
543    #[inline]
544    async fn read_managed_with_ancillary<C: IoBufMut>(
545        &mut self,
546        len: usize,
547        control: C,
548    ) -> io::Result<Option<(Self::Buffer, C)>> {
549        (&*self).read_managed_with_ancillary(len, control).await
550    }
551}
552
553impl AsyncReadAncillaryManaged for &TcpStream {
554    #[inline]
555    async fn read_managed_with_ancillary<C: IoBufMut>(
556        &mut self,
557        len: usize,
558        control: C,
559    ) -> io::Result<Option<(Self::Buffer, C)>> {
560        self.inner
561            .recv_msg_managed(len, control, RecvFlags::empty())
562            .await
563            .map(|res| res.map(|(res, len, _addr)| (res, len)))
564    }
565}
566
567impl AsyncReadAncillaryMulti for TcpStream {
568    type Return = RecvMsgMultiResult;
569
570    #[inline]
571    fn read_multi_with_ancillary(
572        &mut self,
573        control_len: usize,
574    ) -> impl Stream<Item = io::Result<Self::Return>> {
575        self.inner.recv_msg_multi(control_len, RecvFlags::empty())
576    }
577}
578
579impl AsyncReadAncillaryMulti for &TcpStream {
580    type Return = RecvMsgMultiResult;
581
582    #[inline]
583    fn read_multi_with_ancillary(
584        &mut self,
585        control_len: usize,
586    ) -> impl Stream<Item = io::Result<Self::Return>> {
587        self.inner.recv_msg_multi(control_len, RecvFlags::empty())
588    }
589}
590
591impl AsyncWrite for TcpStream {
592    #[inline]
593    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
594        (&*self).write(buf).await
595    }
596
597    #[inline]
598    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
599        (&*self).write_vectored(buf).await
600    }
601
602    #[inline]
603    async fn flush(&mut self) -> io::Result<()> {
604        (&*self).flush().await
605    }
606
607    #[inline]
608    async fn shutdown(&mut self) -> io::Result<()> {
609        (&*self).shutdown().await
610    }
611}
612
613impl AsyncWrite for &TcpStream {
614    #[inline]
615    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
616        self.inner.send(buf, MSG_NOSIGNAL).await
617    }
618
619    #[inline]
620    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
621        self.inner.send_vectored(buf, MSG_NOSIGNAL).await
622    }
623
624    #[inline]
625    async fn flush(&mut self) -> io::Result<()> {
626        Ok(())
627    }
628
629    #[inline]
630    async fn shutdown(&mut self) -> io::Result<()> {
631        self.inner.shutdown().await
632    }
633}
634
635impl AsyncWriteAncillary for TcpStream {
636    #[inline]
637    async fn write_with_ancillary<T: IoBuf, C: IoBuf>(
638        &mut self,
639        buffer: T,
640        control: C,
641    ) -> BufResult<usize, (T, C)> {
642        (&*self).write_with_ancillary(buffer, control).await
643    }
644
645    #[inline]
646    async fn write_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
647        &mut self,
648        buffer: T,
649        control: C,
650    ) -> BufResult<usize, (T, C)> {
651        (&*self)
652            .write_vectored_with_ancillary(buffer, control)
653            .await
654    }
655}
656
657impl AsyncWriteAncillary for &TcpStream {
658    #[inline]
659    async fn write_with_ancillary<T: IoBuf, C: IoBuf>(
660        &mut self,
661        buffer: T,
662        control: C,
663    ) -> BufResult<usize, (T, C)> {
664        self.inner
665            .send_msg(buffer, control, None, MSG_NOSIGNAL)
666            .await
667    }
668
669    #[inline]
670    async fn write_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
671        &mut self,
672        buffer: T,
673        control: C,
674    ) -> BufResult<usize, (T, C)> {
675        self.inner
676            .send_msg_vectored(buffer, control, None, MSG_NOSIGNAL)
677            .await
678    }
679}
680
681impl AsyncWriteZerocopy for TcpStream {
682    type BufferReadyFuture<T: IoBuf> = Zerocopy<SendZc<T, SharedFd<Socket2>>>;
683    type VectoredBufferReadyFuture<T: IoVectoredBuf> =
684        Zerocopy<SendVectoredZc<T, SharedFd<Socket2>>>;
685
686    async fn write_zerocopy<T: IoBuf>(
687        &mut self,
688        buf: T,
689    ) -> BufResult<usize, Self::BufferReadyFuture<T>> {
690        self.inner.send_zerocopy(buf, MSG_NOSIGNAL).await
691    }
692
693    async fn write_zerocopy_vectored<T: IoVectoredBuf>(
694        &mut self,
695        buf: T,
696    ) -> BufResult<usize, Self::VectoredBufferReadyFuture<T>> {
697        self.inner.send_zerocopy_vectored(buf, MSG_NOSIGNAL).await
698    }
699}
700
701impl AsyncWriteZerocopy for &TcpStream {
702    type BufferReadyFuture<T: IoBuf> = Zerocopy<SendZc<T, SharedFd<Socket2>>>;
703    type VectoredBufferReadyFuture<T: IoVectoredBuf> =
704        Zerocopy<SendVectoredZc<T, SharedFd<Socket2>>>;
705
706    async fn write_zerocopy<T: IoBuf>(
707        &mut self,
708        buf: T,
709    ) -> BufResult<usize, Self::BufferReadyFuture<T>> {
710        self.inner.send_zerocopy(buf, MSG_NOSIGNAL).await
711    }
712
713    async fn write_zerocopy_vectored<T: IoVectoredBuf>(
714        &mut self,
715        buf: T,
716    ) -> BufResult<usize, Self::VectoredBufferReadyFuture<T>> {
717        self.inner.send_zerocopy_vectored(buf, MSG_NOSIGNAL).await
718    }
719}
720
721impl AsyncWriteAncillaryZerocopy for TcpStream {
722    type BufferReadyFuture<T: IoBuf, C: IoBuf> =
723        Extract<Zerocopy<SendMsgZc<[T; 1], C, SharedFd<Socket2>>>, T, C>;
724    type VectoredBufferReadyFuture<T: IoVectoredBuf, C: IoBuf> =
725        Zerocopy<SendMsgZc<T, C, SharedFd<Socket2>>>;
726
727    async fn write_zerocopy_with_ancillary<T: IoBuf, C: IoBuf>(
728        &mut self,
729        buf: T,
730        control: C,
731    ) -> BufResult<usize, Self::BufferReadyFuture<T, C>> {
732        self.inner
733            .send_msg_zerocopy(buf, control, None, MSG_NOSIGNAL)
734            .await
735    }
736
737    async fn write_zerocopy_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
738        &mut self,
739        buf: T,
740        control: C,
741    ) -> BufResult<usize, Self::VectoredBufferReadyFuture<T, C>> {
742        self.inner
743            .send_msg_zerocopy_vectored(buf, control, None, MSG_NOSIGNAL)
744            .await
745    }
746}
747
748impl AsyncWriteAncillaryZerocopy for &TcpStream {
749    type BufferReadyFuture<T: IoBuf, C: IoBuf> =
750        Extract<Zerocopy<SendMsgZc<[T; 1], C, SharedFd<Socket2>>>, T, C>;
751    type VectoredBufferReadyFuture<T: IoVectoredBuf, C: IoBuf> =
752        Zerocopy<SendMsgZc<T, C, SharedFd<Socket2>>>;
753
754    async fn write_zerocopy_with_ancillary<T: IoBuf, C: IoBuf>(
755        &mut self,
756        buf: T,
757        control: C,
758    ) -> BufResult<usize, Self::BufferReadyFuture<T, C>> {
759        self.inner
760            .send_msg_zerocopy(buf, control, None, MSG_NOSIGNAL)
761            .await
762    }
763
764    async fn write_zerocopy_vectored_with_ancillary<T: IoVectoredBuf, C: IoBuf>(
765        &mut self,
766        buf: T,
767        control: C,
768    ) -> BufResult<usize, Self::VectoredBufferReadyFuture<T, C>> {
769        self.inner
770            .send_msg_zerocopy_vectored(buf, control, None, MSG_NOSIGNAL)
771            .await
772    }
773}
774
775impl Splittable for TcpStream {
776    type ReadHalf = Self;
777    type WriteHalf = Self;
778
779    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
780        self.into_split()
781    }
782}
783
784impl<'a> Splittable for &'a TcpStream {
785    type ReadHalf = ReadHalf<'a, TcpStream>;
786    type WriteHalf = WriteHalf<'a, TcpStream>;
787
788    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
789        crate::split(self)
790    }
791}
792
793impl<'a> Splittable for &'a mut TcpStream {
794    type ReadHalf = ReadHalf<'a, TcpStream>;
795    type WriteHalf = WriteHalf<'a, TcpStream>;
796
797    fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
798        crate::split(self)
799    }
800}
801
802impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);
803
804/// A TCP socket that has not yet been converted to a [`TcpStream`] or
805/// [`TcpListener`].
806#[derive(Debug)]
807pub struct TcpSocket {
808    inner: Socket,
809}
810
811impl TcpSocket {
812    /// Creates a new socket configured for IPv4.
813    pub async fn new_v4() -> io::Result<TcpSocket> {
814        TcpSocket::new(socket2::Domain::IPV4).await
815    }
816
817    /// Creates a new socket configured for IPv6.
818    pub async fn new_v6() -> io::Result<TcpSocket> {
819        TcpSocket::new(socket2::Domain::IPV6).await
820    }
821
822    async fn new(domain: socket2::Domain) -> io::Result<TcpSocket> {
823        let inner =
824            Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).await?;
825        Ok(TcpSocket { inner })
826    }
827
828    /// Sets value for the `SO_KEEPALIVE` option on this socket.
829    pub fn set_keepalive(&self, keepalive: bool) -> io::Result<()> {
830        self.inner.socket.set_keepalive(keepalive)
831    }
832
833    /// Gets the value of the `SO_KEEPALIVE` option on this socket.
834    pub fn keepalive(&self) -> io::Result<bool> {
835        self.inner.socket.keepalive()
836    }
837
838    /// Allows the socket to bind to an in-use address.
839    pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
840        self.inner.socket.set_reuse_address(reuseaddr)
841    }
842
843    /// Retrieves the value set for `SO_REUSEADDR` on this socket.
844    pub fn reuseaddr(&self) -> io::Result<bool> {
845        self.inner.socket.reuse_address()
846    }
847
848    /// Allows the socket to bind to an in-use port. Only available for
849    /// supported unix systems.
850    #[cfg(all(
851        unix,
852        not(target_os = "solaris"),
853        not(target_os = "illumos"),
854        not(target_os = "cygwin"),
855    ))]
856    pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
857        self.inner.socket.set_reuse_port(reuseport)
858    }
859
860    /// Allows the socket to bind to an in-use port. Only available for
861    /// supported unix systems.
862    #[cfg(all(
863        unix,
864        not(target_os = "solaris"),
865        not(target_os = "illumos"),
866        not(target_os = "cygwin"),
867    ))]
868    pub fn reuseport(&self) -> io::Result<bool> {
869        self.inner.socket.reuse_port()
870    }
871
872    /// Sets the size of the TCP send buffer on this socket.
873    ///
874    /// On most operating systems, this sets the `SO_SNDBUF` socket option.
875    pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
876        self.inner.socket.set_send_buffer_size(size as usize)
877    }
878
879    /// Returns the size of the TCP send buffer for this socket.
880    ///
881    /// On most operating systems, this is the value of the `SO_SNDBUF` socket
882    /// option.
883    pub fn send_buffer_size(&self) -> io::Result<u32> {
884        self.inner.socket.send_buffer_size().map(|n| n as u32)
885    }
886
887    /// Sets the size of the TCP receive buffer on this socket.
888    ///
889    /// On most operating systems, this sets the `SO_RCVBUF` socket option.
890    pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
891        self.inner.socket.set_recv_buffer_size(size as usize)
892    }
893
894    /// Returns the size of the TCP receive buffer for this socket.
895    ///
896    /// On most operating systems, this is the value of the `SO_RCVBUF` socket
897    /// option.
898    pub fn recv_buffer_size(&self) -> io::Result<u32> {
899        self.inner.socket.recv_buffer_size().map(|n| n as u32)
900    }
901
902    /// Sets a linger duration of zero on this socket by setting the `SO_LINGER`
903    /// option.
904    pub fn set_zero_linger(&self) -> io::Result<()> {
905        self.inner.socket.set_linger(Some(Duration::ZERO))
906    }
907
908    /// Reads the linger duration for this socket by getting the `SO_LINGER`
909    /// option.
910    pub fn linger(&self) -> io::Result<Option<Duration>> {
911        self.inner.socket.linger()
912    }
913
914    /// Sets the value of the `TCP_NODELAY` option on this socket.
915    ///
916    /// If set, this option disables the Nagle algorithm. This means that
917    /// segments are always sent as soon as possible, even if there is only
918    /// a small amount of data. When not set, data is buffered until there
919    /// is a sufficient amount to send out, thereby avoiding the frequent
920    /// sending of small packets.
921    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
922        self.inner.socket.set_tcp_nodelay(nodelay)
923    }
924
925    /// Gets the value of the `TCP_NODELAY` option on this socket.
926    ///
927    /// For more information about this option, see [`set_nodelay`].
928    ///
929    /// [`set_nodelay`]: TcpSocket::set_nodelay
930    pub fn nodelay(&self) -> io::Result<bool> {
931        self.inner.socket.tcp_nodelay()
932    }
933
934    /// Gets the value of the `IPV6_TCLASS` option for this socket.
935    ///
936    /// For more information about this option, see [`set_tclass_v6`].
937    ///
938    /// [`set_tclass_v6`]: Self::set_tclass_v6
939    #[cfg(any(
940        target_os = "android",
941        target_os = "dragonfly",
942        target_os = "freebsd",
943        target_os = "fuchsia",
944        target_os = "linux",
945        target_os = "macos",
946        target_os = "netbsd",
947        target_os = "openbsd",
948        target_os = "cygwin",
949    ))]
950    pub fn tclass_v6(&self) -> io::Result<u32> {
951        self.inner.socket.tclass_v6()
952    }
953
954    /// Sets the value for the `IPV6_TCLASS` option on this socket.
955    ///
956    /// Specifies the traffic class field that is used in every packet
957    /// sent from this socket.
958    ///
959    /// # Note
960    ///
961    /// This may not have any effect on IPv4 sockets.
962    #[cfg(any(
963        target_os = "android",
964        target_os = "dragonfly",
965        target_os = "freebsd",
966        target_os = "fuchsia",
967        target_os = "linux",
968        target_os = "macos",
969        target_os = "netbsd",
970        target_os = "openbsd",
971        target_os = "cygwin",
972    ))]
973    pub fn set_tclass_v6(&self, tclass: u32) -> io::Result<()> {
974        self.inner.socket.set_tclass_v6(tclass)
975    }
976
977    /// Gets the value of the `IP_TOS` option for this socket.
978    ///
979    /// For more information about this option, see [`set_tos_v4`].
980    ///
981    /// [`set_tos_v4`]: Self::set_tos_v4
982    #[cfg(not(any(
983        target_os = "fuchsia",
984        target_os = "redox",
985        target_os = "solaris",
986        target_os = "illumos",
987        target_os = "haiku"
988    )))]
989    pub fn tos_v4(&self) -> io::Result<u32> {
990        self.inner.socket.tos_v4()
991    }
992
993    /// Sets the value for the `IP_TOS` option on this socket.
994    ///
995    /// This value sets the type-of-service field that is used in every packet
996    /// sent from this socket.
997    ///
998    /// # Note
999    ///
1000    /// - This may not have any effect on IPv6 sockets.
1001    /// - On Windows, `IP_TOS` is only supported on [Windows 8+ or
1002    ///   Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
1003    #[cfg(not(any(
1004        target_os = "fuchsia",
1005        target_os = "redox",
1006        target_os = "solaris",
1007        target_os = "illumos",
1008        target_os = "haiku"
1009    )))]
1010    pub fn set_tos_v4(&self, tos: u32) -> io::Result<()> {
1011        self.inner.socket.set_tos_v4(tos)
1012    }
1013
1014    /// Gets the value for the `SO_BINDTODEVICE` option on this socket
1015    ///
1016    /// Returns the interface name of the device to which this socket is bound.
1017    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux",))]
1018    pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
1019        self.inner.socket.device()
1020    }
1021
1022    /// Sets the value for the `SO_BINDTODEVICE` option on this socket
1023    ///
1024    /// If a socket is bound to an interface, only packets received from that
1025    /// particular interface are processed by the socket. Note that this only
1026    /// works for some socket types, particularly `AF_INET` sockets.
1027    ///
1028    /// If `interface` is `None` or an empty string it removes the binding.
1029    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
1030    pub fn bind_device(&self, interface: Option<&[u8]>) -> io::Result<()> {
1031        self.inner.socket.bind_device(interface)
1032    }
1033
1034    /// Gets the local address of this socket.
1035    pub fn local_addr(&self) -> io::Result<SocketAddr> {
1036        Ok(self
1037            .inner
1038            .local_addr()?
1039            .as_socket()
1040            .expect("should be SocketAddr"))
1041    }
1042
1043    /// Returns the value of the `SO_ERROR` option.
1044    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1045        self.inner.socket.take_error()
1046    }
1047
1048    /// Binds the socket to the given address.
1049    pub async fn bind(&self, addr: SocketAddr) -> io::Result<()> {
1050        self.inner.bind(&addr.into()).await
1051    }
1052
1053    /// Establishes a TCP connection with a peer at the specified socket
1054    /// address.
1055    ///
1056    /// The [`TcpSocket`] is consumed. Once the connection is established, a
1057    /// connected [`TcpStream`] is returned. If the connection fails, the
1058    /// encountered error is returned.
1059    ///
1060    /// On Windows, the socket should be bound to an address before connecting.
1061    pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
1062        self.inner.connect_async(&addr.into()).await?;
1063        Ok(TcpStream { inner: self.inner })
1064    }
1065
1066    /// Converts the socket into a `TcpListener`.
1067    ///
1068    /// `backlog` defines the maximum number of pending connections that are
1069    /// queued by the operating system at any given time. Connections are
1070    /// removed from the queue with [`TcpListener::accept`]. When the queue
1071    /// is full, the operating system will start rejecting connections.
1072    pub async fn listen(self, backlog: i32) -> io::Result<TcpListener> {
1073        self.inner.listen(backlog).await?;
1074        Ok(TcpListener { inner: self.inner })
1075    }
1076
1077    /// Converts a [`std::net::TcpStream`] into a [`TcpSocket`]. The provided
1078    /// socket must not have been connected prior to calling this function. This
1079    /// function is typically used together with crates such as [`socket2`] to
1080    /// configure socket options that are not available on [`TcpSocket`].
1081    pub fn from_std_stream(stream: std::net::TcpStream) -> io::Result<TcpSocket> {
1082        Ok(Self {
1083            inner: Socket::from_socket2(Socket2::from(stream))?,
1084        })
1085    }
1086}
1087
1088impl_raw_fd!(TcpSocket, socket2::Socket, inner, socket);