Streams

Futures are about a single value that will eventually be produced, but many event sources naturally produce a stream of values over time.

use futures::stream;
use futures::stream::StreamExt;
use futures::Stream;

async fn count_to_five() -> impl Stream<Item = u32> {
    stream::iter(1..=5)
}

#[tokio::main]
async fn main() {
    let mut stream = count_to_five().await;
    // `for` loops are not usable with Streams, but for imperative-style
    // code, `while let` and the `next`/`try_next` functions can be
    // used:
    while let Some(num) = stream.next().await {
        println!("{}", num);
    }
}

There are combinator-style methods such as map, filter, and fold, and their early-exit-on-error cousins try_map, try_filter, and try_fold.

To process multiple items from a stream concurrently, use the for_each_concurrent and try_for_each_concurrent methods:

use futures::StreamExt;
use tokio::fs::File;
use tokio::io;

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

async fn download_file(url: &str, filename: &str) -> Result {
    let response = reqwest::get(url).await?;
    let content = response.bytes().await?;
    let mut file = File::create(filename).await?;
    io::copy(&mut content.as_ref(), &mut file).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result {
    let urls = ["https://www.gutenberg.org/cache/epub/43/pg43.txt"]; // add more here...
    let filenames = ["file1.txt"]; // add more here...

    let futures = urls
        .iter()
        .zip(filenames.iter())
        .map(|(url, filename)| download_file(url, filename));

    let fut = futures::stream::iter(futures).for_each_concurrent(
        4,
        |fut| async move {
            if let Err(e) = fut.await {
                println!("Error: {}", e);
                if let Some(source) = e.source() {
                    println!("  Caused by: {}", source);
                }
            }
        },
    );

    fut.await;

    println!("Downloaded files successfully!");
    Ok(())
}

See also Tokio async-stream (GitHub)async-stream-badge.