1 use std::ffi::c_void;
2 use std::mem::ManuallyDrop;
3 use std::ptr;
4 use std::task::{Context, Poll};
5 
6 use http::HeaderMap;
7 use libc::{c_int, size_t};
8 
9 use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
10 use super::{UserDataPointer, HYPER_ITER_CONTINUE};
11 use crate::body::{Body, Bytes, HttpBody as _};
12 
/// A streaming HTTP body.
///
/// This is a thin wrapper around the crate's `Body`; the functions in this
/// module exchange it with callers as a `hyper_body *`.
pub struct hyper_body(pub(super) Body);
15 
/// A buffer of bytes that is sent or received on a `hyper_body`.
///
/// This is a thin wrapper around `Bytes`; the functions in this module
/// exchange it with callers as a `hyper_buf *`.
pub struct hyper_buf(pub(crate) Bytes);
18 
/// State for a body whose data is produced by a caller-supplied callback.
pub(crate) struct UserBody {
    /// Callback invoked by `poll_data` to ask for the next chunk of data.
    data_func: hyper_body_data_callback,
    /// Pointer handed back verbatim as the first argument of `data_func`.
    userdata: *mut c_void,
}
23 
24 // ===== Body =====
25 
/// Signature of the callback given to `hyper_body_foreach`.
///
/// It receives the userdata pointer and a borrowed `hyper_buf`, and returns
/// an iteration code that `hyper_body_foreach` compares against
/// `HYPER_ITER_CONTINUE`.
type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int;
27 
/// Signature of the callback installed with `hyper_body_set_data_func`.
///
/// It receives the userdata pointer, the current `hyper_context`, and an
/// out-parameter in which to store a `hyper_buf *`; the returned code is one
/// of the `HYPER_POLL_*` values interpreted by `UserBody::poll_data`.
type hyper_body_data_callback =
    extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut *mut hyper_buf) -> c_int;
