Efficiently Handling Asynchronous Request-Reply with an In-Memory Queue and MediatR in C#

April 1, 2023 in MediatR Domain Driven Design Architecture
Read Time: 9 minutes
Efficiently Handling Asynchronous Request-Reply with an In-Memory Queue and MediatR in C#

What is the Asynchronous Request-Reply Pattern

Over the years I’ve often needed this pattern, I just didn’t know it had a name. Oftentimes, we’d like to kick off long running process from the user interface to your backend server. The problem is that HTTP isn’t great at handling long-running processes. Case in point, the default timeout on the HttpClient class is 100 seconds.

What I’ve typically done is simply extend the timeout to something ridiculous to ensure that my process won’t time out. It works, but not a great solution. In modern applications, it’s increasingly common to perform time-consuming or resource-intensive tasks in the background, without blocking the main thread or user interface.

Enter the Asynchronous Request-Reply pattern. The Microsoft Article explains this with all the specific details. But in a nutshell:

  • The client kicks off the long-running process.
  • The server kicks off the process asynchronously (fire and forget) and immediately returns to the client
    • That “asynchronously” is doing a lot of heavy lifting in that statement.
    • Most likely a process or job identifier is returned to the client. Or possibly the location of another endpoint to poll for status.
  • The client will then poll the status endpoint periodically.
    • I’ve even seen some fancy implementations where the status message includes an estimated completion time.
  • When the status endpoint responds with completed, the client can then take the appropriate action.
    • Just display ok to the user or go to the destination of the thing it was waiting for.
    • Some APIs may respond with the destination of the new resource via the Location header.

How Do I Implement It?

Let’s get the naive implementation out of the way:

1[HttpPost("/api/longRunning")]
2public IActionResult Start()
3{
4    Task.Run(() => 
5    {
6        myLongRunningProcessHere();
7    });
8    return Ok();
9}

I’m sure, at some point, we’ve all tried something like this and got burned. But just in case you haven’t here’s the problem: Task.Run() will run your process asynchronously (kinda), however, as soon as the controller returns a response ASP.NET will dispose that instance of the controller. If your long-running process relies on anything instantiated during that controller action, things will start to fail spectacularly. The errors may look like this: Error: the object is already disposed.

Consider the example where data needs to be fetched using Entity Framework (EF) Core. In a web application, EF Core is usually wired up to the DI container using a scoped lifetime. That way, the EF instance (along with the controller instance) is disposed after the HTTP request is completed. If a process is still relying on that instance, bad things happen.

What we need is a dedicated process that runs this outside of the scope of the controller. That typically leads us to a background process backed by a queue. The term queue has a lot of different meanings here. Some great examples are: RabbitMQ, AWS SQS, Azure Storage Queues, and Azure Service Bus.

Any of these will fit the scope of the problem, however, they come with their pros and cons.

  • They are great for the scalability of your application. It’s often trivial to have these scale horizontally to handle more load. Plus the cloud options can be configured.
  • They are an excellent option for communication between different systems or services.
  • They are reliable and durable. Once your message makes it into the queue, you’re pretty much guaranteed it will get delivered (at least once). Even if your services fail or were offline. Once things are back up and running, messages will be consumed.
  • Most of them have built-in retry capabilities, if you need it.

However:

  • There is a cost to consider. If you’re using something on-premises there’s labor cost to keep this service up and running. For cloud services, that’s something you can look up. However, those costs can be fuzzy, since they tend to grow with your usage.
  • The biggest con is usually one of complexity. Things to consider include networking, security, message routing, etc.

Even though these systems can solve our problem, they don’t seem very targeted for it. Caveat: I will say, if you’re maintaining a system that already uses a queue service and there is an established usage pattern for it, I would definitely lean on that to solve this problem.

In-Process Queue Service

If we don’t want to bite off the added complexity of an infrastructure-based queue service, we’re left with an in-process queue. This is by no means a silver bullet, but it does fit the Asynchronous Request-Reply Pattern pretty well.

