W poprzednim wpisie przedstawiłem garść informacji na temat CQRS. Post był mocno teoretyczny, praktycznie bez linijki kodu. Pod koniec wpisu powiedziałem, że zacznę od najprostszej z możliwych implementacji i wykorzystam tylko jedną bazę danych, ale żeby było to możliwe to potrzebny jest szkielet na którym przykład zostanie oparty. Przed jego implementacją, pokażę bardzo prosty schemat.

cqrs-scheme

Powyższy obrazek przedstawia sposób działania od strony aktora, który konsumuje API aplikacji. W oczy rzuca się podział przepływu informacji na Commandy i Queriesy, ale wygląda on bardzo podobnie. Aktor wysyła odpowiednio komendę lub kwerendę, które to stanowią źródło informacji na temat akcji, którą należy wykonać i zawierają dodatkowe parametry potrzebne do ich wywołania. Kolejno, wywołania transportowane są przez szynę prosto do Handlera, który stanowi kluczowy element całego procesu. W nim następuje wykonanie zadania - zapisanie stanu lub odczytanie stanu z bazy danych - a następnie zwrócenie wyniku tą samą drogą. Diagram przepływu dla strony odczytowej przedstawiłem poniżej. Analogicznie wygląda też przepływ strony zapisowej, dlatego go pominąłem.

cqrs-flow

Ciekawym i przydatnym elementem, choć w gruncie rzeczy opcjonalnym, jest szyna. Komponent ten udostępnia wspólny interfejs dla wszystkich komend lub kwerend i pośredniczy w komunikacji pomiędzy nimi oraz ich Handlerami. Bus stanowi dodatkową warstwę abstrakcji, która zwalnia element wywołujący żądanie z obowiązku znajomości miejsca do którego musi je dostarczyć. Element wywołujący żądanie, przy pomocy jednorodnego interfejsu w każdym miejscu aplikacji, potrafi komunikować tylko z szyną, która zna wszystkie Handlery i potrafi przekazać do nich wywołanie. Innymi słowami jest to wykorzystanie wzorca behawioralnego Mediator[1]. Rezygnacja z mediatora wprowadzi konieczność jawnego wskazywania elementu, który ma obsłużyć żądanie, co nie tylko utrudni to jego użycie, ale również wprowadza konieczność pilnowania aby dla komendy lub kwerendy nie zostały zaimplementowany więcej niż jeden handler.

Implementacja wzorca w praktyce jest prawie tak prosta jak schemat. Wykorzystując interfejsy, generyczność i wstrzykiwanie zależności już w kilku klasach można zbudować szkielet, który zapewni przedstawioną funkcjonalność.

Implementacja Commandów

Komendy są prostsze do implementacji, więc od nich zacznę. Aby je rozpoznać wprowadzę marker - zwyczajny pusty interfejs. Oznaczań nim będę wszystkie komendy dostępne w aplikacji.

public interface ICommand
{
}

Każda komenda obsługiwana będzie przez jeden i tylko jeden handler, który implementuje metodę HandleAsync. Zgodnie z podziałem przedstawionym w CQS, komenda nie zwraca żadnego rezultatu. W sieci można znaleźć sporo dyskusji czy aby na pewno komendy nie powinny nic zwracać. Jest to temat dość kontrowersyjny, którym zajmę się w oddzielnym wpisie i tam wyrażę swoją opinię na ten temat.

public interface ICommandHandler<in TCommand>
    where TCommand : ICommand
{
    Task HandleAsync(TCommand command, CancellationToken cancellationToken);
}

Elementem spinającym stronę zapisową jest szyna, która pozwala na przesłanie komendy do jej Handlera. Przechodząc do faktycznej implementacji, szyna ma odnaleźć odbiorcę i wywołać na nim jedyną metodę którą implementuje. W swoim konstruktorze przyjmuje ona delegat, który pozwala jej na utworzenie instancji dla wskazanego typu żądania. W całej implementacji nie ma szczególnej magii. Jedyną sztuczką jest utworzenie typu generycznego, który jest potrzebny do wyszukania znalezienia odbiorcy.

public interface ICommandBus
{
    Task SendAsync<TCommand>(TCommand command)
        where TCommand : ICommand;
}

public class CommandBus : ICommandBus
{
    private readonly Func<Type, object> _handlersResolver;

    public CommandBus(Func<Type, object> typeResolver)
    {
        _handlersResolver = typeResolver;
    }

