Skip to main content

compio_net/
udp.rs

1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_runtime::{BorrowedBuffer, BufferPool};
6use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
7
8use crate::{Socket, SocketOpts, ToSocketAddrsAsync};
9
10/// A UDP socket.
11///
12/// UDP is "connectionless", unlike TCP. Meaning, regardless of what address
13/// you've bound to, a `UdpSocket` is free to communicate with many different
14/// remotes. There are basically two main ways to use `UdpSocket`:
15///
16/// * one to many: [`bind`](`UdpSocket::bind`) and use
17///   [`send_to`](`UdpSocket::send_to`) and
18///   [`recv_from`](`UdpSocket::recv_from`) to communicate with many different
19///   addresses
20/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single
21///   address, using [`send`](`UdpSocket::send`) and [`recv`](`UdpSocket::recv`)
22///   to communicate only with that remote address
23///
24/// # Examples
25/// Bind and connect a pair of sockets and send a packet:
26///
27/// ```
28/// use std::net::SocketAddr;
29///
30/// use compio_net::UdpSocket;
31///
32/// # compio_runtime::Runtime::new().unwrap().block_on(async {
33/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
34/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
35///
36/// // bind sockets
37/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
38/// let first_addr = socket.local_addr().unwrap();
39/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
40/// let second_addr = other_socket.local_addr().unwrap();
41///
42/// // connect sockets
43/// socket.connect(second_addr).await.unwrap();
44/// other_socket.connect(first_addr).await.unwrap();
45///
46/// let buf = Vec::with_capacity(12);
47///
48/// // write data
49/// socket.send("Hello world!").await.unwrap();
50///
51/// // read data
52/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap();
53///
54/// assert_eq!(n_bytes, buf.len());
55/// assert_eq!(buf, b"Hello world!");
56/// # });
57/// ```
58/// Send and receive packets without connecting:
59///
60/// ```
61/// use std::net::SocketAddr;
62///
63/// use compio_net::UdpSocket;
64/// use socket2::SockAddr;
65///
66/// # compio_runtime::Runtime::new().unwrap().block_on(async {
67/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
68/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
69///
70/// // bind sockets
71/// let mut socket = UdpSocket::bind(first_addr).await.unwrap();
72/// let first_addr = socket.local_addr().unwrap();
73/// let mut other_socket = UdpSocket::bind(second_addr).await.unwrap();
74/// let second_addr = other_socket.local_addr().unwrap();
75///
76/// let buf = Vec::with_capacity(32);
77///
78/// // write data
79/// socket.send_to("hello world", second_addr).await.unwrap();
80///
81/// // read data
82/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap();
83///
84/// assert_eq!(addr, first_addr);
85/// assert_eq!(n_bytes, buf.len());
86/// assert_eq!(buf, b"hello world");
87/// # });
88/// ```
89#[derive(Debug, Clone)]
90pub struct UdpSocket {
91    inner: Socket,
92}
93
94impl UdpSocket {
95    /// Creates a new UDP socket and attempt to bind it to the addr provided.
96    pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
97        Self::bind_with_options(addr, &SocketOpts::default()).await
98    }
99
100    /// Creates a new UDP socket with [`SocketOpts`] and attempt to bind it to
101    /// the addr provided.
102    pub async fn bind_with_options(
103        addr: impl ToSocketAddrsAsync,
104        opts: &SocketOpts,
105    ) -> io::Result<Self> {
106        super::each_addr(addr, |addr| async move {
107            let socket =
108                Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP)).await?;
109            opts.setup_socket(&socket)?;
110            Ok(Self { inner: socket })
111        })
112        .await
113    }
114
115    /// Connects this UDP socket to a remote address, allowing the `send` and
116    /// `recv` to be used to send data and also applies filters to only
117    /// receive data from the specified address.
118    ///
119    /// Note that usually, a successful `connect` call does not specify
120    /// that there is a remote server listening on the port, rather, such an
121    /// error would only be detected after the first send.
122    pub async fn connect(&self, addr: impl ToSocketAddrsAsync) -> io::Result<()> {
123        super::each_addr(addr, |addr| async move {
124            self.inner.connect(&SockAddr::from(addr))
125        })
126        .await
127    }
128
129    /// Creates new UdpSocket from a std::net::UdpSocket.
130    pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
131        Ok(Self {
132            inner: Socket::from_socket2(Socket2::from(socket))?,
133        })
134    }
135
136    /// Close the socket. If the returned future is dropped before polling, the
137    /// socket won't be closed.
138    pub fn close(self) -> impl Future<Output = io::Result<()>> {
139        self.inner.close()
140    }
141
142    /// Returns the socket address of the remote peer this socket was connected
143    /// to.
144    ///
145    /// # Examples
146    ///
147    /// ```no_run
148    /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
149    ///
150    /// use compio_net::UdpSocket;
151    /// use socket2::SockAddr;
152    ///
153    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
154    /// let socket = UdpSocket::bind("127.0.0.1:34254")
155    ///     .await
156    ///     .expect("couldn't bind to address");
157    /// socket
158    ///     .connect("192.168.0.1:41203")
159    ///     .await
160    ///     .expect("couldn't connect to address");
161    /// assert_eq!(
162    ///     socket.peer_addr().unwrap(),
163    ///     SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 1), 41203))
164    /// );
165    /// # });
166    /// ```
167    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
168        self.inner
169            .peer_addr()
170            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
171    }
172
173    /// Returns the local address that this socket is bound to.
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// use std::net::SocketAddr;
179    ///
180    /// use compio_net::UdpSocket;
181    /// use socket2::SockAddr;
182    ///
183    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
184    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
185    /// let sock = UdpSocket::bind(&addr).await.unwrap();
186    /// // the address the socket is bound to
187    /// let local_addr = sock.local_addr().unwrap();
188    /// assert_eq!(local_addr, addr);
189    /// # });
190    /// ```
191    pub fn local_addr(&self) -> io::Result<SocketAddr> {
192        self.inner
193            .local_addr()
194            .map(|addr| addr.as_socket().expect("should be SocketAddr"))
195    }
196
197    /// Receives a packet of data from the socket into the buffer, returning the
198    /// original buffer and quantity of data received.
199    pub async fn recv<T: IoBufMut>(&self, buffer: T) -> BufResult<usize, T> {
200        self.inner.recv(buffer, 0).await
201    }
202
203    /// Receives a packet of data from the socket into the buffer, returning the
204    /// original buffer and quantity of data received.
205    pub async fn recv_vectored<T: IoVectoredBufMut>(&self, buffer: T) -> BufResult<usize, T> {
206        self.inner.recv_vectored(buffer, 0).await
207    }
208
209    /// Read some bytes from this source with [`BufferPool`] and return
210    /// a [`BorrowedBuffer`].
211    ///
212    /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
213    /// if `len` > 0, `min(len, inner buffer size)` will be the read max len
214    pub async fn recv_managed<'a>(
215        &self,
216        buffer_pool: &'a BufferPool,
217        len: usize,
218    ) -> io::Result<BorrowedBuffer<'a>> {
219        self.inner.recv_managed(buffer_pool, len, 0).await
220    }
221
222    /// Read some bytes from this source with [`BufferPool`] and return
223    /// a [`BorrowedBuffer`] with the sender address.
224    ///
225    /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
226    /// if `len` > 0, `min(len, inner buffer size)` will be the read max len
227    pub async fn recv_from_managed<'a>(
228        &self,
229        buffer_pool: &'a BufferPool,
230        len: usize,
231    ) -> io::Result<(BorrowedBuffer<'a>, SocketAddr)> {
232        self.inner
233            .recv_from_managed(buffer_pool, len, 0)
234            .await
235            .map(|(buffer, addr)| {
236                let addr = addr
237                    .expect("should have addr")
238                    .as_socket()
239                    .expect("should be SocketAddr");
240                (buffer, addr)
241            })
242    }
243
244    /// Sends some data to the socket from the buffer, returning the original
245    /// buffer and quantity of data sent.
246    pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {
247        self.inner.send(buffer, 0).await
248    }
249
250    /// Sends some data to the socket from the buffer, returning the original
251    /// buffer and quantity of data sent.
252    pub async fn send_vectored<T: IoVectoredBuf>(&self, buffer: T) -> BufResult<usize, T> {
253        self.inner.send_vectored(buffer, 0).await
254    }
255
256    /// Receives a single datagram message on the socket. On success, returns
257    /// the number of bytes received and the origin.
258    pub async fn recv_from<T: IoBufMut>(&self, buffer: T) -> BufResult<(usize, SocketAddr), T> {
259        self.inner.recv_from(buffer, 0).await.map_res(|(n, addr)| {
260            let addr = addr
261                .expect("should have addr")
262                .as_socket()
263                .expect("should be SocketAddr");
264            (n, addr)
265        })
266    }
267
268    /// Receives a single datagram message on the socket. On success, returns
269    /// the number of bytes received and the origin.
270    pub async fn recv_from_vectored<T: IoVectoredBufMut>(
271        &self,
272        buffer: T,
273    ) -> BufResult<(usize, SocketAddr), T> {
274        self.inner
275            .recv_from_vectored(buffer, 0)
276            .await
277            .map_res(|(n, addr)| {
278                let addr = addr
279                    .expect("should have addr")
280                    .as_socket()
281                    .expect("should be SocketAddr");
282                (n, addr)
283            })
284    }
285
286    /// Receives a single datagram message and ancillary data on the socket. On
287    /// success, returns the number of bytes received and the origin.
288    pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
289        &self,
290        buffer: T,
291        control: C,
292    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
293        self.inner
294            .recv_msg(buffer, control, 0)
295            .await
296            .map_res(|(n, m, addr)| {
297                let addr = addr
298                    .expect("should have addr")
299                    .as_socket()
300                    .expect("should be SocketAddr");
301                (n, m, addr)
302            })
303    }
304
305    /// Receives a single datagram message and ancillary data on the socket. On
306    /// success, returns the number of bytes received and the origin.
307    pub async fn recv_msg_vectored<T: IoVectoredBufMut, C: IoBufMut>(
308        &self,
309        buffer: T,
310        control: C,
311    ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
312        self.inner
313            .recv_msg_vectored(buffer, control, 0)
314            .await
315            .map_res(|(n, m, addr)| {
316                let addr = addr
317                    .expect("should have addr")
318                    .as_socket()
319                    .expect("should be SocketAddr");
320                (n, m, addr)
321            })
322    }
323
324    /// Sends data on the socket to the given address. On success, returns the
325    /// number of bytes sent.
326    pub async fn send_to<T: IoBuf>(
327        &self,
328        buffer: T,
329        addr: impl ToSocketAddrsAsync,
330    ) -> BufResult<usize, T> {
331        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
332            self.inner.send_to(buffer, &SockAddr::from(addr), 0).await
333        })
334        .await
335    }
336
337    /// Sends data on the socket to the given address. On success, returns the
338    /// number of bytes sent.
339    pub async fn send_to_vectored<T: IoVectoredBuf>(
340        &self,
341        buffer: T,
342        addr: impl ToSocketAddrsAsync,
343    ) -> BufResult<usize, T> {
344        super::first_addr_buf(addr, buffer, |addr, buffer| async move {
345            self.inner
346                .send_to_vectored(buffer, &SockAddr::from(addr), 0)
347                .await
348        })
349        .await
350    }
351
352    /// Sends data on the socket to the given address accompanied by ancillary
353    /// data. On success, returns the number of bytes sent.
354    pub async fn send_msg<T: IoBuf, C: IoBuf>(
355        &self,
356        buffer: T,
357        control: C,
358        addr: impl ToSocketAddrsAsync,
359    ) -> BufResult<usize, (T, C)> {
360        super::first_addr_buf(
361            addr,
362            (buffer, control),
363            |addr, (buffer, control)| async move {
364                self.inner
365                    .send_msg(buffer, control, Some(&SockAddr::from(addr)), 0)
366                    .await
367            },
368        )
369        .await
370    }
371
372    /// Sends data on the socket to the given address accompanied by ancillary
373    /// data. On success, returns the number of bytes sent.
374    pub async fn send_msg_vectored<T: IoVectoredBuf, C: IoBuf>(
375        &self,
376        buffer: T,
377        control: C,
378        addr: impl ToSocketAddrsAsync,
379    ) -> BufResult<usize, (T, C)> {
380        super::first_addr_buf(
381            addr,
382            (buffer, control),
383            |addr, (buffer, control)| async move {
384                self.inner
385                    .send_msg_vectored(buffer, control, Some(&SockAddr::from(addr)), 0)
386                    .await
387            },
388        )
389        .await
390    }
391
392    /// Sends data on the socket to the given address with zero copy.
393    ///
394    /// Returns the result of send and a future that resolves to the
395    /// original buffer when the send is complete.
396    pub async fn send_to_zerocopy<A: ToSocketAddrsAsync, T: IoBuf>(
397        &self,
398        buffer: T,
399        addr: A,
400    ) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
401        super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
402            self.inner.send_to_zerocopy(buffer, &addr.into(), 0).await
403        })
404        .await
405    }
406
407    /// Sends vectored data on the socket to the given address with zero copy.
408    ///
409    /// Returns the result of send and a future that resolves to the
410    /// original buffer when the send is complete.
411    pub async fn send_to_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf>(
412        &self,
413        buffer: T,
414        addr: A,
415    ) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
416        super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
417            self.inner
418                .send_to_zerocopy_vectored(buffer, &addr.into(), 0)
419                .await
420        })
421        .await
422    }
423
424    /// Sends data with control message on the socket to the given address with
425    /// zero copy.
426    ///
427    /// Returns the result of send and a future that resolves to the
428    /// original buffer when the send is complete.
429    pub async fn send_msg_zerocopy<A: ToSocketAddrsAsync, T: IoBuf, C: IoBuf>(
430        &self,
431        buffer: T,
432        control: C,
433        addr: A,
434    ) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
435        super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
436            self.inner
437                .send_msg_zerocopy(b, c, Some(&addr.into()), 0)
438                .await
439        })
440        .await
441    }
442
443    /// Sends vectored data with control message on the socket to the given
444    /// address with zero copy.
445    ///
446    /// Returns the result of send and a future that resolves to the
447    /// original buffer when the send is complete.
448    pub async fn send_msg_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf, C: IoBuf>(
449        &self,
450        buffer: T,
451        control: C,
452        addr: A,
453    ) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
454        super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
455            self.inner
456                .send_msg_zerocopy_vectored(b, c, Some(&addr.into()), 0)
457                .await
458        })
459        .await
460    }
461
462    /// Gets a socket option.
463    ///
464    /// # Safety
465    ///
466    /// The caller must ensure `T` is the correct type for `level` and `name`.
467    pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
468        unsafe { self.inner.get_socket_option(level, name) }
469    }
470
471    /// Sets a socket option.
472    ///
473    /// # Safety
474    ///
475    /// The caller must ensure `T` is the correct type for `level` and `name`.
476    pub unsafe fn set_socket_option<T: Copy>(
477        &self,
478        level: i32,
479        name: i32,
480        value: &T,
481    ) -> io::Result<()> {
482        unsafe { self.inner.set_socket_option(level, name, value) }
483    }
484}
485
// Invokes `compio_driver::impl_raw_fd!` for `UdpSocket`, naming `socket2::Socket`
// as the underlying type and `inner` as the field that holds it.
// NOTE(review): presumably this generates the raw fd/socket conversion trait
// impls by delegating to `inner` — confirm against the macro definition.
impl_raw_fd!(UdpSocket, socket2::Socket, inner, socket);