Conceptually, the implementation would look like this:

  • A in-memory queue (the data structure definition)
    • This store handles a FIFO list of tasks or jobs.
  • A background process that listens for entries into the queue, pops them off and runs the job.

C# has queues (of course). We will actually be using Channels, which wrap our queue in a much cleaner and more performant abstraction. You can read more about channels here.

ASP.NET Core gives us a wonderful implementation for Background Services, using … well BackgroundService.

It just so happens the Microsoft documentation has a great example that combines both of these elements to build a Queue Service.

The problem with this example is that it lacks some real-world problems. Since the task queue is only storing a list of Func it seems you could only perform trivial tasks. It does not account for resolving other dependencies or performing real tasks.

When trying to solve this problem for myself, I poked at it for a while, until I realized what I really needed was a way to dispatch these tasks.

Enter MediatR

If you’re unfamiliar with the MediatR library, here’s a good usage example. I won’t be able to do it justice in this post.

I often use this library as a way to dispatch Domain Events in a lot of my projects. So I leveraged them for this problem. Here’s what the modified implementation looks like:

First, we have a BackgroundTaskQueue:

 1public interface IBackgroundTaskQueue
 2{
 3    Task<INotification> DequeueAsync(CancellationToken stoppingToken);
 4    Task QueueTaskAsync(INotification task, CancellationToken stoppingToken = default);
 5}
 6
 7public class BackgroundTaskQueue : IBackgroundTaskQueue
 8{
 9    private readonly Channel<INotification> _queue = Channel.CreateUnbounded<INotification>();
10
11    public async Task QueueTaskAsync(INotification task, CancellationToken stoppingToken = default)
12    {
13        await _queue.Writer.WriteAsync(task);
14    }
15
16    public async Task<INotification> DequeueAsync(CancellationToken stoppingToken)
17    {
18        return await _queue.Reader.ReadAsync(stoppingToken);
19    }
20}

This class allows our consumers to add tasks to the queue. These tasks are simply instances of INotification from MediatR. Most of this is taken straight from the Microsoft Example.

Then we implement the actual background service:

 1/// <summary>
 2/// https://learn.microsoft.com/en-us/dotnet/core/extensions/queue-service
 3/// Using MediatR to dispatch tasks
 4/// </summary>
 5public sealed class QueuedHostedService : BackgroundService
 6{
 7    private readonly IBackgroundTaskQueue _queue;
 8    private readonly ILogger<QueuedHostedService> _logger;
 9    private readonly IServiceScopeFactory _scopeFactory;
10
11    public QueuedHostedService(ILogger<QueuedHostedService> logger,
12        IServiceScopeFactory scopeFactory, IBackgroundTaskQueue queue)
13    {
14        _logger = logger;
15        _scopeFactory = scopeFactory;
16        _queue = queue;
17    }
18
19    protected override Task ExecuteAsync(CancellationToken stoppingToken)
20    {
21        _logger.LogInformation($"{nameof(QueuedHostedService)} is running.");
22        return ProcessTaskQueueAsync(stoppingToken);
23    }
24
25    private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
26    {
27        while (!stoppingToken.IsCancellationRequested)
28        {
29            try
30            {
31                _logger.LogInformation("Waiting for new queue message.");
32                var backgroundTask = await _queue.DequeueAsync(stoppingToken);
33
34                using var scope = _scopeFactory.CreateScope();
35                var publisher = scope.ServiceProvider.GetRequiredService<IPublisher>();
36
37                _logger.LogInformation("Running task {TaskType}", backgroundTask.GetType());
38                await publisher.Publish(backgroundTask, stoppingToken);
39                _logger.LogInformation("Completed task {TaskType}", backgroundTask.GetType());
40            }
41            catch (OperationCanceledException)
42            {
43                // Prevent throwing if stoppingToken was signaled
44            }
45            catch (Exception ex)
46            {
47                _logger.LogError(ex, "Error occurred executing task work item.");
48            }
49        }
50    }
51
52    public override async Task StopAsync(CancellationToken stoppingToken)
53    {
54        _logger.LogInformation($"{nameof(QueuedHostedService)} is stopping.");
55        await base.StopAsync(stoppingToken);
56    }
57}

