W poprzednim wpisie pokazałem podstawowe pułapki, które czekają na nas gdy komunikujemy się asynchronicznie między systemami. Przedstawiłem również fatalne rezultaty przy naiwnej implementacji. Zaproponowałem alternatywne podejście, które sprytnie wykorzystuje gwarancje oferowane z pudełka i przechyla szalę na naszą korzyść. Rozdzielenie części nadawczej od części odbiorczej pozwala stworzyć iluzję transakcji w komunikacji asynchronicznej. Po bardziej szczegółowy opis zapraszam do poprzedniego posta ¨

Jakie problemy trzeba rozwiązać?

Pierwszym krokiem potrzebnym do stworzenia wspomnianej iluzji jest zapewnienie, że wiadomość zostanie opublikowana wtedy gdy powinna. Mówiąc wtedy gdy powinna, mam na myśli scenariusz w którym stan zostaje poprawnie zapisany. Dodatkowo chcę mieć pewność, że broker otrzymał wysłaną wiadomość. Trzeba więc rozwiązać znane już z wcześniej problemy:

  • Opublikowania wiadomości, która nigdy nie powinna trafić do brokera published-uncommited-msg-outbox

  • Niedostarczenia wiadomości lost-msg-outbox

Wykorzystamy do tego Outbox pattern, czyli podzielimy proces wysyłki wiadomości na dwa następujące po sobie etapy.

  1. W pierwszym kroku odłożymy w czasie moment opublikowania zdarzenia i zamiast wysyłać na kolejkę, zapiszemy wiadomość transakcyjnie w bazie danych razem z zapisywaniem stanu systemu.
  2. Następnie odczytamy z bazy danych wiadomości oczekujące na wysłanie, wyślemy je do brokera i jak wszystko się uda to usuniemy je z bazy danych.

Na diagramie wygląda to z grubsza w taki sposób: lost-msg-outbox

Rozdzielenie jednego dużego procesu na dwie mniejsze oraz niezależne części rozwiązuje powyższe problemy ponieważ:

  • Wiadomość zostaje zapisana atomowo włącznie ze stanem systemu. Wiadomość i system są zawsze w stanie ustalonym.
  • Jeżeli broker jest chwilowo niedostępny wiadomość nie zostaje stracona. Opublikowana zostanie jak tylko serwis zacznie działać.

Implementacja Outboxa w F#

Zgodnie z tym co widać na diagramie do zaimplementowania są dwie operacje. Pierwszą z nich jest dodanie wiadomości do Outboxa. Drugą natomiast odczytanie i wysłanie wiadomości.

Model wiadomości zapisywany w bazie danych

Każdy event, który trafia do Outboxa musi zostać zserializowany przy dodawaniu, a w kolejnym kroku zdeserializowany i przesłany dalej. Najłatwiej będzie zrzucić go do JSONa zachowując przy tym szczegółowe informacje o tym, czym dokładnie wspomniany wcześniej event jest. Wystarczy prosty model, który spełnia te potrzeby z delikatnym narzutem na audyt:

type Message = {
    Id: Guid
    OccuredOn: DateTime
    Payload: string
    Type: string
}

Dodawanie wiadomości do Outboxa

W pierwszej części zapisujemy do bazy danych wszystkie otrzymane wiadomości dowolnego typu.

let add (makeId: unit -> Guid) (readDateTimeNow: unit -> DateTime) (save: Message list -> Async<unit>) (messages: Object list) =
    let outboxMessages =
        messages
        |> List.map
            (fun message ->
                { Id = makeId ()
                  OccuredOn = readDateTimeNow ()
                  Payload = JsonConvert.SerializeObject message
                  Type = message.GetType().FullName })

    async { do! save outboxMessages }

Ciało funkcji jest raczej proste. Przyjrzyjmy się zależnościom:

  • makeId : delegat generujący unikalne identyfikatory
  • readDateTimeNow : delegat obliczający obecny czas
  • save : delegat zapisujący listę wiadomości w bazie danych
  • messages: lista wiadomości

Odczytanie i przetworzenie wiadomości oczekujących w Outboxie

