22

Getting rid of the slow Masstransit test harness

 1 year ago
source link: https://andersmalmgren.com/2022/11/23/getting-rid-of-the-slow-masstransit-test-harness/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Getting rid of the slow Masstransit test harness

I wrote a blog post about replacing the timeout-based test harness with a semaphore-driven one here. This made things much more robust when you want black-box-style testing: fire a number of events and wait until all of them, and the child events they spawn, have been digested.

This worked well and was robust, but it still used the MassTransit harness for hosting. This made the InMemory bus more than twice as slow as hosting MassTransit in a service, and that comparison includes database I/O, so the harness is probably a lot slower still when looking only at bus performance.

But it’s pretty easy to host MassTransit from a non-service project such as a test project. Instead of configuring with AddMassTransitTestHarness, use the standard AddMassTransit extension method. At this point events will not be consumed when you publish them, because the registered IHostedService instances haven’t been started. That’s an easy fix if we base the code on the IHarness from my previous blog post:

/// <summary>
/// Creates the harness around the hosted services registered in the container.
/// </summary>
/// <param name="services">The hosted services that <c>Start</c> and <c>Stop</c> will drive.</param>
public Harness(IEnumerable<IHostedService> services) => _services = services;
/// <summary>
/// Starts every registered hosted service, one after another, in enumeration order.
/// </summary>
/// <returns>A task that completes once all hosted services have been started.</returns>
public async Task Start()
{
    // The token was never cancelled, so CancellationToken.None behaves the same
    // without allocating a CancellationTokenSource that is never disposed.
    foreach (var service in _services)
    {
        await service.StartAsync(CancellationToken.None);
    }
}
/// <summary>
/// Stops every registered hosted service, one after another, in enumeration order.
/// </summary>
/// <returns>A task that completes once all hosted services have been stopped.</returns>
public async Task Stop()
{
    // The token was never cancelled, so CancellationToken.None behaves the same
    // without allocating a CancellationTokenSource that is never disposed.
    foreach (var service in _services)
    {
        await service.StopAsync(CancellationToken.None);
    }
}

Call Start from your test setup and Stop from your test teardown. This starts the background workers for MassTransit and makes sure it listens for and consumes events. The service will not work unless you add logging to your IoC config.

// Logging must be registered in the container; the hosted MassTransit service
// will not work without it.
new ServiceCollection()
.AddLogging();

Coupled with the harness code from the previous blog post, you now have a very robust and fast test harness. Full code below:

/// <summary>
/// Extension methods that plug the semaphore-driven <see cref="IHarness"/> into MassTransit.
/// </summary>
public static class HarnessSetup
{
    /// <summary>
    /// Registers the harness consume filter, publish filter and publish observer on the in-memory bus.
    /// </summary>
    /// <param name="bus">The in-memory bus configurator to extend.</param>
    /// <param name="ctx">The registration context used to resolve the scoped filters.</param>
    public static void ConfigureTestHarness(this IInMemoryBusFactoryConfigurator bus, IBusRegistrationContext ctx)
    {
        bus.UseConsumeFilter(typeof(ConsumeFilter<>), ctx);
        bus.UsePublishFilter(typeof(PublishFilter<>), ctx);
        bus.ConnectPublishObserver(new PublishObserver());
    }

    /// <summary>
    /// Registers <see cref="Harness"/> as the singleton <see cref="IHarness"/> implementation.
    /// </summary>
    /// <param name="services">The service collection to add the harness to.</param>
    /// <returns>The same <paramref name="services"/> instance, to allow chaining.</returns>
    public static IServiceCollection AddTestHarness(this IServiceCollection services)
    {
        services.AddSingleton<IHarness, Harness>();
        return services;
    }