30 
31 ffi_fn! {
32     /// Create a new "empty" body.
33     ///
34     /// If not configured, this body acts as an empty payload.
35     fn hyper_body_new() -> *mut hyper_body {
36         Box::into_raw(Box::new(hyper_body(Body::empty())))
37     } ?= ptr::null_mut()
38 }
39 
40 ffi_fn! {
41     /// Free a `hyper_body *`.
42     fn hyper_body_free(body: *mut hyper_body) {
43         drop(non_null!(Box::from_raw(body) ?= ()));
44     }
45 }
46 
47 ffi_fn! {
48     /// Return a task that will poll the body for the next buffer of data.
49     ///
50     /// The task value may have different types depending on the outcome:
51     ///
52     /// - `HYPER_TASK_BUF`: Success, and more data was received.
53     /// - `HYPER_TASK_ERROR`: An error retrieving the data.
54     /// - `HYPER_TASK_EMPTY`: The body has finished streaming data.
55     ///
56     /// This does not consume the `hyper_body *`, so it may be used to again.
57     /// However, it MUST NOT be used or freed until the related task completes.
58     fn hyper_body_data(body: *mut hyper_body) -> *mut hyper_task {
59         // This doesn't take ownership of the Body, so don't allow destructor
60         let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));
61 
62         Box::into_raw(hyper_task::boxed(async move {
63             body.0.data().await.map(|res| res.map(hyper_buf))
64         }))
65     } ?= ptr::null_mut()
66 }
67 
68 ffi_fn! {
69     /// Return a task that will poll the body and execute the callback with each
70     /// body chunk that is received.
71     ///
72     /// The `hyper_buf` pointer is only a borrowed reference, it cannot live outside
73     /// the execution of the callback. You must make a copy to retain it.
74     ///
75     /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating
76     /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel.
77     ///
78     /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
79     fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
80         let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
81         let userdata = UserDataPointer(userdata);
82 
83         Box::into_raw(hyper_task::boxed(async move {
84             while let Some(item) = body.0.data().await {
85                 let chunk = item?;
86                 if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
87                     return Err(crate::Error::new_user_aborted_by_callback());
88                 }
89             }
90             Ok(())
91         }))
92     } ?= ptr::null_mut()
93 }
94 
95 ffi_fn! {
96     /// Set userdata on this body, which will be passed to callback functions.
97     fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
98         let b = non_null!(&mut *body ?= ());
99         b.0.as_ffi_mut().userdata = userdata;
100     }
101 }
102 
103 ffi_fn! {
104     /// Set the data callback for this body.
105     ///
106     /// The callback is called each time hyper needs to send more data for the
107     /// body. It is passed the value from `hyper_body_set_userdata`.
108     ///
109     /// If there is data available, the `hyper_buf **` argument should be set
110     /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should
111     /// be returned.
112     ///
113     /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points
114     /// to `NULL` will indicate the body has completed all data.
115     ///
116     /// If there is more data to send, but it isn't yet available, a
117     /// `hyper_waker` should be saved from the `hyper_context *` argument, and
118     /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker
119     /// to signal the task when data is available.
120     ///
121     /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort
122     /// the body.
123     fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) {
124         let b = non_null!{ &mut *body ?= () };
125         b.0.as_ffi_mut().data_func = func;
126     }
127 }
128 
129 // ===== impl UserBody =====
130 
131 impl UserBody {
new() -> UserBody132     pub(crate) fn new() -> UserBody {
133         UserBody {
134             data_func: data_noop,
135             userdata: std::ptr::null_mut(),
136         }
137     }
138 
poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>139     pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
140         let mut out = std::ptr::null_mut();
141         match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
142             super::task::HYPER_POLL_READY => {
143                 if out.is_null() {
144                     Poll::Ready(None)
145                 } else {
146                     let buf = unsafe { Box::from_raw(out) };
147                     Poll::Ready(Some(Ok(buf.0)))
148                 }
149             }
150             super::task::HYPER_POLL_PENDING => Poll::Pending,
151             super::task::HYPER_POLL_ERROR => {
152                 Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
153             }
154             unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
155                 "unexpected hyper_body_data_func return code {}",
156                 unexpected
157             ))))),
158         }
159     }
160 
poll_trailers( &mut self, _cx: &mut Context<'_>, ) -> Poll<crate::Result<Option<HeaderMap>>>161     pub(crate) fn poll_trailers(
162         &mut self,
163         _cx: &mut Context<'_>,
164     ) -> Poll<crate::Result<Option<HeaderMap>>> {
165         Poll::Ready(Ok(None))
166     }
167 }
168 
169 /// cbindgen:ignore
data_noop( _userdata: *mut c_void, _: *mut hyper_context<'_>, _: *mut *mut hyper_buf, ) -> c_int170 extern "C" fn data_noop(
171     _userdata: *mut c_void,
172     _: *mut hyper_context<'_>,
173     _: *mut *mut hyper_buf,
174 ) -> c_int {
175     super::task::HYPER_POLL_READY
176 }
177 
// `UserBody` stores a raw `*mut c_void`, which prevents the compiler from
// deriving `Send`/`Sync` automatically, so both are asserted by hand.
// NOTE(review): presumably this relies on the embedding C code keeping
// `userdata` usable from whichever thread polls the body — confirm.
unsafe impl Send for UserBody {}
unsafe impl Sync for UserBody {}
180 
181 // ===== Bytes =====
182 
183 ffi_fn! {
184     /// Create a new `hyper_buf *` by copying the provided bytes.
185     ///
186     /// This makes an owned copy of the bytes, so the `buf` argument can be
187     /// freed or changed afterwards.
188     ///
189     /// This returns `NULL` if allocating a new buffer fails.
190     fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf {
191         let slice = unsafe {
192             std::slice::from_raw_parts(buf, len)
193         };
194         Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice))))
195     } ?= ptr::null_mut()
196 }
197 
198 ffi_fn! {
199     /// Get a pointer to the bytes in this buffer.
200     ///
201     /// This should be used in conjunction with `hyper_buf_len` to get the length
202     /// of the bytes data.
203     ///
204     /// This pointer is borrowed data, and not valid once the `hyper_buf` is
205     /// consumed/freed.
206     fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 {
207         unsafe { (*buf).0.as_ptr() }
208     } ?= ptr::null()
209 }
210 
211 ffi_fn! {
212     /// Get the length of the bytes this buffer contains.
213     fn hyper_buf_len(buf: *const hyper_buf) -> size_t {
214         unsafe { (*buf).0.len() }
215     }
216 }
217 
218 ffi_fn! {
219     /// Free this buffer.
220     fn hyper_buf_free(buf: *mut hyper_buf) {
221         drop(unsafe { Box::from_raw(buf) });
222     }
223 }
224 
// Lets a `hyper_buf` be the value produced by a task: such a value is
// reported with the `HYPER_TASK_BUF` type tag.
unsafe impl AsTaskType for hyper_buf {
    fn as_task_type(&self) -> hyper_task_return_type {
        hyper_task_return_type::HYPER_TASK_BUF
    }
}
230