[<Literal>]
let ParallelizationThreshold = 10

let execute (read: unit -> Async<Message list>) (setProcessed: Message -> Async<unit>) (publish: obj * Guid -> Async<unit>) (marker: Type) =
    let contractsAssembly = Assembly.GetAssembly(marker)

    let executeInternal message =
        async {
            let deserializedMessage =
                JsonConvert.DeserializeObject(message.Payload, contractsAssembly.GetType(message.Type))

            do! publish (deserializedMessage, message.Id)
            do! setProcessed message
        }


    async {
        let! chunksOfMessages =
            read ()
            |> Async.map (fun messages -> messages |> List.chunkBySize ParallelizationThreshold)

        do!
            chunksOfMessages
            |> List.map
                (fun msgChunk ->
                    msgChunk
                    |> List.map executeInternal
                    |> Async.Parallel)
            |> Async.Sequential
            |> Async.Ignore
    }

W drugiej części procesu pobieramy listę wiadomości, deserializujemy do odpowiednich typów, przetwarzamy i oznaczamy jako przetworzone. Aby zwiększyć przepustowość wrzucamy je współbieżnie w paczkach po dziesięć (wartość dobrana zupełnie losowo). Funkcja ma następujące zależności:

  • read : delegat zwracający listę wiadomości oczekujących na wysłanie
  • setProcessed : delegat, który oznacza wiadomość jako przetworzoną
  • publish : delegat przetwarzający wiadomości. W naszym przypadku jest to funkcja wrzucająca wiadomości na kolejkę
  • marker: dowolny typ, który pozwoli określić w którym assembly będziemy szukać typów przy deserializacji

Zależności - Baza danych

Spora część opisanych wyżej zależności dotyczyła dostępu do danych. Zachowując poziom komplikacji na możliwie niskim poziomie, moja implementacja wykorzystuje MSSQL jak bazę danych oraz Dappera [1], owrapowanego do łatwiejszego używania z F#.

Wymagane table utworzyłem takim skryptem:

CREATE TABLE OutboxMessages
(
    Id        uniqueidentifier NOT NULL,
    OccuredOn DateTime2        NOT NULL,
    Type      NVARCHAR(256)    NOT NULL,
    Payload   NVARCHAR(MAX)    NOT NULL,
    CONSTRAINT [PK_OutboxMessages_Id] PRIMARY KEY (Id)
)

CREATE TABLE ProcessedOutboxMessages
(
    Id          uniqueidentifier NOT NULL,
    OccuredOn   DateTime2        NOT NULL,
    Type        NVARCHAR(256)    NOT NULL,
    Payload     NVARCHAR(MAX)    NOT NULL,
    ProcessedOn DateTime2        NOT NULL
        CONSTRAINT [PK_ProcessedOutboxMessages_Id] PRIMARY KEY (Id)
)

Poniższe metody stanowią implementację zależności Outboxa.

[<Literal>]
let private readSql =
    "SELECT Id, OccuredOn as OccuredOn, Payload, Type FROM OutboxMessages"

[<Literal>]
let private insertSql =
    "INSERT INTO OutboxMessages(Id, OccuredOn, Payload, Type) VALUES (@Id, @OccuredOn, @Payload, @Type)"

[<Literal>]
let private moveToProcessedSql = "DELETE FROM OutboxMessages
      OUTPUT DELETED.Id, DELETED.OccuredOn, DELETED.Payload, DELETED.Type, GETUTCDATE()
      INTO ProcessedOutboxMessages (Id, OccuredOn, Payload, Type, ProcessedOn)
      WHERE Id = @id"

let read createConnection =
    async {
        use! connection = createConnection ()

        return!
            connection
            |> (dbQuery<Message> readSql)
            |> Async.map (List.ofSeq)
    }

let save createConnection outboxMessages =
    async {
        use! connection = createConnection ()

        do!
            connection
            |> dbParametrizedExecute insertSql outboxMessages
    }

let moveToProcessed createConnection outboxMessage =
    async {
        use! connection = createConnection ()

        do!
            connection
            |> dbParametrizedExecute moveToProcessedSql {| id = outboxMessage.Id |}
    }

