Function futures::prelude::stream::select_with_strategy
pub fn select_with_strategy<St1, St2, Clos, State>(
    stream1: St1,
    stream2: St2,
    which: Clos
) -> SelectWithStrategy<St1, St2, Clos, State>
where
    St1: Stream,
    St2: Stream<Item = <St1 as Stream>::Item>,
    Clos: FnMut(&mut State) -> PollNext,
    State: Default,
This function will attempt to pull items from both streams. You provide a closure that tells SelectWithStrategy which stream to poll next. The closure can keep state in SelectWithStrategy: it receives a &mut reference to that state on every invocation, which allows basing the strategy on prior choices.
After one of the two input streams completes, the remaining one will be polled exclusively. The returned stream completes when both input streams have completed.
Note that this function consumes both streams and returns a wrapped version of them.
Examples
Priority
This example shows how to always prioritize the left stream.
use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
let left = repeat(1);
let right = repeat(2);
// We don't need any state, so let's make it an empty tuple.
// We must provide some type here, as there is no way for the compiler
// to infer it. As we don't need to capture variables, we can just
// use a function pointer instead of a closure.
fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }
let mut out = select_with_strategy(left, right, prio_left);
for _ in 0..100 {
    // Whenever we poll out, we will always get `1`.
    assert_eq!(1, out.select_next_some().await);
}
Round Robin
This example shows how to select from both streams round robin.
Note: this special case is provided by futures-util::stream::select.
use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };
let left = repeat(1);
let right = repeat(2);
let rrobin = |last: &mut PollNext| last.toggle();
let mut out = select_with_strategy(left, right, rrobin);
for _ in 0..100 {
    // We should be alternating now.
    assert_eq!(1, out.select_next_some().await);
    assert_eq!(2, out.select_next_some().await);
}