compio_net/udp.rs
1use std::{future::Future, io, net::SocketAddr};
2
3use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4use compio_driver::impl_raw_fd;
5use compio_runtime::{BorrowedBuffer, BufferPool};
6use socket2::{Protocol, SockAddr, Socket as Socket2, Type};
7
8use crate::{Socket, SocketOpts, ToSocketAddrsAsync};
9
/// A UDP socket.
///
/// UDP is "connectionless", unlike TCP. Regardless of the address it has been
/// bound to, a `UdpSocket` is free to communicate with many different
/// remotes. There are two main ways to use a `UdpSocket`:
///
/// * one to many: [`bind`](`UdpSocket::bind`) and use
///   [`send_to`](`UdpSocket::send_to`) and
///   [`recv_from`](`UdpSocket::recv_from`) to communicate with many different
///   addresses
/// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single
///   address, using [`send`](`UdpSocket::send`) and [`recv`](`UdpSocket::recv`)
///   to communicate only with that remote address
///
/// # Examples
///
/// Bind and connect a pair of sockets and send a packet:
///
/// ```
/// use std::net::SocketAddr;
///
/// use compio_net::UdpSocket;
///
/// # compio_runtime::Runtime::new().unwrap().block_on(async {
/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
///
/// // bind sockets (port 0 is requested, so read the actual addresses back)
/// let socket = UdpSocket::bind(first_addr).await.unwrap();
/// let first_addr = socket.local_addr().unwrap();
/// let other_socket = UdpSocket::bind(second_addr).await.unwrap();
/// let second_addr = other_socket.local_addr().unwrap();
///
/// // connect sockets
/// socket.connect(second_addr).await.unwrap();
/// other_socket.connect(first_addr).await.unwrap();
///
/// let buf = Vec::with_capacity(12);
///
/// // write data
/// socket.send("Hello world!").await.unwrap();
///
/// // read data
/// let (n_bytes, buf) = other_socket.recv(buf).await.unwrap();
///
/// assert_eq!(n_bytes, buf.len());
/// assert_eq!(buf, b"Hello world!");
/// # });
/// ```
///
/// Send and receive packets without connecting:
///
/// ```
/// use std::net::SocketAddr;
///
/// use compio_net::UdpSocket;
///
/// # compio_runtime::Runtime::new().unwrap().block_on(async {
/// let first_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
/// let second_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
///
/// // bind sockets (port 0 is requested, so read the actual addresses back)
/// let socket = UdpSocket::bind(first_addr).await.unwrap();
/// let first_addr = socket.local_addr().unwrap();
/// let other_socket = UdpSocket::bind(second_addr).await.unwrap();
/// let second_addr = other_socket.local_addr().unwrap();
///
/// let buf = Vec::with_capacity(32);
///
/// // write data
/// socket.send_to("hello world", second_addr).await.unwrap();
///
/// // read data
/// let ((n_bytes, addr), buf) = other_socket.recv_from(buf).await.unwrap();
///
/// assert_eq!(addr, first_addr);
/// assert_eq!(n_bytes, buf.len());
/// assert_eq!(buf, b"hello world");
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct UdpSocket {
    /// The wrapped `Socket` that the methods of this type delegate to.
    inner: Socket,
}
93
94impl UdpSocket {
95 /// Creates a new UDP socket and attempt to bind it to the addr provided.
96 pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
97 Self::bind_with_options(addr, &SocketOpts::default()).await
98 }
99
100 /// Creates a new UDP socket with [`SocketOpts`] and attempt to bind it to
101 /// the addr provided.
102 pub async fn bind_with_options(
103 addr: impl ToSocketAddrsAsync,
104 opts: &SocketOpts,
105 ) -> io::Result<Self> {
106 super::each_addr(addr, |addr| async move {
107 let socket =
108 Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP)).await?;
109 opts.setup_socket(&socket)?;
110 Ok(Self { inner: socket })
111 })
112 .await
113 }
114
115 /// Connects this UDP socket to a remote address, allowing the `send` and
116 /// `recv` to be used to send data and also applies filters to only
117 /// receive data from the specified address.
118 ///
119 /// Note that usually, a successful `connect` call does not specify
120 /// that there is a remote server listening on the port, rather, such an
121 /// error would only be detected after the first send.
122 pub async fn connect(&self, addr: impl ToSocketAddrsAsync) -> io::Result<()> {
123 super::each_addr(addr, |addr| async move {
124 self.inner.connect(&SockAddr::from(addr))
125 })
126 .await
127 }
128
129 /// Creates new UdpSocket from a std::net::UdpSocket.
130 pub fn from_std(socket: std::net::UdpSocket) -> io::Result<Self> {
131 Ok(Self {
132 inner: Socket::from_socket2(Socket2::from(socket))?,
133 })
134 }
135
136 /// Close the socket. If the returned future is dropped before polling, the
137 /// socket won't be closed.
138 pub fn close(self) -> impl Future<Output = io::Result<()>> {
139 self.inner.close()
140 }
141
142 /// Returns the socket address of the remote peer this socket was connected
143 /// to.
144 ///
145 /// # Examples
146 ///
147 /// ```no_run
148 /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
149 ///
150 /// use compio_net::UdpSocket;
151 /// use socket2::SockAddr;
152 ///
153 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
154 /// let socket = UdpSocket::bind("127.0.0.1:34254")
155 /// .await
156 /// .expect("couldn't bind to address");
157 /// socket
158 /// .connect("192.168.0.1:41203")
159 /// .await
160 /// .expect("couldn't connect to address");
161 /// assert_eq!(
162 /// socket.peer_addr().unwrap(),
163 /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 1), 41203))
164 /// );
165 /// # });
166 /// ```
167 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
168 self.inner
169 .peer_addr()
170 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
171 }
172
173 /// Returns the local address that this socket is bound to.
174 ///
175 /// # Example
176 ///
177 /// ```
178 /// use std::net::SocketAddr;
179 ///
180 /// use compio_net::UdpSocket;
181 /// use socket2::SockAddr;
182 ///
183 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
184 /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
185 /// let sock = UdpSocket::bind(&addr).await.unwrap();
186 /// // the address the socket is bound to
187 /// let local_addr = sock.local_addr().unwrap();
188 /// assert_eq!(local_addr, addr);
189 /// # });
190 /// ```
191 pub fn local_addr(&self) -> io::Result<SocketAddr> {
192 self.inner
193 .local_addr()
194 .map(|addr| addr.as_socket().expect("should be SocketAddr"))
195 }
196
197 /// Receives a packet of data from the socket into the buffer, returning the
198 /// original buffer and quantity of data received.
199 pub async fn recv<T: IoBufMut>(&self, buffer: T) -> BufResult<usize, T> {
200 self.inner.recv(buffer, 0).await
201 }
202
203 /// Receives a packet of data from the socket into the buffer, returning the
204 /// original buffer and quantity of data received.
205 pub async fn recv_vectored<T: IoVectoredBufMut>(&self, buffer: T) -> BufResult<usize, T> {
206 self.inner.recv_vectored(buffer, 0).await
207 }
208
209 /// Read some bytes from this source with [`BufferPool`] and return
210 /// a [`BorrowedBuffer`].
211 ///
212 /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
213 /// if `len` > 0, `min(len, inner buffer size)` will be the read max len
214 pub async fn recv_managed<'a>(
215 &self,
216 buffer_pool: &'a BufferPool,
217 len: usize,
218 ) -> io::Result<BorrowedBuffer<'a>> {
219 self.inner.recv_managed(buffer_pool, len, 0).await
220 }
221
222 /// Read some bytes from this source with [`BufferPool`] and return
223 /// a [`BorrowedBuffer`] with the sender address.
224 ///
225 /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
226 /// if `len` > 0, `min(len, inner buffer size)` will be the read max len
227 pub async fn recv_from_managed<'a>(
228 &self,
229 buffer_pool: &'a BufferPool,
230 len: usize,
231 ) -> io::Result<(BorrowedBuffer<'a>, SocketAddr)> {
232 self.inner
233 .recv_from_managed(buffer_pool, len, 0)
234 .await
235 .map(|(buffer, addr)| {
236 let addr = addr
237 .expect("should have addr")
238 .as_socket()
239 .expect("should be SocketAddr");
240 (buffer, addr)
241 })
242 }
243
244 /// Sends some data to the socket from the buffer, returning the original
245 /// buffer and quantity of data sent.
246 pub async fn send<T: IoBuf>(&self, buffer: T) -> BufResult<usize, T> {
247 self.inner.send(buffer, 0).await
248 }
249
250 /// Sends some data to the socket from the buffer, returning the original
251 /// buffer and quantity of data sent.
252 pub async fn send_vectored<T: IoVectoredBuf>(&self, buffer: T) -> BufResult<usize, T> {
253 self.inner.send_vectored(buffer, 0).await
254 }
255
256 /// Receives a single datagram message on the socket. On success, returns
257 /// the number of bytes received and the origin.
258 pub async fn recv_from<T: IoBufMut>(&self, buffer: T) -> BufResult<(usize, SocketAddr), T> {
259 self.inner.recv_from(buffer, 0).await.map_res(|(n, addr)| {
260 let addr = addr
261 .expect("should have addr")
262 .as_socket()
263 .expect("should be SocketAddr");
264 (n, addr)
265 })
266 }
267
268 /// Receives a single datagram message on the socket. On success, returns
269 /// the number of bytes received and the origin.
270 pub async fn recv_from_vectored<T: IoVectoredBufMut>(
271 &self,
272 buffer: T,
273 ) -> BufResult<(usize, SocketAddr), T> {
274 self.inner
275 .recv_from_vectored(buffer, 0)
276 .await
277 .map_res(|(n, addr)| {
278 let addr = addr
279 .expect("should have addr")
280 .as_socket()
281 .expect("should be SocketAddr");
282 (n, addr)
283 })
284 }
285
286 /// Receives a single datagram message and ancillary data on the socket. On
287 /// success, returns the number of bytes received and the origin.
288 pub async fn recv_msg<T: IoBufMut, C: IoBufMut>(
289 &self,
290 buffer: T,
291 control: C,
292 ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
293 self.inner
294 .recv_msg(buffer, control, 0)
295 .await
296 .map_res(|(n, m, addr)| {
297 let addr = addr
298 .expect("should have addr")
299 .as_socket()
300 .expect("should be SocketAddr");
301 (n, m, addr)
302 })
303 }
304
305 /// Receives a single datagram message and ancillary data on the socket. On
306 /// success, returns the number of bytes received and the origin.
307 pub async fn recv_msg_vectored<T: IoVectoredBufMut, C: IoBufMut>(
308 &self,
309 buffer: T,
310 control: C,
311 ) -> BufResult<(usize, usize, SocketAddr), (T, C)> {
312 self.inner
313 .recv_msg_vectored(buffer, control, 0)
314 .await
315 .map_res(|(n, m, addr)| {
316 let addr = addr
317 .expect("should have addr")
318 .as_socket()
319 .expect("should be SocketAddr");
320 (n, m, addr)
321 })
322 }
323
324 /// Sends data on the socket to the given address. On success, returns the
325 /// number of bytes sent.
326 pub async fn send_to<T: IoBuf>(
327 &self,
328 buffer: T,
329 addr: impl ToSocketAddrsAsync,
330 ) -> BufResult<usize, T> {
331 super::first_addr_buf(addr, buffer, |addr, buffer| async move {
332 self.inner.send_to(buffer, &SockAddr::from(addr), 0).await
333 })
334 .await
335 }
336
337 /// Sends data on the socket to the given address. On success, returns the
338 /// number of bytes sent.
339 pub async fn send_to_vectored<T: IoVectoredBuf>(
340 &self,
341 buffer: T,
342 addr: impl ToSocketAddrsAsync,
343 ) -> BufResult<usize, T> {
344 super::first_addr_buf(addr, buffer, |addr, buffer| async move {
345 self.inner
346 .send_to_vectored(buffer, &SockAddr::from(addr), 0)
347 .await
348 })
349 .await
350 }
351
352 /// Sends data on the socket to the given address accompanied by ancillary
353 /// data. On success, returns the number of bytes sent.
354 pub async fn send_msg<T: IoBuf, C: IoBuf>(
355 &self,
356 buffer: T,
357 control: C,
358 addr: impl ToSocketAddrsAsync,
359 ) -> BufResult<usize, (T, C)> {
360 super::first_addr_buf(
361 addr,
362 (buffer, control),
363 |addr, (buffer, control)| async move {
364 self.inner
365 .send_msg(buffer, control, Some(&SockAddr::from(addr)), 0)
366 .await
367 },
368 )
369 .await
370 }
371
372 /// Sends data on the socket to the given address accompanied by ancillary
373 /// data. On success, returns the number of bytes sent.
374 pub async fn send_msg_vectored<T: IoVectoredBuf, C: IoBuf>(
375 &self,
376 buffer: T,
377 control: C,
378 addr: impl ToSocketAddrsAsync,
379 ) -> BufResult<usize, (T, C)> {
380 super::first_addr_buf(
381 addr,
382 (buffer, control),
383 |addr, (buffer, control)| async move {
384 self.inner
385 .send_msg_vectored(buffer, control, Some(&SockAddr::from(addr)), 0)
386 .await
387 },
388 )
389 .await
390 }
391
392 /// Sends data on the socket to the given address with zero copy.
393 ///
394 /// Returns the result of send and a future that resolves to the
395 /// original buffer when the send is complete.
396 pub async fn send_to_zerocopy<A: ToSocketAddrsAsync, T: IoBuf>(
397 &self,
398 buffer: T,
399 addr: A,
400 ) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
401 super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
402 self.inner.send_to_zerocopy(buffer, &addr.into(), 0).await
403 })
404 .await
405 }
406
407 /// Sends vectored data on the socket to the given address with zero copy.
408 ///
409 /// Returns the result of send and a future that resolves to the
410 /// original buffer when the send is complete.
411 pub async fn send_to_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf>(
412 &self,
413 buffer: T,
414 addr: A,
415 ) -> BufResult<usize, impl Future<Output = T> + use<A, T>> {
416 super::first_addr_buf_zerocopy(addr, buffer, |addr, buffer| async move {
417 self.inner
418 .send_to_zerocopy_vectored(buffer, &addr.into(), 0)
419 .await
420 })
421 .await
422 }
423
424 /// Sends data with control message on the socket to the given address with
425 /// zero copy.
426 ///
427 /// Returns the result of send and a future that resolves to the
428 /// original buffer when the send is complete.
429 pub async fn send_msg_zerocopy<A: ToSocketAddrsAsync, T: IoBuf, C: IoBuf>(
430 &self,
431 buffer: T,
432 control: C,
433 addr: A,
434 ) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
435 super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
436 self.inner
437 .send_msg_zerocopy(b, c, Some(&addr.into()), 0)
438 .await
439 })
440 .await
441 }
442
443 /// Sends vectored data with control message on the socket to the given
444 /// address with zero copy.
445 ///
446 /// Returns the result of send and a future that resolves to the
447 /// original buffer when the send is complete.
448 pub async fn send_msg_zerocopy_vectored<A: ToSocketAddrsAsync, T: IoVectoredBuf, C: IoBuf>(
449 &self,
450 buffer: T,
451 control: C,
452 addr: A,
453 ) -> BufResult<usize, impl Future<Output = (T, C)> + use<A, T, C>> {
454 super::first_addr_buf_zerocopy(addr, (buffer, control), |addr, (b, c)| async move {
455 self.inner
456 .send_msg_zerocopy_vectored(b, c, Some(&addr.into()), 0)
457 .await
458 })
459 .await
460 }
461
462 /// Gets a socket option.
463 ///
464 /// # Safety
465 ///
466 /// The caller must ensure `T` is the correct type for `level` and `name`.
467 pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
468 unsafe { self.inner.get_socket_option(level, name) }
469 }
470
471 /// Sets a socket option.
472 ///
473 /// # Safety
474 ///
475 /// The caller must ensure `T` is the correct type for `level` and `name`.
476 pub unsafe fn set_socket_option<T: Copy>(
477 &self,
478 level: i32,
479 name: i32,
480 value: &T,
481 ) -> io::Result<()> {
482 unsafe { self.inner.set_socket_option(level, name, value) }
483 }
484}
485
// Invokes `compio_driver::impl_raw_fd!` for `UdpSocket`, naming
// `socket2::Socket` as the underlying type and `inner` as the field to go
// through.
// NOTE(review): presumably this generates the raw fd / socket handle
// conversion impls for `UdpSocket`; the meaning of the trailing `socket`
// argument is not visible here — confirm against the macro definition.
impl_raw_fd!(UdpSocket, socket2::Socket, inner, socket);