1 use std::ffi::c_void;
2 use std::mem::ManuallyDrop;
3 use std::ptr;
4 use std::task::{Context, Poll};
5
6 use http::HeaderMap;
7 use libc::{c_int, size_t};
8
9 use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
10 use super::{UserDataPointer, HYPER_ITER_CONTINUE};
11 use crate::body::{Body, Bytes, HttpBody as _};
12
13 /// A streaming HTTP body.
14 pub struct hyper_body(pub(super) Body);
15
16 /// A buffer of bytes that is sent or received on a `hyper_body`.
17 pub struct hyper_buf(pub(crate) Bytes);
18
19 pub(crate) struct UserBody {
20 data_func: hyper_body_data_callback,
21 userdata: *mut c_void,
22 }
23
24 // ===== Body =====
25
26 type hyper_body_foreach_callback = extern "C" fn(*mut c_void, *const hyper_buf) -> c_int;
27
28 type hyper_body_data_callback =
29 extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut *mut hyper_buf) -> c_int;
30
31 ffi_fn! {
32 /// Create a new "empty" body.
33 ///
34 /// If not configured, this body acts as an empty payload.
35 fn hyper_body_new() -> *mut hyper_body {
36 Box::into_raw(Box::new(hyper_body(Body::empty())))
37 } ?= ptr::null_mut()
38 }
39
40 ffi_fn! {
41 /// Free a `hyper_body *`.
42 fn hyper_body_free(body: *mut hyper_body) {
43 drop(non_null!(Box::from_raw(body) ?= ()));
44 }
45 }
46
47 ffi_fn! {
48 /// Return a task that will poll the body for the next buffer of data.
49 ///
50 /// The task value may have different types depending on the outcome:
51 ///
52 /// - `HYPER_TASK_BUF`: Success, and more data was received.
53 /// - `HYPER_TASK_ERROR`: An error retrieving the data.
54 /// - `HYPER_TASK_EMPTY`: The body has finished streaming data.
55 ///
56 /// This does not consume the `hyper_body *`, so it may be used to again.
57 /// However, it MUST NOT be used or freed until the related task completes.
58 fn hyper_body_data(body: *mut hyper_body) -> *mut hyper_task {
59 // This doesn't take ownership of the Body, so don't allow destructor
60 let mut body = ManuallyDrop::new(non_null!(Box::from_raw(body) ?= ptr::null_mut()));
61
62 Box::into_raw(hyper_task::boxed(async move {
63 body.0.data().await.map(|res| res.map(hyper_buf))
64 }))
65 } ?= ptr::null_mut()
66 }
67
68 ffi_fn! {
69 /// Return a task that will poll the body and execute the callback with each
70 /// body chunk that is received.
71 ///
72 /// The `hyper_buf` pointer is only a borrowed reference, it cannot live outside
73 /// the execution of the callback. You must make a copy to retain it.
74 ///
75 /// The callback should return `HYPER_ITER_CONTINUE` to continue iterating
76 /// chunks as they are received, or `HYPER_ITER_BREAK` to cancel.
77 ///
78 /// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
79 fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
80 let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
81 let userdata = UserDataPointer(userdata);
82
83 Box::into_raw(hyper_task::boxed(async move {
84 while let Some(item) = body.0.data().await {
85 let chunk = item?;
86 if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
87 return Err(crate::Error::new_user_aborted_by_callback());
88 }
89 }
90 Ok(())
91 }))
92 } ?= ptr::null_mut()
93 }
94
95 ffi_fn! {
96 /// Set userdata on this body, which will be passed to callback functions.
97 fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
98 let b = non_null!(&mut *body ?= ());
99 b.0.as_ffi_mut().userdata = userdata;
100 }
101 }
102
103 ffi_fn! {
104 /// Set the data callback for this body.
105 ///
106 /// The callback is called each time hyper needs to send more data for the
107 /// body. It is passed the value from `hyper_body_set_userdata`.
108 ///
109 /// If there is data available, the `hyper_buf **` argument should be set
110 /// to a `hyper_buf *` containing the data, and `HYPER_POLL_READY` should
111 /// be returned.
112 ///
113 /// Returning `HYPER_POLL_READY` while the `hyper_buf **` argument points
114 /// to `NULL` will indicate the body has completed all data.
115 ///
116 /// If there is more data to send, but it isn't yet available, a
117 /// `hyper_waker` should be saved from the `hyper_context *` argument, and
118 /// `HYPER_POLL_PENDING` should be returned. You must wake the saved waker
119 /// to signal the task when data is available.
120 ///
121 /// If some error has occurred, you can return `HYPER_POLL_ERROR` to abort
122 /// the body.
123 fn hyper_body_set_data_func(body: *mut hyper_body, func: hyper_body_data_callback) {
124 let b = non_null!{ &mut *body ?= () };
125 b.0.as_ffi_mut().data_func = func;
126 }
127 }
128
129 // ===== impl UserBody =====
130
131 impl UserBody {
new() -> UserBody132 pub(crate) fn new() -> UserBody {
133 UserBody {
134 data_func: data_noop,
135 userdata: std::ptr::null_mut(),
136 }
137 }
138
poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>>139 pub(crate) fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
140 let mut out = std::ptr::null_mut();
141 match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
142 super::task::HYPER_POLL_READY => {
143 if out.is_null() {
144 Poll::Ready(None)
145 } else {
146 let buf = unsafe { Box::from_raw(out) };
147 Poll::Ready(Some(Ok(buf.0)))
148 }
149 }
150 super::task::HYPER_POLL_PENDING => Poll::Pending,
151 super::task::HYPER_POLL_ERROR => {
152 Poll::Ready(Some(Err(crate::Error::new_body_write_aborted())))
153 }
154 unexpected => Poll::Ready(Some(Err(crate::Error::new_body_write(format!(
155 "unexpected hyper_body_data_func return code {}",
156 unexpected
157 ))))),
158 }
159 }
160
poll_trailers( &mut self, _cx: &mut Context<'_>, ) -> Poll<crate::Result<Option<HeaderMap>>>161 pub(crate) fn poll_trailers(
162 &mut self,
163 _cx: &mut Context<'_>,
164 ) -> Poll<crate::Result<Option<HeaderMap>>> {
165 Poll::Ready(Ok(None))
166 }
167 }
168
169 /// cbindgen:ignore
data_noop( _userdata: *mut c_void, _: *mut hyper_context<'_>, _: *mut *mut hyper_buf, ) -> c_int170 extern "C" fn data_noop(
171 _userdata: *mut c_void,
172 _: *mut hyper_context<'_>,
173 _: *mut *mut hyper_buf,
174 ) -> c_int {
175 super::task::HYPER_POLL_READY
176 }
177
178 unsafe impl Send for UserBody {}
179 unsafe impl Sync for UserBody {}
180
181 // ===== Bytes =====
182
183 ffi_fn! {
184 /// Create a new `hyper_buf *` by copying the provided bytes.
185 ///
186 /// This makes an owned copy of the bytes, so the `buf` argument can be
187 /// freed or changed afterwards.
188 ///
189 /// This returns `NULL` if allocating a new buffer fails.
190 fn hyper_buf_copy(buf: *const u8, len: size_t) -> *mut hyper_buf {
191 let slice = unsafe {
192 std::slice::from_raw_parts(buf, len)
193 };
194 Box::into_raw(Box::new(hyper_buf(Bytes::copy_from_slice(slice))))
195 } ?= ptr::null_mut()
196 }
197
198 ffi_fn! {
199 /// Get a pointer to the bytes in this buffer.
200 ///
201 /// This should be used in conjunction with `hyper_buf_len` to get the length
202 /// of the bytes data.
203 ///
204 /// This pointer is borrowed data, and not valid once the `hyper_buf` is
205 /// consumed/freed.
206 fn hyper_buf_bytes(buf: *const hyper_buf) -> *const u8 {
207 unsafe { (*buf).0.as_ptr() }
208 } ?= ptr::null()
209 }
210
211 ffi_fn! {
212 /// Get the length of the bytes this buffer contains.
213 fn hyper_buf_len(buf: *const hyper_buf) -> size_t {
214 unsafe { (*buf).0.len() }
215 }
216 }
217
218 ffi_fn! {
219 /// Free this buffer.
220 fn hyper_buf_free(buf: *mut hyper_buf) {
221 drop(unsafe { Box::from_raw(buf) });
222 }
223 }
224
225 unsafe impl AsTaskType for hyper_buf {
as_task_type(&self) -> hyper_task_return_type226 fn as_task_type(&self) -> hyper_task_return_type {
227 hyper_task_return_type::HYPER_TASK_BUF
228 }
229 }
230