Pierwsze dwie funkcje to klasyczny przykład odczytu i zapisu. Bardziej nietypowa wdawać się może procedura przeniesienia do tabeli przetworzonych wiadomości. Aby zrobić to atomowo oraz uniknąć zakładania transakcji, wykorzystałem klauzulę OUTPUT [2].

Sam Outbox można zaimplementować wykorzystując tylko jedną tabelę, która wygląda podobnie jak moja ProcessedOutboxMessages. Jedyną różnicą jest kolumna ProcessedOn, która musi być nullowalna. Po tej kolumnie, gdy wartość jest pusta, wyszukiwane są wiadomości oczekujące na przetworzenie. Wraz z przetworzeniem, kolumna ProcessedOn uzupełniana jest obecną datą. Zdecydowałem się jednak na wykorzystanie dwóch oddzielnych tabel, bo po pewnym czasie tabela zaczyna puchnąć. Po co więc filtrować wśród danych, które nas zupełnie (w tym konkretnym momencie) nie interesują?

Zależności - publikacja wiadomości

Kolejnym etapem jest implementacja mechanizmu publikowania zdarzeń na kolejkę. Jako szynę komunikacyjną wybrałem Rabbita [3] do którego łączę się przy pomocy MassTransita [4]. Zdecydowałem się na tę parę, ponieważ w projekcie, nad którym pracuję na lokalnym środowisku, wykorzystujemy dokładnie te same elementy. Sama biblioteka nie do końca sprzyja stylowi funkcyjnemu przez co kod nie jest najpiękniejszy. Jednak jak by nie wyglądał to robi robotę.

Korzystając z MassTransita, podłączenie do Rabbita nie wymaga zbyt dużo kodu.

let connect (serviceCollectionBusConfigurator: IServiceCollectionBusConfigurator) (connectionString: string) =
    serviceCollectionBusConfigurator.UsingRabbitMq(
        fun busRegistrationContext rabbitMqBusFactoryConfigurator -> rabbitMqBusFactoryConfigurator.Host(connectionString))

Podobnie wygląda samo publikowanie wiadomości.