    public async Task SendAsync<TCommand>(TCommand command) where TCommand : ICommand
    {
        var handler = GetHandler(command);
        if (handler == null)
            throw new HandlerNotFoundException();
        await handler.HandleAsync(command);
    }
    private ICommandHandler<TCommand> GetHandler<TCommand>(TCommand command)
        where TCommand : ICommand
    {
        var handlerType = typeof(ICommandHandler<>).MakeGenericType(command.GetType());
        return (ICommandHandler<TCommand>) _handlersResolver.Invoke(handlerType);
    }
}

Implementacja Queriesów

Implementacja Queriesów jest bardziej wymagająca z uwagi na fakt, że zwracają typy różne od pustego Taska. Typy te określane są w definicjach kwerend. Poza tym drobnym detalem, markery nie różnią się znacząco od przedstawionych wcześniej markerów komend. Prawdziwą różnicę widać będzie dopiero podczas implementacji szyny, ale wszystko w swoim czasie.

public interface IQuery<out TResult>
{
}

public interface IQueryHandler<in TQuery, TResult>
    where TQuery : IQuery<TResult>
{
    Task<TResult> HandleAsync(TQuery query, CancellationToken cancellationToken = default);
}

public interface IQueryBus
{
    Task<TResult> SendAsync<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default);
}

Zgodnie z tym co wspomniałem wcześniej, szyna do transportu kwerend jest trochę bardziej skomplikowana. Aby była w stanie realizować swoje zadanie, a jednocześnie wystawić interfejs wygodny użyciu, jej implementacja wymaga zastosowania dwóch sztuczek. Problemem, który trzeba zaadresować jest sposób określenia typu kwerendy oraz typu, który zwróci. Sztuczka polega na wykorzystaniu dwóch dodatkowych klas do rozwiązania tych dwóch problemów. Po jednej klasie na każdy z problemów. Inspirację zaczerpnąłem z biblioteki MediatR [2].

internal abstract class QueryBusInternal<TResult>
{
    private readonly Func<Type, object> _handlersResolver;

    protected QueryBusInternal(Func<Type, object> typeResolver)
    {
        _handlersResolver = typeResolver;
    }
    
    protected THandler GetHandler<THandler>()
    {
        var handler = _handlersResolver.Invoke(typeof(THandler));
        if (handler == null)
            throw new HandlerNotFoundException();
        return (THandler)handler;
    }
    
    public abstract Task<TResult> SendAsync(IQuery<TResult> query);
}

internal class QueryBusInternalImpl<TQuery, TResult> : QueryBusInternal<TResult>
    where TQuery : IQuery<TResult>
{
    public QueryBusInternalImpl(Func<Type, object> handler) : base(handler)
    {}
    
    public override async Task<TResult> SendAsync(IQuery<TResult> query)
    {
       return await GetHandler<IQueryHandler<TQuery, TResult>>()
            .HandleAsync((TQuery) query);
    }
}

Z gotowymi wrapperami reszta jest już prosta.

public class QueryBus : IQueryBus
{
    private readonly Func<Type, object> _handler
    public QueryBus(Func<Type, object> handler)
    {
        _handler = handler;
    }
    
    public async Task<TResult> SendAsync<TResult>(IQuery<TResult> query)
    {
        var queryBusInternal = (QueryBusInternal<TResult>) Activator.CreateInstance(
            typeof(QueryBusInternalImpl<,>).MakeGenericType(query.GetType(), typeof(TResult)), _handler);
        return await queryBusInternal.SendAsync(query);
    }
}

Odnajdywanie Handlerów przez kontener zależności

Jak wspomniałem, Handlery pobierane będą przez kontener zależności. Jest to wyjątkowo proste i sprowadza się do przekazania odpowiedniej funkcji podczas rejestracji szyn. Poniższy przykład wykorzystuje implementację wbudowaną we framework.

private static void ComposeBuses(IServiceCollection serviceCollection)
{
    serviceCollection.AddScoped<ICommandBus>(serviceProvider => new CommandBus(serviceProvider.GetService));
    serviceCollection.AddScoped<IQueryBus>(serviceProvider => new QueryBus(serviceProvider.GetService));
}

Wszystkie klasy dostępne w ramach danego assembly mogą zostać zarejestrowane manualnie bądź też automatycznie, przy pomocy wcześniej przygotowanych markerów. Aby zrobić to raz i o tym zapomnieć, wykorzystałem rejestrację automatyczną. Z pomocą metod z biblioteki Scrutor[3] jest to niebywale proste.

private static void ComposeHandlers(IServiceCollection serviceCollection) =>
    serviceCollection.Scan(scan => scan
        .FromAssemblyOf<IApplicationAssemblyMarker>()
        .AddClasses(classes => classes.AssignableTo(typeof(ICommandHandler<>)))
        .AsImplementedInterfaces()
        .WithScopedLifetime()
        .AddClasses(classes => classes.AssignableTo(typeof(IQueryHandler<,>)))
        .AsImplementedInterfaces()
        .WithScopedLifetime()
    );