The great thing about IBackgroundTaskQueue using Channel<T> under the hood, is that the call the DequeueTask on line 32, simply awaits until there is something put in the queue. We are not doing long pooling, there’s no Thread.Sleep() or await Task.Delay(...) anywhere here. The instant something is added to the queue, the background service wakes up and starts processing.

On line 35, we’re resolving an instance of IPublisher, which MediatR uses for publishing notifications. Once a notification is published, any handlers will be automatically resolved by MediatR and they’ll run.

In your Program.cs we wire these into the service collection as follows:

1builder.Services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
2builder.Services.AddHostedService<QueuedHostedService>();

An example handler:

 1public record MyLongRunningJob(Guid JobId) : INotification;
 2
 3public class MyLongRunningJobHandler : INotificationHandler<Ping>
 4{
 5    private readonly IStatusService _statusService;
 6    // Can inject any required dependencies here. They will be scoped to this instance.
 7    // Note they will not have access to HttpContext since this runs in the background.
 8    public MyLongRunningJobHandler(IStatusService statusService) 
 9    {
10        _statusService = statusService;
11    }
12
13    public async Task Handle(MyLongRunningJob notification, CancellationToken cancellationToken)
14    {
15        _statusService.SetStatus(notification.JobId, "Running");
16        Debug.WriteLine($"Running job: {notification.JobId}");
17        await myLongRunningProcessHere();
18        _statusService.SetStatus(notification.JobId, "Done");
19    }
20}

Now to refactor your controller method from above:

 1[HttpPost("/api/longRunning")]
 2public Task<IActionResult> Start()
 3{
 4    var jobId = Guid.NewGuid();
 5    await _backgroundTaskQueue.QueueTaskAsync(new MyLongRunningJob(jobId));
 6    return Ok(jobId);
 7}
 8
 9[HttpGet(/api/longRunning/status/{id:guid})]
10public IActionResult GetStatus(Guid id)
11{
12    var status = _statusService.GetStatus(id);
13    return Ok(status);
14}

And there you have it.

Conclusion

The Asynchronous Request-Reply (ARR) pattern can be challenging to implement without over-engineering and over-complicating. External, distributed queue mechanisms aren’t always needed. An efficient and cost-effective solution can be whipped up using very few dependencies. In the project I was working on, we already included MediatR, so it was a win-win. Not only can we solve the ARR issue, but we now have a framework for queuing other background tasks:

  • In a system that implements DDD with Domain Events, some domain event handlers could queue their own background tasks. One example would be to a domain event handlers for updates to Product. This could queue a task to generate a materialized view for use in the read model or projection for Product. It could be a denormalized version called ProductSummary that includes, NumberOfReviews, AverageReviewRating, NumberOfPurchases, etc.
  • Background thumbnail generation when an image is uploaded.
  • Triggering background rebuilding of some large cache.

This solution also lays a great foundation for extension in the future, if your needs change.

  • IBackgroundTaskQueue could be extended to persist the tasks when queued. That way, the queue can be recovered from storage in the event of an application failure.
  • IBackgroundTaskQueue could even delegate to an actual distributed queue service.

As with all cases of extension, be careful that you’re not reinventing the wheel. If the problem set changes drastically, it might be time to reevaluate from the start.

Now I’d be doing a disservice if I didn’t mention Hangfire as an option here. However, for my needs, it was a bit overkill. But it’s also a decent candidate for assisting with AAR.

Do let me know if you have found other interesting uses for an in-process queue.

Comments

comments powered by Disqus