1 use futures::stream::iter;
2 use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
3 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
4
5 use std::future::{poll_fn, Future};
6 use std::pin::{pin, Pin};
7 use std::task::Poll;
8
9 mod support {
10 pub(crate) mod mpsc;
11 }
12
13 use support::mpsc;
14
15 macro_rules! assert_ready_some {
16 ($($t:tt)*) => {
17 match assert_ready!($($t)*) {
18 Some(v) => v,
19 None => panic!("expected `Some`, got `None`"),
20 }
21 };
22 }
23
24 macro_rules! assert_ready_none {
25 ($($t:tt)*) => {
26 match assert_ready!($($t)*) {
27 None => {}
28 Some(v) => panic!("expected `None`, got `Some({:?})`", v),
29 }
30 };
31 }
32
33 #[tokio::test]
empty()34 async fn empty() {
35 let mut map = StreamMap::<&str, stream::Pending<()>>::new();
36
37 assert_eq!(map.len(), 0);
38 assert!(map.is_empty());
39
40 assert!(map.next().await.is_none());
41 assert!(map.next().await.is_none());
42
43 assert!(map.remove("foo").is_none());
44 }
45
46 #[tokio::test]
single_entry()47 async fn single_entry() {
48 let mut map = task::spawn(StreamMap::new());
49 let (tx, rx) = mpsc::unbounded_channel_stream();
50 let rx = Box::pin(rx);
51
52 assert_ready_none!(map.poll_next());
53
54 assert!(map.insert("foo", rx).is_none());
55 assert!(map.contains_key("foo"));
56 assert!(!map.contains_key("bar"));
57
58 assert_eq!(map.len(), 1);
59 assert!(!map.is_empty());
60
61 assert_pending!(map.poll_next());
62
63 assert_ok!(tx.send(1));
64
65 assert!(map.is_woken());
66 let (k, v) = assert_ready_some!(map.poll_next());
67 assert_eq!(k, "foo");
68 assert_eq!(v, 1);
69
70 assert_pending!(map.poll_next());
71
72 assert_ok!(tx.send(2));
73
74 assert!(map.is_woken());
75 let (k, v) = assert_ready_some!(map.poll_next());
76 assert_eq!(k, "foo");
77 assert_eq!(v, 2);
78
79 assert_pending!(map.poll_next());
80 drop(tx);
81 assert!(map.is_woken());
82 assert_ready_none!(map.poll_next());
83 }
84
85 #[tokio::test]
multiple_entries()86 async fn multiple_entries() {
87 let mut map = task::spawn(StreamMap::new());
88 let (tx1, rx1) = mpsc::unbounded_channel_stream();
89 let (tx2, rx2) = mpsc::unbounded_channel_stream();
90
91 let rx1 = Box::pin(rx1);
92 let rx2 = Box::pin(rx2);
93
94 map.insert("foo", rx1);
95 map.insert("bar", rx2);
96
97 assert_pending!(map.poll_next());
98
99 assert_ok!(tx1.send(1));
100
101 assert!(map.is_woken());
102 let (k, v) = assert_ready_some!(map.poll_next());
103 assert_eq!(k, "foo");
104 assert_eq!(v, 1);
105
106 assert_pending!(map.poll_next());
107
108 assert_ok!(tx2.send(2));
109
110 assert!(map.is_woken());
111 let (k, v) = assert_ready_some!(map.poll_next());
112 assert_eq!(k, "bar");
113 assert_eq!(v, 2);
114
115 assert_pending!(map.poll_next());
116
117 assert_ok!(tx1.send(3));
118 assert_ok!(tx2.send(4));
119
120 assert!(map.is_woken());
121
122 // Given the randomization, there is no guarantee what order the values will
123 // be received in.
124 let mut v = (0..2)
125 .map(|_| assert_ready_some!(map.poll_next()))
126 .collect::<Vec<_>>();
127
128 assert_pending!(map.poll_next());
129
130 v.sort_unstable();
131 assert_eq!(v[0].0, "bar");
132 assert_eq!(v[0].1, 4);
133 assert_eq!(v[1].0, "foo");
134 assert_eq!(v[1].1, 3);
135
136 drop(tx1);
137 assert!(map.is_woken());
138 assert_pending!(map.poll_next());
139 drop(tx2);
140
141 assert_ready_none!(map.poll_next());
142 }
143
144 #[tokio::test]
insert_remove()145 async fn insert_remove() {
146 let mut map = task::spawn(StreamMap::new());
147 let (tx, rx) = mpsc::unbounded_channel_stream();
148
149 let rx = Box::pin(rx);
150
151 assert_ready_none!(map.poll_next());
152
153 assert!(map.insert("foo", rx).is_none());
154 let rx = map.remove("foo").unwrap();
155
156 assert_ok!(tx.send(1));
157
158 assert!(!map.is_woken());
159 assert_ready_none!(map.poll_next());
160
161 assert!(map.insert("bar", rx).is_none());
162
163 let v = assert_ready_some!(map.poll_next());
164 assert_eq!(v.0, "bar");
165 assert_eq!(v.1, 1);
166
167 assert!(map.remove("bar").is_some());
168 assert_ready_none!(map.poll_next());
169
170 assert!(map.is_empty());
171 assert_eq!(0, map.len());
172 }
173
174 #[tokio::test]
replace()175 async fn replace() {
176 let mut map = task::spawn(StreamMap::new());
177 let (tx1, rx1) = mpsc::unbounded_channel_stream();
178 let (tx2, rx2) = mpsc::unbounded_channel_stream();
179
180 let rx1 = Box::pin(rx1);
181 let rx2 = Box::pin(rx2);
182
183 assert!(map.insert("foo", rx1).is_none());
184
185 assert_pending!(map.poll_next());
186
187 let _rx1 = map.insert("foo", rx2).unwrap();
188
189 assert_pending!(map.poll_next());
190
191 tx1.send(1).unwrap();
192 assert_pending!(map.poll_next());
193
194 tx2.send(2).unwrap();
195 assert!(map.is_woken());
196 let v = assert_ready_some!(map.poll_next());
197 assert_eq!(v.0, "foo");
198 assert_eq!(v.1, 2);
199 }
200
201 #[test]
size_hint_with_upper()202 fn size_hint_with_upper() {
203 let mut map = StreamMap::new();
204
205 map.insert("a", stream::iter(vec![1]));
206 map.insert("b", stream::iter(vec![1, 2]));
207 map.insert("c", stream::iter(vec![1, 2, 3]));
208
209 assert_eq!(3, map.len());
210 assert!(!map.is_empty());
211
212 let size_hint = map.size_hint();
213 assert_eq!(size_hint, (6, Some(6)));
214 }
215
216 #[test]
size_hint_without_upper()217 fn size_hint_without_upper() {
218 let mut map = StreamMap::new();
219
220 map.insert("a", pin_box(stream::iter(vec![1])));
221 map.insert("b", pin_box(stream::iter(vec![1, 2])));
222 map.insert("c", pin_box(pending()));
223
224 let size_hint = map.size_hint();
225 assert_eq!(size_hint, (3, None));
226 }
227
228 #[test]
new_capacity_zero()229 fn new_capacity_zero() {
230 let map = StreamMap::<&str, stream::Pending<()>>::new();
231 assert_eq!(0, map.capacity());
232
233 assert!(map.keys().next().is_none());
234 }
235
236 #[test]
with_capacity()237 fn with_capacity() {
238 let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
239 assert!(10 <= map.capacity());
240
241 assert!(map.keys().next().is_none());
242 }
243
244 #[test]
iter_keys()245 fn iter_keys() {
246 let mut map = StreamMap::new();
247
248 map.insert("a", pending::<i32>());
249 map.insert("b", pending());
250 map.insert("c", pending());
251
252 let mut keys = map.keys().collect::<Vec<_>>();
253 keys.sort_unstable();
254
255 assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
256 }
257
258 #[test]
iter_values()259 fn iter_values() {
260 let mut map = StreamMap::new();
261
262 map.insert("a", stream::iter(vec![1]));
263 map.insert("b", stream::iter(vec![1, 2]));
264 map.insert("c", stream::iter(vec![1, 2, 3]));
265
266 let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
267
268 size_hints.sort_unstable();
269
270 assert_eq!(&size_hints[..], &[1, 2, 3]);
271 }
272
273 #[test]
iter_values_mut()274 fn iter_values_mut() {
275 let mut map = StreamMap::new();
276
277 map.insert("a", stream::iter(vec![1]));
278 map.insert("b", stream::iter(vec![1, 2]));
279 map.insert("c", stream::iter(vec![1, 2, 3]));
280
281 let mut size_hints = map
282 .values_mut()
283 .map(|s: &mut _| s.size_hint().0)
284 .collect::<Vec<_>>();
285
286 size_hints.sort_unstable();
287
288 assert_eq!(&size_hints[..], &[1, 2, 3]);
289 }
290
291 #[test]
clear()292 fn clear() {
293 let mut map = task::spawn(StreamMap::new());
294
295 map.insert("a", stream::iter(vec![1]));
296 map.insert("b", stream::iter(vec![1, 2]));
297 map.insert("c", stream::iter(vec![1, 2, 3]));
298
299 assert_ready_some!(map.poll_next());
300
301 map.clear();
302
303 assert_ready_none!(map.poll_next());
304 assert!(map.is_empty());
305 }
306
307 #[test]
contains_key_borrow()308 fn contains_key_borrow() {
309 let mut map = StreamMap::new();
310 map.insert("foo".to_string(), pending::<()>());
311
312 assert!(map.contains_key("foo"));
313 }
314
315 #[test]
one_ready_many_none()316 fn one_ready_many_none() {
317 // Run a few times because of randomness
318 for _ in 0..100 {
319 let mut map = task::spawn(StreamMap::new());
320
321 map.insert(0, pin_box(stream::empty()));
322 map.insert(1, pin_box(stream::empty()));
323 map.insert(2, pin_box(stream::once("hello")));
324 map.insert(3, pin_box(stream::pending()));
325
326 let v = assert_ready_some!(map.poll_next());
327 assert_eq!(v, (2, "hello"));
328 }
329 }
330
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>331 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
332 Box::pin(s)
333 }
334
335 type UsizeStream = Pin<Box<dyn Stream<Item = usize> + Send>>;
336
337 #[tokio::test]
poll_next_many_zero()338 async fn poll_next_many_zero() {
339 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
340
341 stream_map.insert(0, Box::pin(pending()) as UsizeStream);
342
343 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 0)).await;
344
345 assert_eq!(n, 0);
346 }
347
348 #[tokio::test]
poll_next_many_empty()349 async fn poll_next_many_empty() {
350 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
351
352 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 1)).await;
353
354 assert_eq!(n, 0);
355 }
356
357 #[tokio::test]
poll_next_many_pending()358 async fn poll_next_many_pending() {
359 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
360
361 stream_map.insert(0, Box::pin(pending()) as UsizeStream);
362
363 let mut is_pending = false;
364 poll_fn(|cx| {
365 let poll = stream_map.poll_next_many(cx, &mut vec![], 1);
366
367 is_pending = poll.is_pending();
368
369 Poll::Ready(())
370 })
371 .await;
372
373 assert!(is_pending);
374 }
375
376 #[tokio::test]
poll_next_many_not_enough()377 async fn poll_next_many_not_enough() {
378 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
379
380 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
381 stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
382
383 let mut buffer = vec![];
384 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
385
386 assert_eq!(n, 2);
387 assert_eq!(buffer.len(), 2);
388 assert!(buffer.contains(&(0, 0)));
389 assert!(buffer.contains(&(1, 1)));
390 }
391
392 #[tokio::test]
poll_next_many_enough()393 async fn poll_next_many_enough() {
394 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
395
396 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
397 stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
398
399 let mut buffer = vec![];
400 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
401
402 assert_eq!(n, 2);
403 assert_eq!(buffer.len(), 2);
404 assert!(buffer.contains(&(0, 0)));
405 assert!(buffer.contains(&(1, 1)));
406 }
407
408 #[tokio::test]
poll_next_many_correctly_loops_around()409 async fn poll_next_many_correctly_loops_around() {
410 for _ in 0..10 {
411 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
412
413 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
414 stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
415 stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
416
417 let mut buffer = vec![];
418
419 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
420 assert_eq!(n, 3);
421 assert_eq!(
422 std::mem::take(&mut buffer)
423 .into_iter()
424 .map(|(_, v)| v)
425 .collect::<Vec<_>>(),
426 vec![0, 0, 0]
427 );
428
429 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
430 assert_eq!(n, 2);
431 assert_eq!(
432 std::mem::take(&mut buffer)
433 .into_iter()
434 .map(|(_, v)| v)
435 .collect::<Vec<_>>(),
436 vec![1, 1]
437 );
438
439 let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 1)).await;
440 assert_eq!(n, 1);
441 assert_eq!(
442 std::mem::take(&mut buffer)
443 .into_iter()
444 .map(|(_, v)| v)
445 .collect::<Vec<_>>(),
446 vec![2]
447 );
448 }
449 }
450
451 #[tokio::test]
next_many_zero()452 async fn next_many_zero() {
453 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
454
455 stream_map.insert(0, Box::pin(pending()) as UsizeStream);
456
457 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut vec![], 0)).poll(cx)).await;
458
459 assert_eq!(n, 0);
460 }
461
462 #[tokio::test]
next_many_empty()463 async fn next_many_empty() {
464 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
465
466 let n = stream_map.next_many(&mut vec![], 1).await;
467
468 assert_eq!(n, 0);
469 }
470
471 #[tokio::test]
next_many_pending()472 async fn next_many_pending() {
473 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
474
475 stream_map.insert(0, Box::pin(pending()) as UsizeStream);
476
477 let mut is_pending = false;
478 poll_fn(|cx| {
479 let poll = pin!(stream_map.next_many(&mut vec![], 1)).poll(cx);
480
481 is_pending = poll.is_pending();
482
483 Poll::Ready(())
484 })
485 .await;
486
487 assert!(is_pending);
488 }
489
490 #[tokio::test]
next_many_not_enough()491 async fn next_many_not_enough() {
492 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
493
494 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
495 stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
496
497 let mut buffer = vec![];
498 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
499
500 assert_eq!(n, 2);
501 assert_eq!(buffer.len(), 2);
502 assert!(buffer.contains(&(0, 0)));
503 assert!(buffer.contains(&(1, 1)));
504 }
505
506 #[tokio::test]
next_many_enough()507 async fn next_many_enough() {
508 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
509
510 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
511 stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
512
513 let mut buffer = vec![];
514 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
515
516 assert_eq!(n, 2);
517 assert_eq!(buffer.len(), 2);
518 assert!(buffer.contains(&(0, 0)));
519 assert!(buffer.contains(&(1, 1)));
520 }
521
522 #[tokio::test]
next_many_correctly_loops_around()523 async fn next_many_correctly_loops_around() {
524 for _ in 0..10 {
525 let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
526
527 stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
528 stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
529 stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
530
531 let mut buffer = vec![];
532
533 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
534 assert_eq!(n, 3);
535 assert_eq!(
536 std::mem::take(&mut buffer)
537 .into_iter()
538 .map(|(_, v)| v)
539 .collect::<Vec<_>>(),
540 vec![0, 0, 0]
541 );
542
543 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
544 assert_eq!(n, 2);
545 assert_eq!(
546 std::mem::take(&mut buffer)
547 .into_iter()
548 .map(|(_, v)| v)
549 .collect::<Vec<_>>(),
550 vec![1, 1]
551 );
552
553 let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 1)).poll(cx)).await;
554 assert_eq!(n, 1);
555 assert_eq!(
556 std::mem::take(&mut buffer)
557 .into_iter()
558 .map(|(_, v)| v)
559 .collect::<Vec<_>>(),
560 vec![2]
561 );
562 }
563 }
564