    /// <summary>
    /// Publish observer that hands the id of a message about to be published to the callback
    /// the <see cref="PublishFilter{TMessage}"/> registered for that message instance.
    /// </summary>
    private class PublishObserver : IPublishObserver
    {
        /// <summary>
        /// Pending callbacks keyed by message instance; written by the publish filter and
        /// consumed by <see cref="PrePublish{T}"/>.
        /// </summary>
        public static readonly ConcurrentDictionary<object, Action<Guid>> PublishHandlers = new();

        /// <summary>
        /// Invokes and removes the callback registered for the message, if there is one.
        /// </summary>
        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            // A single atomic TryRemove replaces ContainsKey + indexer + spinning Remove:
            // there is no window in which the key can vanish between the check and the
            // read, and no loop that can spin forever if another thread removed the entry.
            if (PublishHandlers.TryRemove(context.Message, out var notifyPublished))
            {
                notifyPublished(context.MessageId!.Value);
            }

            return Task.CompletedTask;
        }

        /// <summary>No-op; the harness only needs the pre-publish notification.</summary>
        public Task PostPublish<T>(PublishContext<T> context) where T : class
        {
            return Task.CompletedTask;
        }

        /// <summary>No-op; publish faults are not tracked by the harness.</summary>
        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Publish filter that registers <see cref="IHarness.MessagePublished"/> as the callback for
    /// the message being published, so the observer can report its id to the harness.
    /// </summary>
    private class PublishFilter<TMessage> : IFilter<PublishContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public PublishFilter(IHarness harness)
        {
            _harness = harness;
        }

        /// <summary>
        /// Registers the harness callback for the message and passes the context down the pipe.
        /// </summary>
        public Task Send(PublishContext<TMessage> context, IPipe<PublishContext<TMessage>> next)
        {
            PublishObserver.PublishHandlers[context.Message] = _harness.MessagePublished;

            // A filter must forward to the next pipe element; returning without doing so
            // silently skips every publish filter configured after this one.
            return next.Send(context);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

    /// <summary>
    /// Consume filter that reports the outcome of each consumed message to the harness.
    /// </summary>
    private class ConsumeFilter<TMessage> : IFilter<ConsumeContext<TMessage>> where TMessage : class
    {
        private readonly IHarness _harness;

        public ConsumeFilter(IHarness harness)
        {
            _harness = harness;
        }

        /// <summary>
        /// Runs the rest of the consume pipe, then marks the message as consumed; if the pipe
        /// throws, the exception is handed to the harness instead of propagating.
        /// </summary>
        public async Task Send(ConsumeContext<TMessage> context, IPipe<ConsumeContext<TMessage>> next)
        {
            try
            {
                await next.Send(context);
            }
            catch (Exception e)
            {
                // Deliberately not rethrown: the harness stores the failure and surfaces it
                // from WaitForBus.
                _harness.Throw(e);
                return;
            }

            _harness.ConsumeMessage(context.MessageId!.Value);
        }

        public void Probe(ProbeContext context)
        {
        }
    }
}
/// <summary>
/// Test harness contract: hosts the bus services and lets a test wait until all
/// published messages have been consumed.
/// </summary>
public interface IHarness
{
/// <summary>Starts the hosted services backing the bus.</summary>
Task Start();
/// <summary>Stops the hosted services backing the bus.</summary>
Task Stop();
/// <summary>
/// Completes once no published message is still waiting to be consumed; throws the
/// exception reported through <see cref="Throw"/>, if any.
/// </summary>
Task WaitForBus();
/// <summary>Marks the message with the given id as consumed.</summary>
void ConsumeMessage(Guid message);
/// <summary>Records that the message with the given id has been published and awaits consumption.</summary>
void MessagePublished(Guid message);
/// <summary>Reports a failure that occurred while consuming a message.</summary>
void Throw(Exception exception);
/// <summary>Returns the currently collected scheduled messages and clears the collection.</summary>
IEnumerable<OutboxMessage> PopScheduledMessages();
}
/// <summary>
/// Semaphore-driven test harness: starts and stops the hosted services and lets a test
/// block until every published message has been consumed (or a consumer has failed).
/// </summary>
public class Harness : IHarness
{
    private readonly IEnumerable<IHostedService> _services;