Rejestracja była ostatnim elementem, który łączy wszystkie komponenty ze sobą. Wykorzystując powyższy kod można w wygodny sposób wywoływać komendy i kwerendy z dowolnego miejsca wykorzystując wstrzykiwanie zależności. Wystarczy wskazać odpowiedni element przez konstruktor i reszta zadziała już sama.

Czy implementację szyny odczytowej można uprościć?

Tak, alternatywnym podejściem, jest zmiana interfejsu szyny co pozwoli wykorzystać implementację analogiczną do tej w komendach. Liczba klas może wtedy zostać zredukowana do jednej, jednakże nie obejdzie się bez konsekwencji. Koszt który trzeba wtedy ponieść, to interfejs znacznie mniej przyjemny w użyciu. Dzieje się tak ponieważ na podstawie samej kwerendy nie będzie możliwości określenia typu zwracanego.

public class SampleQuery : IQuery<string>
{
}

var sampleQuery = new SampleQuery();

// Wywołanie szyny z "przyjaznym" interfejsem
var queryResult = await _queryBus.SendAsync(sampleQuery);

// Wywołanie szyny z prostszą implementacją
var queryResult = await _queryBus.SendAsync<SampleQuery, string>(sampleQuery);

Prawda, że pierwszy sposób jest znacznie prostszy? Mimo wszystko pokażę jak wygląda taka implementacja, sam jednak zostanę przy pierwszej. Uważam, że skoro implementacja szyny jest tylko w jednym miejscu a zmian raczej nie będzie wiele to większe znaczenie wygodny interfejs, który wykorzystywany będzie wszędzie. Tym bardziej, że z pomocą automagicznych mechanizmów wszystko dzieje się samo po rejestracji w kontenerze i dosłownie można o tym zapomnieć.

  public interface IQueryBus
  {
    Task<TResult> SendAsync<TQuery, TResult>(TQuery query)
          where TQuery : IQuery<TResult>;
  }

public class QueryBus : IQueryBus
{
    private readonly Func<Type, object> _handler;

    public QueryBus(Func<Type, object> handler)
    {
        _handler = handler;
    }

    private dynamic GetHandler<TResult>(IQuery<TResult> query)
    {
        Type handlerType = typeof(IQueryHandler<,>).MakeGenericType(query.GetType(), typeof(TResult));
        return _handler.Invoke(handlerType);
    }
    
    public async Task<TResult> SendAsync<TQuery, TResult>(TQuery query) where TQuery : IQuery<TResult>
    {
        dynamic handler = GetHandler(query);
        if (handler == null)
            throw new HandlerNotFoundException();
        return await handler.HandleAsync(query);        
    }
}

Po co trzymać każdy Handler w osobnej klasie, przecież wystarczy tylko zaimplementować odpowiednie interfejsy.

Trzymanie Handlerów w osobnych klasach jest wygodne z kilku powodów:

  • Handlery bardzo często są elementem spinającym wiele różnych komponentów i wstrzykiwane są do nich rozmaite serwisy. Przetrzymywanie kilku w jednej klasie może spowodować, że konstruktor będzie wymagał przekazania wielu parametrów. Utrudni testowanie jednostkowe.
  • Wydzielenie Handlerów do różnych klas sprzyja zachowaniu SRP - jedna klasa jedna odpowiedzialność
  • Częstym wymaganiem jest dodanie logowania, tracingu lub innych elementów monitorujących aplikację w każdego wywołania. Taki podział umożliwia wykorzystanie podejścia aspektowego w prosty sposób przez klasy bazowe czy dekoratory.

Tymi przemyśleniami zakończę wpis. W kolejnym wpisie, mając już gotowy szkielet, przedstawię sposób jego użycia w prostej aplikacji.

[1] Erich Gamma, Richard Helm, Ralph Johnson, John Vlissides (1994), Design Patterns: Elements of Reusable Object-Oriented Software. ISBN 0201633612 (1994 ed)
[2] MediatR, [kod źródłowy]. Źródło: https://github.com/jbogard/MediatR
[3] Scrutor, [biblioteka]. Źródło: https://github.com/khellang/Scrutor
[4] Aspect-oriented programming, [artykuł]. Źródło: https://en.wikipedia.org/wiki/Aspect-oriented_programming
[5] Command and Query Responsibility Segregation (CQRS) pattern, [artykuł]. Źródło: https://docs.microsoft.com/en-US/azure/architecture/patterns/cqrs
Zdjęcie : https://pl.freeimages.com/photo/rusty-rails-1505864

Zostaw komentarz