let publish (publishEndpoint: IPublishEndpoint) (data: 'a, eventId: Guid) : Async<unit> =
    let assignMessageId (publishContext: PublishContext) = publishContext.MessageId <- eventId

    publishEndpoint.Publish(data, assignMessageId)
    |> Async.AwaitTask

Element na który trzeba zwrócić szczególną uwagę to przypisywanie Id wysyłanej wiadomości. Kluczowe jest aby identyfikator nie był losowy i odpowiadał dokładnie temu, który nadany został przy zapisywaniu wiadomości w Outboxie. Utrzymanie ciągłości pozwoli na wprowadzenie deduplikacji przy odbiorze. Jeżeli Id będzie za każdym razem inne to odbiorca może mieć problem przy ustaleniu czy wiadomość jest poprawna, czy jest duplikatem. Tym problemem zajmiemy się przy implementacji Inboxa.

Cykliczne uruchamianie procesu

Wszystkie klocki są już na miejscu. Pozostało je tylko poskładać i zapewnić, że proces przetwarzający oczekujące wiadomości uruchamiać się będzie cyklicznie. Jeżeli w projekcie wykorzystywany jest background worker typu Hangfire, można go wykorzystać. Mi w zupełności wystarczył wbudowany BackgroundService, który zgodnie zgodnie z przekazanym harmonogramem cyklicznie sprawdza czy ma coś do przetworzenia. Do parsowania wyrażenia użyłem biblioteki NCrontab [5].

type CronBackgroundService(logger: ILogger<CronBackgroundService>, cronExpression: string, job: unit -> Async<unit>) =
    inherit BackgroundService()

    override _.ExecuteAsync(ct: CancellationToken) =
        let executeJobAndSleepUntilNextSchedule (schedule: CrontabSchedule) job =
            async {
                let nextExecutionTime = schedule.GetNextOccurrence(DateTime.Now)
                do! job ()
                return!
                    match nextExecutionTime.Ticks - DateTime.Now.Ticks with
                    | ticks when ticks > 0L -> Task.Delay(TimeSpan(ticks)) |> Async.AwaitTask
                    | _ -> async.Return()
            }

        let schedule =
            CrontabSchedule.Parse(cronExpression, CrontabSchedule.ParseOptions(IncludingSeconds = true))

        AsyncSeq.initInfinite (fun _ -> executeJobAndSleepUntilNextSchedule schedule job)
        |> AsyncSeq.takeWhile (fun _ -> ct.IsCancellationRequested = false)
        |> AsyncSeq.iterAsync id
        |> Async.StartAsTask
        :> Task

Rejestracja komponentów

Wszystkie opisane elementy zarejestrowałem przy starcie starcie aplikacji.

let configureServices (appSettings: AppSettings) (services: IServiceCollection) =
    services
        .AddSingleton<CompositionRoot>(fun provider -> CompositionRoot.compose appSettings (provider.GetService<IBusControl>()))
        .AddMassTransit(fun cfg -> QueueListener.connect cfg appSettings.RabbitMqEndpoint appSettings.UseInbox )
        .AddHostedService(fun provider ->
            new CronBackgroundService.CronBackgroundService(
                provider.GetService<ILogger<CronBackgroundService>>(),
                "*/5 * * * * *",
                provider
                    .GetService<CompositionRoot>()
                    .OutboxProcessor
            ))
        .AddMassTransitHostedService()
        .AddGiraffe()
    |> ignore

Przedstawiony kod to tylko fragment entrypointa. Po więcej szczegółów odsyłam na GitHuba](https://github.com/marcinlovescode/ExactlyOnceProcessing/tree/master/outbox)]

Testy

Aby upewnić się, że Outbox działa poprawnie i rozwiązuje opisane problemy przygotowałem odchudzony scenariusz testowy, który przedstawiłem na diagramach.

Klient składa zamówienie i publikowane jest zdarzenie na które nasłuchuje serwis płatności. Po jego otrzymaniu, klient zostaje obciążony kwotą o wartości złożonego zamówienia. Jednakże proces składania zamówienia może się nie udać. Aby zasymulować wystąpienie błędu, proces rzuca wyjątek.

type Command =
    { Id: Guid
      ClientId: Guid
      TotalValue: decimal }

type IO =
    { Save: Order -> Async<unit>
      PublishEvent: OrderPlaced -> Async<unit> }

let handle shouldFailAfterSave io command =
    let order, orderPlacedEvent =
        Orders.placeOrder command.Id command.ClientId command.TotalValue

    async {
        do! orderPlacedEvent |> io.PublishEvent

        match shouldFailAfterSave with
        | true -> failwith "ERROR!"
        | false -> do! order |> io.Save
    }
type IO =
    {
        Save: Payment -> Async<unit>
    }

let handle io (event: OrderPlaced) =
    let paymentId = Guid.NewGuid()
    let payment = Payments.chargeCreditCard paymentId event.OrderId event.ClientId event.TotalValue
    payment |> io.Save

Aby “złożyć zamówienie” wystarczy strzelić do Api, które wystawiłem używając Giraffe [6]

let routing : (HttpFunc -> HttpContext -> HttpFuncResult) =
    choose [ POST
             >=> route "/orders"
             >=> placeOrderHandler
             POST
             >=> route "/failingOrders"
             >=> placeOrderHandlerFailing
             GET
             >=> routef "/orders/%O" readOrderById
             GET
             >=> routef "/client/%O/payments" readPaymentsByClientId  ]

W zależności od wybranej ścieżki, złożenie zamówienia się uda lub zostanie rzucony wyjątek. Dodatkowo, Api pozwala też na sprawdzenie czy zamówienie zostało utworzone oraz listę płatności danego klienta.

Scenariusz bez użycia Outboxa

Czas sprawdzić jak działa przedstawiony mechanizm. Przy wyłączonym Outboxie spodziewamy się, że zamówienie nie zostanie utworzone a mimo tego klient zostanie obciążony opłatą.

Z poziomu app settingsów wyłączyłem wykorzystywanie Outboxa

{
  [...]
  "UseOutbox": false,
  [...]
}

I strzeliłem do API takim requestem

POST https://localhost:5001/failingOrders HTTP/1.1
Content-Type: application/json

{
    "Id": "69f25b8f-46f9-48fc-9dda-6debe85b8eb8",
    "ClientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "TotalValue": 876.54
}

Następnie sprawdzam czy zamówienie zostało utworzone.

GET https://localhost:5001/orders/69f25b8f-46f9-48fc-9dda-6debe85b8eb8 HTTP/1.1
Content-Type: application/json

Odpowiedź, którą otrzymuję jest zgodna z przypuszczeniami. Zamówienie nie zostało utworzone.

"Not found order of id 69f25b8f-46f9-48fc-9dda-6debe85b8eb8"

Sprawdzę więc czy klient został za to zamówienie obciążony.

GET https://localhost:5001/client/4e88f8e1-9c7d-4e70-bb48-acc502c96025/payments HTTP/1.1
Content-Type: application/json

I otrzymuję taką odpowiedź

[
  {
    "id": "7a758e6f-2ad7-4ea2-802a-2702ae355e68",
    "orderId": "69f25b8f-46f9-48fc-9dda-6debe85b8eb8",
    "clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "amount": 876.5400
  }
]

Zamówienia nie ma, a mimo tego klient został obciążony.

Scenariusz przy użyciu Outboxa

Zobaczmy teraz jak sprawy się mają, gdy Outbox jest wykorzystywany przy procesie składania zamówienia.

Z poziomu app settingsów włączam Outboxa

{
  [...]
  "UseOutbox": true,
  [...]
}

I podobnie jak wcześniej składam zamówienie na failującym endponcie.

POST https://localhost:5001/failingOrders HTTP/1.1
Content-Type: application/json

{
    "Id": "f7ceb858-d400-4602-a1f9-b5fc16bc282c",
    "ClientId": "14a92c5e-4cb1-4564-ad37-22a7cc8c1a1e",
    "TotalValue": 567.98
}

Następnie upewniam się, że zamówienie nie zostało utworzone .

GET https://localhost:5001/orders/f7ceb858-d400-4602-a1f9-b5fc16bc282c HTTP/1.1
Content-Type: application/json
"Not found order of id f7ceb858-d400-4602-a1f9-b5fc16bc282c"

Przy włączonym Outboxie klient nie powinien zostać obciążony żadnymi opłatami. Sprawdźmy:

GET https://localhost:5001/client/14a92c5e-4cb1-4564-ad37-22a7cc8c1a1e/payments HTTP/1.1
Content-Type: application/json

Dostajemy pustą listę płatności klienta

[]

Podsumowanie

Wykorzystanie Outboxa pozwoliło nam na wyeliminowanie problemu wysłania wiadomości, która nie powinna zostać wysłana oraz zagwarantować, że wszystkie wiadomości zostaną dostarczone. Jest to pierwszy z dwóch etapów zagwarantowania przetworzenia wiadomości dokładnie raz. Przed nami druga część procesu - strona odbiorcza. Implementację Inboxa pokaże w kolejnym wpisie.

Kod źródłowy

https://github.com/marcinlovescode/ExactlyOnceProcessing

Bibliografia

[1] Dapper [biblioteka] https://github.com/DapperLib/Dapper
[2] Microsoft. [artykuł] OUTPUT Clause. Źródło: https://docs.microsoft.com/en-us/sql/t-sql/queries/output-clause-transact-sql?view=sql-server-ver15
[3] RabbitMQ, [broker wiadomości] https://www.rabbitmq.com/
[4] MassTransit, [biblioteka] https://masstransit-project.com/
[5] NCrontab, [biblioteka] https://github.com/atifaziz/NCrontab
[6] Giraffe, [biblioteka] https://github.com/giraffe-fsharp/Giraffe
[7] Kamil Grzybek, [artykuł] The Outbox Pattern Źródło: http://www.kamilgrzybek.com/design/the-outbox-pattern/
[8] Marcin Golenia, [artykuł] Outbox pattern in F# with polling publisher https://mcode.it/blog/2021-02-18-fsharp_outbox/

Zostaw komentarz