Ostatni wpis dotyczył implementacji Outboxa w F#, który jest jednym z dwóch klocków całego rozwiązania. Pozostało jeszcze zagwarantować jednokrotne przetwarzanie wiadomości po stronie odbiorczej.

Jakie problemy trzeba rozwiązać?

Wracając pamięcią do pierwszego wpisu z tej serii strona odbiorcza przy konsumowaniu wiadomości zmaga się z dwoma problemami.

  • Utrata wiadomości. Dzieje się to wtedy, gdy do brokera zostaje wysłane potwierdzenie przyjęcia komunikatu zanim zostanie on przetworzony oraz przy przetwarzaniu wystąpi błąd. Broker nie wykona wtedy retransmisji.

lost-msg-inbox

  • Wielokrotne przetworzenie tej samej wiadomości. Gdy próbując uciec od powyższego problemu przeniesiemy proces wysłania potwierdzenia na sam koniec całej operacji, narażamy się na sytuację w której nie zostanie ono dostarczone. Broker po pewnym czasie wykona retransmisję i proces przetwarzania rozpocznie się raz jeszcze.

duplicated-msg-inbox

Oba problemy rozwiązać można w dwojaki sposób.

  • Rozdzielić proces na odbieranie i deduplikację wiadomości oraz przetworzenie wiadomości na dwa niezależne procesy. Wygląda to trochę jak zastosowanie Outbox patternu, ale od drugiej strony.

inbox-two-transactions

  • Odebrać wiadomość bez jej potwierdzania, przetworzyć, transakcyjnie zapisać stan w bazie danych wraz z identyfikatorem odebranej wiadomości w celach deduplikacji i potwierdzić odbiór.

inbox-single-transaction

Nawet gdy nie uda się potwierdzić odbioru, ale sama wiadomość zostanie przetworzona to mechanizm deduplikacji wykryje i odrzuci takie wiadomości.

deduplicated-msg-inbox

Druga opcja jest trochę prostsza do implementacji i w przedstawionym scenariuszu spełnia swoje zadanie. Pokażę więc jak rozszerzyć projekt z poprzedniego wpisu o opisane funkcjonalności.

Implementacja Inboxa w F#

Funkcja która implementuje proces z powyższego diagramu może wyglądać w taki sposób:

let consume (messageAlreadyProcessed: Guid -> Async<bool>) (saveProcessedMessageId: Guid -> Async<unit>) (func: 'a -> Async<unit>) (messageId: Guid) (message: 'a) =
        async {
            let! isProcessed = messageAlreadyProcessed (messageId)

            match isProcessed with
            | false ->
                use transactionScope =
                    new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled)

                do! func message
                do! saveProcessedMessageId (messageId)
                transactionScope.Complete()
            | true -> ()
        }

Implementacja pokrywa się z przedstawionym diagramem z dokładnością do miejsca otwarcia transakcji. Spójrzmy na zależności:

  • messageAlreadyProcessed - delegat sprawdzający czy wiadomość o danym identyfikatorze została już przetworzona
  • saveProcessedMessageId - delegat zapisujący identyfikator wiadomości w bazie danych
  • func - delegat procesu, który ma zostać wykonany na otrzymanej wiadomości
  • messageId - identyfikator wiadomości
  • message - payload wiadomości

Zależności - baza danych

Tym razem do implementacji jest tylko kilka prostych operacje bazodanowych. Do deduplikacji wystarcza sam identyfikator wiadomości. W najprostszym wariancie, nie ma potrzeby przetrzymywać nic więcej.

Do utworzenia tabeli wykonałem ten oto skrypt:

CREATE TABLE ProcessedInboxMessages
(
    Id uniqueidentifier NOT NULL 
    CONSTRAINT [PK_ProcessedInboxMessages_Id] PRIMARY KEY(Id)
)

I przygotowałem funkcje dostępu do danych:

[<Literal>]
let private insertSql =
    "INSERT INTO ProcessedInboxMessages(Id) VALUES (@Id)"

let save createConnection msgId =
    async {
        use! connection = createConnection ()

        do!
            connection
            |> dbParametrizedExecute insertSql {|Id = msgId|}
    }
[<Literal>]
let private readSql =
    "SELECT COUNT(1) FROM ProcessedInboxMessages WHERE Id = @Id"

let readIfExist createConnection id =
    async {
        use! connection = createConnection ()

        return!
            connection
            |> (dbParametrizedQuerySingle<int> readSql {|Id = id|})
            |> Async.map (fun count -> count = 1)
    }

Podłączenie pod Mass Transita

Mając zaimplementowane zależności pozostało podłączyć się pod proces Mass Transita. Przygotowałem generyczną implementację interfejsu IConsumer, która opakowuje akcję handlera w transakcyjny proces deduplikacji.