    // Ids of messages that have been published but not yet consumed; the value is unused.
    private readonly ConcurrentDictionary<Guid, int> _publishedMessages = new();

    // NOTE(review): nothing in this class adds to this bag; it is only drained by PopScheduledMessages.
    private readonly ConcurrentBag<OutboxMessage> _scheduledMessage = new();

    // Released once per consumed message and once per reported failure.
    private readonly SemaphoreSlim _semaphore = new(0);

    // Last failure reported through Throw; handed out (and cleared) by WaitForBus.
    private Exception? _exception;

    /// <summary>
    /// Creates the harness around the hosted services registered in the container.
    /// </summary>
    /// <param name="services">The hosted services that <see cref="Start"/> and <see cref="Stop"/> drive.</param>
    public Harness(IEnumerable<IHostedService> services)
    {
        _services = services;
    }

    /// <summary>Starts every hosted service, one after another, in enumeration order.</summary>
    public async Task Start()
    {
        // The token was never cancelled, so CancellationToken.None behaves the same
        // without allocating a CancellationTokenSource that is never disposed.
        foreach (var service in _services)
        {
            await service.StartAsync(CancellationToken.None);
        }
    }

    /// <summary>Stops every hosted service, one after another, in enumeration order.</summary>
    public async Task Stop()
    {
        foreach (var service in _services)
        {
            await service.StopAsync(CancellationToken.None);
        }
    }

    /// <summary>Records that the message with the given id is awaiting consumption.</summary>
    public void MessagePublished(Guid message) => _publishedMessages[message] = 1;

    /// <summary>
    /// Marks the message as consumed and wakes a waiter in <see cref="WaitForBus"/>.
    /// </summary>
    public void ConsumeMessage(Guid message)
    {
        // One attempt only. The previous busy-wait could never finish when the id was
        // already gone (cleared by Throw, or removed by an earlier consume of the same
        // message) and would pin a thread at 100 % CPU.
        _publishedMessages.TryRemove(message, out _);
        _semaphore.Release();
    }

    /// <summary>
    /// Stores a consumer failure, drops all pending messages and wakes a waiter in
    /// <see cref="WaitForBus"/> so the failure is surfaced to the test.
    /// </summary>
    public void Throw(Exception exception)
    {
        // Store the failure BEFORE clearing: a waiter that is already awake and sees the
        // empty set must also see the exception, otherwise it returns as if all went well.
        Interlocked.Exchange(ref _exception, exception);
        _publishedMessages.Clear();
        _semaphore.Release();
    }

    /// <summary>Returns a snapshot of the scheduled messages and clears the collection.</summary>
    public IEnumerable<OutboxMessage> PopScheduledMessages()
    {
        var result = _scheduledMessage.ToList();
        _scheduledMessage.Clear();
        return result;
    }

    /// <summary>
    /// Waits until no published message is pending. Waits for at least one consume or
    /// failure signal before checking, so it does not return if nothing is ever signalled.
    /// </summary>
    /// <exception cref="Exception">
    /// The exception most recently reported through <see cref="Throw"/>, rethrown with its
    /// original stack trace.
    /// </exception>
    public async Task WaitForBus()
    {
        do
        {
            await _semaphore.WaitAsync();
        }
        while (!_publishedMessages.IsEmpty);

        // Take ownership of the failure so a later wait on the same harness instance does
        // not rethrow a stale exception.
        var failure = Interlocked.Exchange(ref _exception, null);
        if (failure is not null)
        {
            // `throw failure;` would overwrite the consumer's stack trace.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
        }
    }
}

Posted in Continuous delivery, Testing and tagged C#, Event driven design, Integration tests, Masstransit, Testing on November 23, 2022. Leave a comment


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK