1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite, util::Splittable};
6use compio_runtime::{BorrowedBuffer, BufferPool};
7use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
8
9use crate::{
10 OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, SocketOpts, ToSocketAddrsAsync,
11 WriteHalf,
12};
13
14#[derive(Debug, Clone)]
47pub struct TcpListener {
48 inner: Socket,
49}
50
51impl TcpListener {
52 pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
62 Self::bind_with_options(addr, &SocketOpts::default().reuse_address(true)).await
63 }
64
65 pub async fn bind_with_options(
73 addr: impl ToSocketAddrsAsync,
74 options: &SocketOpts,
75 ) -> io::Result<Self> {
76 super::each_addr(addr, |addr| async move {
77 let sa = SockAddr::from(addr);
78 let socket = Socket::new(sa.domain(), Type::STREAM, Some(Protocol::TCP)).await?;
79 options.setup_socket(&socket)?;
80 socket.socket.bind(&sa)?;
81 socket.listen(128)?;
82 Ok(Self { inner: socket })
83 })
84 .await
85 }
86
87 pub fn from_std(stream: std::net::TcpListener) -> io::Result<Self> {
89 Ok(Self {
90 inner: Socket::from_socket2(Socket2::from(stream))?,
91 })
92 }
93
94 pub fn close(self) -> impl Future<Output = io::Result<()>> {
97 self.inner.close()
98 }
99
100 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
106 self.accept_with_options(&SocketOpts::default()).await
107 }
108
109 pub async fn accept_with_options(
115 &self,
116 options: &SocketOpts,
117 ) -> io::Result<(TcpStream, SocketAddr)> {
118 let (socket, addr) = self.inner.accept().await?;
119 options.setup_socket(&socket)?;
120 let stream = TcpStream { inner: socket };
121 Ok((stream, addr.as_socket().expect("should be SocketAddr")))
122 }
123
124 pub fn local_addr(&self) -> io::Result<SocketAddr> {
148 self.inner
149 .local_addr()
150 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
151 }
152}
153
154impl_raw_fd!(TcpListener, socket2::Socket, inner, socket);
155
156#[derive(Debug, Clone)]
178pub struct TcpStream {
179 inner: Socket,
180}
181
182impl TcpStream {
183 pub async fn connect(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
185 Self::connect_with_options(addr, &SocketOpts::default()).await
186 }
187
188 pub async fn connect_with_options(
190 addr: impl ToSocketAddrsAsync,
191 options: &SocketOpts,
192 ) -> io::Result<Self> {
193 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
194
195 super::each_addr(addr, |addr| async move {
196 let addr2 = SockAddr::from(addr);
197 let socket = if cfg!(windows) {
198 let bind_addr = if addr.is_ipv4() {
199 SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
200 } else if addr.is_ipv6() {
201 SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0))
202 } else {
203 return Err(io::Error::new(
204 io::ErrorKind::AddrNotAvailable,
205 "Unsupported address domain.",
206 ));
207 };
208 Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
209 } else {
210 Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?
211 };
212 options.setup_socket(&socket)?;
213 socket.connect_async(&addr2).await?;
214 Ok(Self { inner: socket })
215 })
216 .await
217 }
218
219 pub async fn bind_and_connect(
221 bind_addr: SocketAddr,
222 addr: impl ToSocketAddrsAsync,
223 ) -> io::Result<Self> {
224 Self::bind_and_connect_with_options(bind_addr, addr, &SocketOpts::default()).await
225 }
226
227 pub async fn bind_and_connect_with_options(
230 bind_addr: SocketAddr,
231 addr: impl ToSocketAddrsAsync,
232 options: &SocketOpts,
233 ) -> io::Result<Self> {
234 super::each_addr(addr, |addr| async move {
235 let addr = SockAddr::from(addr);
236 let bind_addr = SockAddr::from(bind_addr);
237
238 let socket = Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?;
239 options.setup_socket(&socket)?;
240 socket.connect_async(&addr).await?;
241 Ok(Self { inner: socket })
242 })
243 .await
244 }
245
246 pub fn from_std(stream: std::net::TcpStream) -> io::Result<Self> {
248 Ok(Self {
249 inner: Socket::from_socket2(Socket2::from(stream))?,
250 })
251 }
252
253 pub fn close(self) -> impl Future<Output = io::Result<()>> {
256 self.inner.close()
257 }
258
259 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
261 self.inner
262 .peer_addr()
263 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
264 }
265
266 pub fn local_addr(&self) -> io::Result<SocketAddr> {
268 self.inner
269 .local_addr()
270 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
271 }
272
273 pub fn split(&self) -> (ReadHalf<'_, Self>, WriteHalf<'_, Self>) {
280 crate::split(self)
281 }
282
283 pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
289 crate::into_split(self)
290 }
291
292 pub fn to_poll_fd(&self) -> io::Result<PollFd<Socket2>> {
294 self.inner.to_poll_fd()
295 }
296
297 pub fn into_poll_fd(self) -> io::Result<PollFd<Socket2>> {
299 self.inner.into_poll_fd()
300 }
301
302 pub fn nodelay(&self) -> io::Result<bool> {
307 self.inner.socket.tcp_nodelay()
308 }
309
310 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
318 self.inner.socket.set_tcp_nodelay(nodelay)
319 }
320
321 pub async fn send_out_of_band<T: IoBuf>(&self, buf: T) -> BufResult<usize, T> {
325 #[cfg(unix)]
326 use libc::MSG_OOB;
327 #[cfg(windows)]
328 use windows_sys::Win32::Networking::WinSock::MSG_OOB;
329
330 self.inner.send(buf, MSG_OOB).await
331 }
332}
333
334impl AsyncRead for TcpStream {
335 #[inline]
336 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
337 (&*self).read(buf).await
338 }
339
340 #[inline]
341 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
342 (&*self).read_vectored(buf).await
343 }
344}
345
346impl AsyncRead for &TcpStream {
347 #[inline]
348 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
349 self.inner.recv(buf, 0).await
350 }
351
352 #[inline]
353 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
354 self.inner.recv_vectored(buf, 0).await
355 }
356}
357
358impl AsyncReadManaged for TcpStream {
359 type Buffer<'a> = BorrowedBuffer<'a>;
360 type BufferPool = BufferPool;
361
362 async fn read_managed<'a>(
363 &mut self,
364 buffer_pool: &'a Self::BufferPool,
365 len: usize,
366 ) -> io::Result<Self::Buffer<'a>> {
367 (&*self).read_managed(buffer_pool, len).await
368 }
369}
370
371impl AsyncReadManaged for &TcpStream {
372 type Buffer<'a> = BorrowedBuffer<'a>;
373 type BufferPool = BufferPool;
374
375 async fn read_managed<'a>(
376 &mut self,
377 buffer_pool: &'a Self::BufferPool,
378 len: usize,
379 ) -> io::Result<Self::Buffer<'a>> {
380 self.inner.recv_managed(buffer_pool, len as _, 0).await
381 }
382}
383
384impl AsyncWrite for TcpStream {
385 #[inline]
386 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
387 (&*self).write(buf).await
388 }
389
390 #[inline]
391 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
392 (&*self).write_vectored(buf).await
393 }
394
395 #[inline]
396 async fn flush(&mut self) -> io::Result<()> {
397 (&*self).flush().await
398 }
399
400 #[inline]
401 async fn shutdown(&mut self) -> io::Result<()> {
402 (&*self).shutdown().await
403 }
404}
405
406impl AsyncWrite for &TcpStream {
407 #[inline]
408 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
409 self.inner.send(buf, 0).await
410 }
411
412 #[inline]
413 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
414 self.inner.send_vectored(buf, 0).await
415 }
416
417 #[inline]
418 async fn flush(&mut self) -> io::Result<()> {
419 Ok(())
420 }
421
422 #[inline]
423 async fn shutdown(&mut self) -> io::Result<()> {
424 self.inner.shutdown().await
425 }
426}
427
428impl Splittable for TcpStream {
429 type ReadHalf = OwnedReadHalf<Self>;
430 type WriteHalf = OwnedWriteHalf<Self>;
431
432 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
433 crate::into_split(self)
434 }
435}
436
437impl<'a> Splittable for &'a TcpStream {
438 type ReadHalf = ReadHalf<'a, TcpStream>;
439 type WriteHalf = WriteHalf<'a, TcpStream>;
440
441 fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
442 crate::split(self)
443 }
444}
445
446impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);