Step by step examples to teach you Async Stream.

Learn Async Stream implementation via these examples.

1. tokio-rs/async-stream

Asynchronous streams for Rust using async & await notation.

Asynchronous stream of elements.

Provides two macros, stream! and try_stream!, allowing the caller to define asynchronous streams of elements. These are implemented using async & await notation. This crate works without unstable features.

The stream! macro returns an anonymous type implementing the [Stream](https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html) trait. The Item associated type is the type of the values yielded from the stream. The try_stream! also returns an anonymous type implementing the [Stream](https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html) trait, but the Item associated type is Result<T, Error>. The try_stream! macro supports using ? notation as part of the implementation.
Stream
Stream

Usage

A basic stream yielding numbers. Values are yielded using the yield keyword. The stream block must return ().

use async_stream::stream;

use futures_util::pin_mut;
use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };

    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be returned by using impl Stream<Item = T>:

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = zero_to_three();
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Streams may be implemented in terms of other streams - async-stream provides for await syntax to assist with this:

use async_stream::stream;

use futures_core::stream::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

fn double<S: Stream<Item = u32>>(input: S)
    -> impl Stream<Item = u32>
{
    stream! {
        for await value in input {
            yield value * 2;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = double(zero_to_three());
    pin_mut!(s); // needed for iteration

    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}

Rust try notation (?) can be used with the try_stream! macro. The Item of the returned stream is Result with Ok being the value yielded and Err the error type returned by ?.

use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
    -> impl Stream<Item = io::Result<TcpStream>>
{
    try_stream! {
        let mut listener = TcpListener::bind(addr).await?;

        loop {
            let (stream, addr) = listener.accept().await?;
            println!("received on {:?}", addr);
            yield stream;
        }
    }
}

Implementation

The stream! and try_stream! macros are implemented using proc macros. The macro searches the syntax tree for instances of sender.send($expr) and transforms them into sender.send($expr).await.

The stream uses a lightweight sender to send values from the stream implementation to the caller. When entering the stream, an Option<T> is stored on the stack. A pointer to the cell is stored in a thread local and poll is called on the async block. When poll returns. sender.send(value) stores the value that cell and yields back to the caller.

Supported Rust Versions

async-stream is built against the latest stable release. The minimum supported version is 1.45 due to function-like procedural macros in expression, pattern, and statement positions.

This project is licensed under the MIT license.

Full Example

Let us look at a full Async Stream Example.

Step 1. Write Code

Finally we need to write our code as follows:

(a). tcp_accept.rs

use async_stream::stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    let incoming = stream! {
        loop {
            let (socket, _) = listener.accept().await.unwrap();
            yield socket;
        }
    };
    pin_mut!(incoming);

    while let Some(v) = incoming.next().await {
        println!("handle = {:?}", v);
    }
}

Reference

Download the code below:

No. Link
1. Download Full Code
2. Read more here.
3. Follow code author here.

Read More.


More

Here are some more examples related to Async Stream.

Categorized in: