1 use futures::stream::iter;
2 use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
3 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
4 
5 use std::future::{poll_fn, Future};
6 use std::pin::{pin, Pin};
7 use std::task::Poll;
8 
// Test-support helpers. The `mpsc` submodule supplies the
// `unbounded_channel_stream` constructor used by the channel-driven tests
// in this file.
mod support {
    pub(crate) mod mpsc;
}
12 
13 use support::mpsc;
14 
/// Asserts (via `assert_ready!`) that the given poll expression is ready and
/// evaluates to the contained `Some` value; panics if the ready value is
/// `None`.
macro_rules! assert_ready_some {
    ($($args:tt)*) => {
        if let Some(item) = assert_ready!($($args)*) {
            item
        } else {
            panic!("expected `Some`, got `None`")
        }
    };
}
23 
/// Asserts (via `assert_ready!`) that the given poll expression is ready with
/// `None`; panics, printing the value, if it is ready with `Some`.
macro_rules! assert_ready_none {
    ($($args:tt)*) => {
        if let Some(item) = assert_ready!($($args)*) {
            panic!("expected `None`, got `Some({:?})`", item)
        }
    };
}
32 
33 #[tokio::test]
empty()34 async fn empty() {
35     let mut map = StreamMap::<&str, stream::Pending<()>>::new();
36 
37     assert_eq!(map.len(), 0);
38     assert!(map.is_empty());
39 
40     assert!(map.next().await.is_none());
41     assert!(map.next().await.is_none());
42 
43     assert!(map.remove("foo").is_none());
44 }
45 
46 #[tokio::test]
single_entry()47 async fn single_entry() {
48     let mut map = task::spawn(StreamMap::new());
49     let (tx, rx) = mpsc::unbounded_channel_stream();
50     let rx = Box::pin(rx);
51 
52     assert_ready_none!(map.poll_next());
53 
54     assert!(map.insert("foo", rx).is_none());
55     assert!(map.contains_key("foo"));
56     assert!(!map.contains_key("bar"));
57 
58     assert_eq!(map.len(), 1);
59     assert!(!map.is_empty());
60 
61     assert_pending!(map.poll_next());
62 
63     assert_ok!(tx.send(1));
64 
65     assert!(map.is_woken());
66     let (k, v) = assert_ready_some!(map.poll_next());
67     assert_eq!(k, "foo");
68     assert_eq!(v, 1);
69 
70     assert_pending!(map.poll_next());
71 
72     assert_ok!(tx.send(2));
73 
74     assert!(map.is_woken());
75     let (k, v) = assert_ready_some!(map.poll_next());
76     assert_eq!(k, "foo");
77     assert_eq!(v, 2);
78 
79     assert_pending!(map.poll_next());
80     drop(tx);
81     assert!(map.is_woken());
82     assert_ready_none!(map.poll_next());
83 }
84 
85 #[tokio::test]
multiple_entries()86 async fn multiple_entries() {
87     let mut map = task::spawn(StreamMap::new());
88     let (tx1, rx1) = mpsc::unbounded_channel_stream();
89     let (tx2, rx2) = mpsc::unbounded_channel_stream();
90 
91     let rx1 = Box::pin(rx1);
92     let rx2 = Box::pin(rx2);
93 
94     map.insert("foo", rx1);
95     map.insert("bar", rx2);
96 
97     assert_pending!(map.poll_next());
98 
99     assert_ok!(tx1.send(1));
100 
101     assert!(map.is_woken());
102     let (k, v) = assert_ready_some!(map.poll_next());
103     assert_eq!(k, "foo");
104     assert_eq!(v, 1);
105 
106     assert_pending!(map.poll_next());
107 
108     assert_ok!(tx2.send(2));
109 
110     assert!(map.is_woken());
111     let (k, v) = assert_ready_some!(map.poll_next());
112     assert_eq!(k, "bar");
113     assert_eq!(v, 2);
114 
115     assert_pending!(map.poll_next());
116 
117     assert_ok!(tx1.send(3));
118     assert_ok!(tx2.send(4));
119 
120     assert!(map.is_woken());
121 
122     // Given the randomization, there is no guarantee what order the values will
123     // be received in.
124     let mut v = (0..2)
125         .map(|_| assert_ready_some!(map.poll_next()))
126         .collect::<Vec<_>>();
127 
128     assert_pending!(map.poll_next());
129 
130     v.sort_unstable();
131     assert_eq!(v[0].0, "bar");
132     assert_eq!(v[0].1, 4);
133     assert_eq!(v[1].0, "foo");
134     assert_eq!(v[1].1, 3);
135 
136     drop(tx1);
137     assert!(map.is_woken());
138     assert_pending!(map.poll_next());
139     drop(tx2);
140 
141     assert_ready_none!(map.poll_next());
142 }
143 
144 #[tokio::test]
insert_remove()145 async fn insert_remove() {
146     let mut map = task::spawn(StreamMap::new());
147     let (tx, rx) = mpsc::unbounded_channel_stream();
148 
149     let rx = Box::pin(rx);
150 
151     assert_ready_none!(map.poll_next());
152 
153     assert!(map.insert("foo", rx).is_none());
154     let rx = map.remove("foo").unwrap();
155 
156     assert_ok!(tx.send(1));
157 
158     assert!(!map.is_woken());
159     assert_ready_none!(map.poll_next());
160 
161     assert!(map.insert("bar", rx).is_none());
162 
163     let v = assert_ready_some!(map.poll_next());
164     assert_eq!(v.0, "bar");
165     assert_eq!(v.1, 1);
166 
167     assert!(map.remove("bar").is_some());
168     assert_ready_none!(map.poll_next());
169 
170     assert!(map.is_empty());
171     assert_eq!(0, map.len());
172 }
173 
174 #[tokio::test]
replace()175 async fn replace() {
176     let mut map = task::spawn(StreamMap::new());
177     let (tx1, rx1) = mpsc::unbounded_channel_stream();
178     let (tx2, rx2) = mpsc::unbounded_channel_stream();
179 
180     let rx1 = Box::pin(rx1);
181     let rx2 = Box::pin(rx2);
182 
183     assert!(map.insert("foo", rx1).is_none());
184 
185     assert_pending!(map.poll_next());
186 
187     let _rx1 = map.insert("foo", rx2).unwrap();
188 
189     assert_pending!(map.poll_next());
190 
191     tx1.send(1).unwrap();
192     assert_pending!(map.poll_next());
193 
194     tx2.send(2).unwrap();
195     assert!(map.is_woken());
196     let v = assert_ready_some!(map.poll_next());
197     assert_eq!(v.0, "foo");
198     assert_eq!(v.1, 2);
199 }
200 
/// With three finite streams of 1, 2 and 3 items, the map's size hint is
/// exactly 6 for both the lower and the upper bound.
#[test]
fn size_hint_with_upper() {
    let mut streams = StreamMap::new();

    for (key, items) in [("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    assert_eq!(3, streams.len());
    assert!(!streams.is_empty());

    assert_eq!(streams.size_hint(), (6, Some(6)));
}
215 
/// When one of the streams is `pending()`, the map's size hint keeps the
/// lower bound of 3 (from the finite streams) but has no upper bound.
#[test]
fn size_hint_without_upper() {
    let mut streams = StreamMap::new();

    streams.insert("a", pin_box(stream::iter(vec![1])));
    streams.insert("b", pin_box(stream::iter(vec![1, 2])));
    streams.insert("c", pin_box(pending()));

    let hint = streams.size_hint();
    assert_eq!(hint, (3, None));
}
227 
/// `StreamMap::new` creates a map with zero capacity and no keys.
#[test]
fn new_capacity_zero() {
    let streams = StreamMap::<&str, stream::Pending<()>>::new();
    assert_eq!(0, streams.capacity());

    let mut keys = streams.keys();
    assert!(keys.next().is_none());
}
235 
/// `StreamMap::with_capacity(10)` reserves at least the requested capacity
/// and still contains no keys.
#[test]
fn with_capacity() {
    let streams = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
    assert!(streams.capacity() >= 10);

    let mut keys = streams.keys();
    assert!(keys.next().is_none());
}
243 
/// `keys()` visits every inserted key; the result is sorted before comparing
/// so the test does not depend on iteration order.
#[test]
fn iter_keys() {
    let mut streams = StreamMap::new();

    for key in ["a", "b", "c"] {
        streams.insert(key, pending::<i32>());
    }

    let mut keys: Vec<&str> = streams.keys().copied().collect();
    keys.sort_unstable();

    assert_eq!(keys, ["a", "b", "c"]);
}
257 
/// `values()` visits every inserted stream; each stream is identified here by
/// the lower bound of its size hint (1, 2 and 3).
#[test]
fn iter_values() {
    let mut streams = StreamMap::new();

    for (key, items) in [("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<usize> = streams
        .values()
        .map(|s| {
            let (lower, _upper) = s.size_hint();
            lower
        })
        .collect();

    // Sort so the assertion does not depend on iteration order.
    lower_bounds.sort_unstable();

    assert_eq!(lower_bounds, [1, 2, 3]);
}
272 
/// `values_mut()` visits every inserted stream; each stream is identified
/// here by the lower bound of its size hint (1, 2 and 3).
#[test]
fn iter_values_mut() {
    let mut streams = StreamMap::new();

    for (key, items) in [("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<usize> = streams
        .values_mut()
        .map(|s: &mut _| {
            let (lower, _upper) = s.size_hint();
            lower
        })
        .collect();

    // Sort so the assertion does not depend on iteration order.
    lower_bounds.sort_unstable();

    assert_eq!(lower_bounds, [1, 2, 3]);
}
290 
/// After `clear()`, a map that was still producing items is empty and
/// reports the end of the stream.
#[test]
fn clear() {
    let mut streams = task::spawn(StreamMap::new());

    for (key, items) in [("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    // Confirm the map yields items before it is cleared.
    assert_ready_some!(streams.poll_next());

    streams.clear();

    assert_ready_none!(streams.poll_next());
    assert!(streams.is_empty());
}
306 
/// A map keyed by `String` can be queried with a borrowed `&str`.
#[test]
fn contains_key_borrow() {
    let mut streams = StreamMap::new();
    let owned_key = "foo".to_string();
    streams.insert(owned_key, pending::<()>());

    assert!(streams.contains_key("foo"));
}
314 
/// With two empty streams, one pending stream and one stream holding a
/// single item, the first poll must yield that item.
#[test]
fn one_ready_many_none() {
    // Run a few times because of randomness
    for _ in 0..100 {
        let mut streams = task::spawn(StreamMap::new());

        streams.insert(0, pin_box(stream::empty()));
        streams.insert(1, pin_box(stream::empty()));
        streams.insert(2, pin_box(stream::once("hello")));
        streams.insert(3, pin_box(stream::pending()));

        let (key, value) = assert_ready_some!(streams.poll_next());
        assert_eq!(key, 2);
        assert_eq!(value, "hello");
    }
}
330 
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>331 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
332     Box::pin(s)
333 }
334 
/// Pinned, boxed, `Send` stream of `usize` items; the value type of the maps
/// used by the `poll_next_many` / `next_many` tests.
type UsizeStream = Pin<Box<dyn Stream<Item = usize> + Send>>;
336 
337 #[tokio::test]
poll_next_many_zero()338 async fn poll_next_many_zero() {
339     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
340 
341     stream_map.insert(0, Box::pin(pending()) as UsizeStream);
342 
343     let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 0)).await;
344 
345     assert_eq!(n, 0);
346 }
347 
348 #[tokio::test]
poll_next_many_empty()349 async fn poll_next_many_empty() {
350     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
351 
352     let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut vec![], 1)).await;
353 
354     assert_eq!(n, 0);
355 }
356 
357 #[tokio::test]
poll_next_many_pending()358 async fn poll_next_many_pending() {
359     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
360 
361     stream_map.insert(0, Box::pin(pending()) as UsizeStream);
362 
363     let mut is_pending = false;
364     poll_fn(|cx| {
365         let poll = stream_map.poll_next_many(cx, &mut vec![], 1);
366 
367         is_pending = poll.is_pending();
368 
369         Poll::Ready(())
370     })
371     .await;
372 
373     assert!(is_pending);
374 }
375 
376 #[tokio::test]
poll_next_many_not_enough()377 async fn poll_next_many_not_enough() {
378     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
379 
380     stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
381     stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
382 
383     let mut buffer = vec![];
384     let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
385 
386     assert_eq!(n, 2);
387     assert_eq!(buffer.len(), 2);
388     assert!(buffer.contains(&(0, 0)));
389     assert!(buffer.contains(&(1, 1)));
390 }
391 
392 #[tokio::test]
poll_next_many_enough()393 async fn poll_next_many_enough() {
394     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
395 
396     stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
397     stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
398 
399     let mut buffer = vec![];
400     let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
401 
402     assert_eq!(n, 2);
403     assert_eq!(buffer.len(), 2);
404     assert!(buffer.contains(&(0, 0)));
405     assert!(buffer.contains(&(1, 1)));
406 }
407 
408 #[tokio::test]
poll_next_many_correctly_loops_around()409 async fn poll_next_many_correctly_loops_around() {
410     for _ in 0..10 {
411         let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
412 
413         stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
414         stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
415         stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
416 
417         let mut buffer = vec![];
418 
419         let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 3)).await;
420         assert_eq!(n, 3);
421         assert_eq!(
422             std::mem::take(&mut buffer)
423                 .into_iter()
424                 .map(|(_, v)| v)
425                 .collect::<Vec<_>>(),
426             vec![0, 0, 0]
427         );
428 
429         let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 2)).await;
430         assert_eq!(n, 2);
431         assert_eq!(
432             std::mem::take(&mut buffer)
433                 .into_iter()
434                 .map(|(_, v)| v)
435                 .collect::<Vec<_>>(),
436             vec![1, 1]
437         );
438 
439         let n = poll_fn(|cx| stream_map.poll_next_many(cx, &mut buffer, 1)).await;
440         assert_eq!(n, 1);
441         assert_eq!(
442             std::mem::take(&mut buffer)
443                 .into_iter()
444                 .map(|(_, v)| v)
445                 .collect::<Vec<_>>(),
446             vec![2]
447         );
448     }
449 }
450 
451 #[tokio::test]
next_many_zero()452 async fn next_many_zero() {
453     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
454 
455     stream_map.insert(0, Box::pin(pending()) as UsizeStream);
456 
457     let n = poll_fn(|cx| pin!(stream_map.next_many(&mut vec![], 0)).poll(cx)).await;
458 
459     assert_eq!(n, 0);
460 }
461 
462 #[tokio::test]
next_many_empty()463 async fn next_many_empty() {
464     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
465 
466     let n = stream_map.next_many(&mut vec![], 1).await;
467 
468     assert_eq!(n, 0);
469 }
470 
471 #[tokio::test]
next_many_pending()472 async fn next_many_pending() {
473     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
474 
475     stream_map.insert(0, Box::pin(pending()) as UsizeStream);
476 
477     let mut is_pending = false;
478     poll_fn(|cx| {
479         let poll = pin!(stream_map.next_many(&mut vec![], 1)).poll(cx);
480 
481         is_pending = poll.is_pending();
482 
483         Poll::Ready(())
484     })
485     .await;
486 
487     assert!(is_pending);
488 }
489 
490 #[tokio::test]
next_many_not_enough()491 async fn next_many_not_enough() {
492     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
493 
494     stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
495     stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
496 
497     let mut buffer = vec![];
498     let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
499 
500     assert_eq!(n, 2);
501     assert_eq!(buffer.len(), 2);
502     assert!(buffer.contains(&(0, 0)));
503     assert!(buffer.contains(&(1, 1)));
504 }
505 
506 #[tokio::test]
next_many_enough()507 async fn next_many_enough() {
508     let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
509 
510     stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
511     stream_map.insert(1, Box::pin(iter([1usize].into_iter())) as UsizeStream);
512 
513     let mut buffer = vec![];
514     let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
515 
516     assert_eq!(n, 2);
517     assert_eq!(buffer.len(), 2);
518     assert!(buffer.contains(&(0, 0)));
519     assert!(buffer.contains(&(1, 1)));
520 }
521 
522 #[tokio::test]
next_many_correctly_loops_around()523 async fn next_many_correctly_loops_around() {
524     for _ in 0..10 {
525         let mut stream_map: StreamMap<usize, UsizeStream> = StreamMap::new();
526 
527         stream_map.insert(0, Box::pin(iter([0usize].into_iter())) as UsizeStream);
528         stream_map.insert(1, Box::pin(iter([0usize, 1].into_iter())) as UsizeStream);
529         stream_map.insert(2, Box::pin(iter([0usize, 1, 2].into_iter())) as UsizeStream);
530 
531         let mut buffer = vec![];
532 
533         let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 3)).poll(cx)).await;
534         assert_eq!(n, 3);
535         assert_eq!(
536             std::mem::take(&mut buffer)
537                 .into_iter()
538                 .map(|(_, v)| v)
539                 .collect::<Vec<_>>(),
540             vec![0, 0, 0]
541         );
542 
543         let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 2)).poll(cx)).await;
544         assert_eq!(n, 2);
545         assert_eq!(
546             std::mem::take(&mut buffer)
547                 .into_iter()
548                 .map(|(_, v)| v)
549                 .collect::<Vec<_>>(),
550             vec![1, 1]
551         );
552 
553         let n = poll_fn(|cx| pin!(stream_map.next_many(&mut buffer, 1)).poll(cx)).await;
554         assert_eq!(n, 1);
555         assert_eq!(
556             std::mem::take(&mut buffer)
557                 .into_iter()
558                 .map(|(_, v)| v)
559                 .collect::<Vec<_>>(),
560             vec![2]
561         );
562     }
563 }
564