Blocking XREAD From A Redis Stream Using StackExchange.Redis

By Mike Hadlow, published Feb 18, 2022

The StackExchange.Redis NuGet package is the most popular .NET client for Redis. It’s stable and high-performance, with a great community and wide usage. One of its most interesting features is the way it does network multiplexing. This allows multi-threaded applications to use a single TCP/IP connection without blocking. Unfortunately, this also means that the library does not support blocking operations such as XREAD BLOCK, which is important if you want to leverage Redis Streams on .NET. This post offers a workaround.

From the StackExchange.Redis Multiplexing documentation:

… For this reason, the only redis features that StackExchange.Redis does not offer (and will not ever offer) are the “blocking pops” (BLPOP, BRPOP and BRPOPLPUSH) - because this would allow a single caller to stall the entire multiplexer, blocking all other callers.

Redis Streams have been available since Redis version 5.0. They implement an append-only log. One of the key features is the ability for clients to listen for new stream entries in a way analogous to a queue subscriber in a messaging system. This is implemented with the XREAD BLOCK command. The XREAD command can also be used without BLOCK, so an alternative is to simply poll the server, but this adds a latency of up to one polling interval, which is unacceptable in high-throughput systems.
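
For comparison, the polling alternative could look something like the sketch below, which uses the library’s built-in non-blocking StreamReadAsync. This is only an illustration, not code from our experiments: the PollingReader class, its parameters, and the decision to start reading from "0-0" (the beginning of the stream) are all assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

// Hypothetical helper, for illustration only.
public static class PollingReader
{
    public static async Task Poll(
        IDatabase db,
        string streamName,
        TimeSpan interval,
        Action<StreamEntry> handler,
        CancellationToken cancellation)
    {
        // Assumption: start from the beginning of the stream. "$" is no use here,
        // because a non-blocking XREAD from "$" never returns any entries.
        RedisValue lastId = "0-0";

        while (!cancellation.IsCancellationRequested)
        {
            // Non-blocking XREAD: returns immediately, with an empty array if nothing is new.
            StreamEntry[] entries = await db.StreamReadAsync(streamName, lastId).ConfigureAwait(false);

            foreach (var entry in entries)
            {
                lastId = entry.Id; // the next read continues after this entry
                handler(entry);
            }

            if (entries.Length == 0)
            {
                // Nothing new: sleep for the polling interval. A new entry can wait
                // up to this long before it is seen, which is the added latency.
                try
                {
                    await Task.Delay(interval, cancellation).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }
}
```

Because this uses the ordinary multiplexed API it can share a connection with other operations, but every idle client still costs one request per polling interval.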

The lack of XREAD BLOCK support in StackExchange.Redis has led to much discussion on Stack Overflow and GitHub; see here and here for example. There has been a task to rewrite the internals to support blocking operations since 2018, but it is still open. I expect that until Stack Overflow themselves find a need for blocking operations we’re unlikely to see it, especially now that the author, the hugely prolific Marc Gravell (who someone really should pay just to work on open source), has moved on to Microsoft.

My colleague Marc Stedman has been experimenting with the low-level StackExchange.Redis ExecuteAsync method to issue the raw XREAD BLOCK command to the server. Of course, because of the multiplexer, this will block all other operations on the same connection, so it’s important to run it on a dedicated connection. Marc Gravell also says this on this thread:

If you use the raw commands API to execute blocking operations, you should expect bad things to happen. For example, from the connection’s perspective, it won’t be responding to periodic heartbeat health checks, so it will sever the connection every so often.

However, if we keep the blocking period short, say 500ms, then there’s less chance of timeouts. There’s obviously a trade-off: a shorter interval pummels the server with read requests, especially if you have lots of clients, while a longer one increases the risk of the connection timing out. In our experiments we’ve not seen any issues so far.
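
To get a rough feel for that trade-off, the sketch below estimates how many XREAD calls per second reach the server while the stream is idle, assuming each client simply re-issues the command as soon as the previous block period expires. The BlockInterval class and the client counts are hypothetical, and the figure ignores network round-trip time, so treat it as an upper bound.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class BlockInterval
{
    // On an idle stream each client completes one XREAD BLOCK call per block period,
    // so the server sees roughly clients / blockSeconds calls per second.
    public static double IdleReadsPerSecond(int clients, TimeSpan block) =>
        clients / block.TotalSeconds;
}
```

With 100 clients and a 500ms block period, BlockInterval.IdleReadsPerSecond(100, TimeSpan.FromMilliseconds(500)) comes to about 200 calls per second. Halving the block period doubles that figure. The block period is also the longest the listener can take to notice a cancellation request.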

The other (minor) disadvantage of using the raw ExecuteAsync command is that the stream entries are returned as a nested RedisResult tree, but it’s straightforward to decode using the XREAD documentation.
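
The shape of that tree is easier to see without the Redis types in the way. The sketch below uses plain nested object arrays as a stand-in for RedisResult; the XReadShape class, the sample stream name, IDs and field values are all made up for illustration. The nesting follows the XREAD documentation: a list of streams, each holding its name and a list of entries, where each entry is an ID plus a flat list of alternating field names and values.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the RedisResult tree, for illustration only.
public static class XReadShape
{
    // [ [ streamName, [ [ entryId, [ field1, value1, field2, value2, ... ] ], ... ] ], ... ]
    public static readonly object[] SampleReply =
    {
        new object[]
        {
            "my-stream",
            new object[]
            {
                new object[] { "1645180000000-0", new object[] { "sensor", "a1", "temp", "21.5" } },
                new object[] { "1645180000001-0", new object[] { "sensor", "b2", "temp", "19.0" } },
            },
        },
    };

    // Walks the three levels of nesting and pairs up the flat field/value list.
    public static List<(string Stream, string Id, Dictionary<string, string> Fields)> Flatten(object[] reply)
    {
        var entries = new List<(string Stream, string Id, Dictionary<string, string> Fields)>();

        foreach (object[] stream in reply)
        {
            var name = (string)stream[0];

            foreach (object[] entry in (object[])stream[1])
            {
                var id = (string)entry[0];
                var flat = (object[])entry[1];
                var fields = new Dictionary<string, string>();

                // Fields arrive as name, value, name, value, ...
                for (var i = 0; i + 1 < flat.Length; i += 2)
                {
                    fields[(string)flat[i]] = (string)flat[i + 1];
                }

                entries.Add((name, id, fields));
            }
        }

        return entries;
    }
}
```

Decoding a real reply follows the same three loops, with casts to RedisResult[] and RedisValue in place of object[] and string.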

Below is an example implementation that you can modify for your own use:

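// The System.* usings are redundant if your project has implicit usings enabled.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
using static System.Console;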

public static class BlockingReader
{
    public static async Task Listen(
        string connection, 
        string streamName, 
        CancellationToken cancellation,
        Action<Entry> handler)
    {
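        // The blocking reader's connection should not be shared with any other operation.
        // ConnectAsync throws on failure rather than returning null, so no null check is needed.
        // 'using' disposes the dedicated connection when the listener exits.
        using var redis = await ConnectionMultiplexer.ConnectAsync(connection).ConfigureAwait(false);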
        WriteLine($"Started consuming from stream {streamName}");

        try
        {
            var db = redis.GetDatabase();

            var currentId = "$"; // listen for new messages
            while(!cancellation.IsCancellationRequested)
            {
                var arguments = new List<object>
                {
                    "BLOCK",
                    "500",
                    "STREAMS",
                    streamName,
                    currentId
                };

                // ExecuteAsync does not take a CancellationToken, so we have to wait out the block time
                // before responding to a cancellation request.
                var result = await db.ExecuteAsync("XREAD", arguments).ConfigureAwait(false);

                if(!result.IsNull)
                {
                    // should only be a single result if querying a single stream
                    foreach (RedisResult[] subresults in (RedisResult[])result)
                    {
                        var name = (RedisValue)subresults[0];
                        foreach(RedisResult[] messages in (RedisResult[])subresults[1])
                        {
                            var id = (RedisValue)messages[0];
                            currentId = id;

                            var nameValuePairs = (RedisResult[])messages[1];
                            var pairs = new Pair[nameValuePairs.Length/2];

                            for(var i = 0; i < nameValuePairs.Length; i+=2)
                            {
                                pairs[i / 2] = new Pair((RedisValue)nameValuePairs[i], (RedisValue)nameValuePairs[i + 1]);
                            }

                            var entry = new Entry(name, id, pairs);
                            handler(entry);
                        }
                    }
                }
            }
        }
        catch (TaskCanceledException) { }
        catch (Exception ex)
        {
            WriteLine(ex.ToString());
        }
        finally
        {
            WriteLine($"Stopped consuming from stream {streamName}");
        }
    }
}

public record Entry(RedisValue StreamName, RedisValue Id, Pair[] Values);

public record Pair(RedisValue Name, RedisValue Value);

You can use this method like this:

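// The caller owns the CancellationTokenSource and cancels it to stop the listener.
using var cts = new CancellationTokenSource();

var readerTask = BlockingReader.Listen(
    "localhost", 
    "my-stream", 
    cts.Token,
    entry => 
    {
        WriteLine($"\tId: {entry.Id}");
        foreach(var pair in entry.Values)
        {
            WriteLine($"\t\t{pair.Name}: {pair.Value}");
        }
    });

// Run until Enter is pressed, then stop. The loop exits once the current XREAD BLOCK call returns.
ReadLine();
cts.Cancel();
await readerTask;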

I’ve also shared a worked example on GitHub, RedisStreamReader, of a simple console application that publishes to a stream and uses the method above to listen for new messages.

All code on this blog is published under an MIT licence. You are free to copy it and use it for any purpose without attribution. There is no warranty whatsoever. All non-code text is copyright Mike Hadlow and cannot be reused without permission.