type MassTransitInboxBasedConsumer<'a when 'a: not struct>(messageAlreadyProcessed: Guid -> Async<bool>, saveProcessedMessageId: Guid -> Async<unit>, func: 'a -> Async<unit>) =
    interface IConsumer<'a> with
        member this.Consume(context: ConsumeContext<'a>) =
            InboxBasedConsumer.consume messageAlreadyProcessed saveProcessedMessageId func (context.MessageId.GetValueOrDefault()) context.Message
            |> Async.StartAsTask
            :> Task

Następnie zarejestrowałem handler dla zdarzenia OrderPlaced.

configurator.Consumer<MassTransitInboxBasedConsumer<OrderPlaced>>
    (fun _ ->
        let service =
            busRegistrationContext.GetService<CompositionRoot>()
        MassTransitInboxBasedConsumer<OrderPlaced>(service.ReadIfMessageAlreadyProcessed, service.SaveProcessedMessage, service.OrderPlacedHandler))

Testy

Mając wszystkie klocki na miejscu, sprawdźmy jak zachowa się zaimplementowany proces.

Punktem wejścia w proces jest moment pojawienia się zdarzenia OrderPlaced na kolejce. Aby zasymulować wielokrotne wysłanie wiadomości przez proces Outboxa, ręcznie za pomocą skryptu SQL przeniosę wiadomość z listy przetworzonych wiadomości do kolejki oczekujących.

BEGIN

INSERT INTO ExactlyOnceProcessing.dbo.OutboxMessages
SELECT Id, OccuredOn, [Type], Payload 
FROM ExactlyOnceProcessing.dbo.ProcessedOutboxMessages

DELETE FROM ExactlyOnceProcessing.dbo.ProcessedOutboxMessages

END

Testy bez Inboxa

Z poziomu app settingsów wyłączyłem wykorzystywanie Inboxa

{
  [...]
  "UseInbox": false,
  [...]
}

… i strzeliłem do API takim requestem:

POST https://localhost:5001/orders HTTP/1.1
Content-Type: application/json

{
    "Id": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
    "ClientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "TotalValue": 143.99
}

Sprawdzam czy klient został obciążony za zamówienie…

GET https://localhost:5001/client/4e88f8e1-9c7d-4e70-bb48-acc502c96025/payments HTTP/1.1
Content-Type: application/json

… i otrzymuję taką odpowiedź:

[
  {
    "id": "9c1382b7-cd72-4209-b5b3-bec30a42071d",
    "orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
    "clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "amount": 143.9900
  }
]

Następnie symuluję ponowne wysłanie zduplikowanej wiadomości za pomocą skryptu oraz sprawdzam stan obciążeń klienta.

[
  {
    "id": "9c1382b7-cd72-4209-b5b3-bec30a42071d",
    "orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
    "clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "amount": 143.9900
  },
  {
    "id": "efe8be14-8300-4898-92c6-e72fd324bf93",
    "orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
    "clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
    "amount": 143.9900
  }
]

Wyraźnie widać, że opłata została pobrana podwójnie.

Testy po włączeniu Inboxa

Włączmy Inboxa i upewnijmy się, że zduplikowana wiadomość zostanie odrzucona.

Z poziomu app settingsów włączam wykorzystywanie Inboxa.

{
  [...]
  "UseInbox": true,
  [...]
}

Strzelam do API takim requestem:

POST https://localhost:5001/orders HTTP/1.1
Content-Type: application/json

{
    "Id": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
    "ClientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
    "TotalValue": 234.56
}

Sprawdzam czy klient został obciążony za zamówienie….

GET https://localhost:5001/client/aa1431d1-65f1-4afd-ab5d-4a4c915ae817/payments HTTP/1.1
Content-Type: application/json

… i otrzymuję taką odpowiedź

[
  {
    "id": "1d490418-705b-4e5d-b8a9-1ff0272343ab",
    "orderId": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
    "clientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
    "amount": 234.5600
  }
]

Następnie symuluję ponowne wysłanie zduplikowanej wiadomości za pomocą skryptu oraz sprawdzam stan obciążeń klienta

GET https://localhost:5001/client/aa1431d1-65f1-4afd-ab5d-4a4c915ae817/payments HTTP/1.1
Content-Type: application/json

Po czym otrzymuję taką odpowiedź:

[
  {
    "id": "1d490418-705b-4e5d-b8a9-1ff0272343ab",
    "orderId": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
    "clientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
    "amount": 234.5600
  }
]

Aby mieć pewność, że wszystko działa poprawnie można wykonywać skrypt bazodanowy wielokrotnie. Rezultat pozostanie ten sam. Wiadomość zostanie przetworzona dokładnie raz.

Podsumowanie

Zapewnienie gwarancji przetworzenia wiadomości dokładnie raz wymaga skoordynowanych działań po stronie nadawczej i odbiorczej. Choć poprawna implementacja pozawala tę właściwość osiągnąć, wiąże się to z wprowadzeniem kilku dodatkowych elementów, które mogą ostatecznie wpływać na sumaryczny czas wykonania całego procesu. Warto mieć to z tyłu głowy gdy zajdzie taka potrzeba. Być może problem można rozwiązać w inny sposób, który nie będzie wymagał implementacji całego złożonego mechanizmu.

Kod źródłowy

https://github.com/marcinlovescode/ExactlyOnceProcessing

Zostaw komentarz