Ostatni wpis dotyczył implementacji Outboxa w F#, który jest jednym z dwóch klocków całego rozwiązania. Pozostało jeszcze zagwarantować jednokrotne przetwarzanie wiadomości po stronie odbiorczej.
Jakie problemy trzeba rozwiązać?
Wracając pamięcią do pierwszego wpisu z tej serii strona odbiorcza przy konsumowaniu wiadomości zmaga się z dwoma problemami.
- Utrata wiadomości. Dzieje się to wtedy, gdy do brokera zostaje wysłane potwierdzenie przyjęcia komunikatu zanim zostanie on przetworzony oraz przy przetwarzaniu wystąpi błąd. Broker nie wykona wtedy retransmisji.
- Wielokrotne przetworzenie tej samej wiadomości. Gdy próbując uciec od powyższego problemu przeniesiemy proces wysłania potwierdzenia na sam koniec całej operacji, narażamy się na sytuację w której nie zostanie ono dostarczone. Broker po pewnym czasie wykona retransmisję i proces przetwarzania rozpocznie się raz jeszcze.
Oba problemy rozwiązać można w dwojaki sposób.
- Rozdzielić proces na odbieranie i deduplikację wiadomości oraz przetworzenie wiadomości na dwa niezależne procesy. Wygląda to trochę jak zastosowanie Outbox patternu, ale od drugiej strony.
- Odebrać wiadomość bez jej potwierdzania, przetworzyć, transakcyjnie zapisać stan w bazie danych wraz z identyfikatorem odebranej wiadomości w celach deduplikacji i potwierdzić odbiór.
Nawet gdy nie uda się potwierdzić odbioru, ale sama wiadomość zostanie przetworzona to mechanizm deduplikacji wykryje i odrzuci takie wiadomości.
Druga opcja jest trochę prostsza do implementacji i w przedstawionym scenariuszu spełnia swoje zadanie. Pokażę więc jak rozszerzyć projekt z poprzedniego wpisu o opisane funkcjonalności.
Implementacja Inboxa w F#
Funkcja która implementuje proces z powyższego diagramu może wyglądać w taki sposób:
let consume (messageAlreadyProcessed: Guid -> Async<bool>) (saveProcessedMessageId: Guid -> Async<unit>) (func: 'a -> Async<unit>) (messageId: Guid) (message: 'a) =
async {
let! isProcessed = messageAlreadyProcessed (messageId)
match isProcessed with
| false ->
use transactionScope =
new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled)
do! func message
do! saveProcessedMessageId (messageId)
transactionScope.Complete()
| true -> ()
}
Implementacja pokrywa się z przedstawionym diagramem z dokładnością do miejsca otwarcia transakcji. Spójrzmy na zależności:
- messageAlreadyProcessed - delegat sprawdzający czy wiadomość o danym identyfikatorze została już przetworzona
- saveProcessedMessageId - delegat zapisujący identyfikator wiadomości w bazie danych
- func - delegat procesu, który ma zostać wykonany na otrzymanej wiadomości
- messageId - identyfikator wiadomości
- message - payload wiadomości
Zależności - baza danych
Tym razem do implementacji jest tylko kilka prostych operacje bazodanowych. Do deduplikacji wystarcza sam identyfikator wiadomości. W najprostszym wariancie, nie ma potrzeby przetrzymywać nic więcej.
Do utworzenia tabeli wykonałem ten oto skrypt:
CREATE TABLE ProcessedInboxMessages
(
Id uniqueidentifier NOT NULL
CONSTRAINT [PK_ProcessedInboxMessages_Id] PRIMARY KEY(Id)
)
I przygotowałem funkcje dostępu do danych:
[<Literal>]
let private insertSql =
"INSERT INTO ProcessedInboxMessages(Id) VALUES (@Id)"
let save createConnection msgId =
async {
use! connection = createConnection ()
do!
connection
|> dbParametrizedExecute insertSql {|Id = msgId|}
}
[<Literal>]
let private readSql =
"SELECT COUNT(1) FROM ProcessedInboxMessages WHERE Id = @Id"
let readIfExist createConnection id =
async {
use! connection = createConnection ()
return!
connection
|> (dbParametrizedQuerySingle<int> readSql {|Id = id|})
|> Async.map (fun count -> count = 1)
}
Podłączenie pod Mass Transita
Mając zaimplementowane zależności pozostało podłączyć się pod proces Mass Transita. Przygotowałem generyczną implementację interfejsu IConsumer, która opakowuje akcję handlera w transakcyjny proces deduplikacji.
type MassTransitInboxBasedConsumer<'a when 'a: not struct>(messageAlreadyProcessed: Guid -> Async<bool>, saveProcessedMessageId: Guid -> Async<unit>, func: 'a -> Async<unit>) =
interface IConsumer<'a> with
member this.Consume(context: ConsumeContext<'a>) =
InboxBasedConsumer.consume messageAlreadyProcessed saveProcessedMessageId func (context.MessageId.GetValueOrDefault()) context.Message
|> Async.StartAsTask
:> Task
Następnie zarejestrowałem handler dla zdarzenia OrderPlaced.
configurator.Consumer<MassTransitInboxBasedConsumer<OrderPlaced>>
(fun _ ->
let service =
busRegistrationContext.GetService<CompositionRoot>()
MassTransitInboxBasedConsumer<OrderPlaced>(service.ReadIfMessageAlreadyProcessed, service.SaveProcessedMessage, service.OrderPlacedHandler))
Testy
Mając wszystkie klocki na miejscu, sprawdźmy jak zachowa się zaimplementowany proces.
Punktem wejścia w proces jest moment pojawienia się zdarzenia OrderPlaced na kolejce. Aby zasymulować wielokrotne wysłanie wiadomości przez proces Outboxa, ręcznie za pomocą skryptu SQL przeniosę wiadomość z listy przetworzonych wiadomości do kolejki oczekujących.
BEGIN
INSERT INTO ExactlyOnceProcessing.dbo.OutboxMessages
SELECT Id, OccuredOn, [Type], Payload
FROM ExactlyOnceProcessing.dbo.ProcessedOutboxMessages
DELETE FROM ExactlyOnceProcessing.dbo.ProcessedOutboxMessages
END
Testy bez Inboxa
Z poziomu app settingsów wyłączyłem wykorzystywanie Inboxa…
{
[...]
"UseInbox": false,
[...]
}
… i strzeliłem do API takim requestem:
POST https://localhost:5001/orders HTTP/1.1
Content-Type: application/json
{
"Id": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
"ClientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
"TotalValue": 143.99
}
Sprawdzam czy klient został obciążony za zamówienie…
GET https://localhost:5001/client/4e88f8e1-9c7d-4e70-bb48-acc502c96025/payments HTTP/1.1
Content-Type: application/json
… i otrzymuję taką odpowiedź:
[
{
"id": "9c1382b7-cd72-4209-b5b3-bec30a42071d",
"orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
"clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
"amount": 143.9900
}
]
Następnie symuluję ponowne wysłanie zduplikowanej wiadomości za pomocą skryptu oraz sprawdzam stan obciążeń klienta.
[
{
"id": "9c1382b7-cd72-4209-b5b3-bec30a42071d",
"orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
"clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
"amount": 143.9900
},
{
"id": "efe8be14-8300-4898-92c6-e72fd324bf93",
"orderId": "f8723a22-7041-4e87-ae14-15c06cfa0de9",
"clientId": "4e88f8e1-9c7d-4e70-bb48-acc502c96025",
"amount": 143.9900
}
]
Wyraźnie widać, że opłata została pobrana podwójnie.
Testy po włączeniu Inboxa
Włączmy Inboxa i upewnijmy się, że zduplikowana wiadomość zostanie odrzucona.
Z poziomu app settingsów włączam wykorzystywanie Inboxa.
{
[...]
"UseInbox": true,
[...]
}
Strzelam do API takim requestem:
POST https://localhost:5001/orders HTTP/1.1
Content-Type: application/json
{
"Id": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
"ClientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
"TotalValue": 234.56
}
Sprawdzam czy klient został obciążony za zamówienie….
GET https://localhost:5001/client/aa1431d1-65f1-4afd-ab5d-4a4c915ae817/payments HTTP/1.1
Content-Type: application/json
… i otrzymuję taką odpowiedź
[
{
"id": "1d490418-705b-4e5d-b8a9-1ff0272343ab",
"orderId": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
"clientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
"amount": 234.5600
}
]
Następnie symuluję ponowne wysłanie zduplikowanej wiadomości za pomocą skryptu oraz sprawdzam stan obciążeń klienta
GET https://localhost:5001/client/aa1431d1-65f1-4afd-ab5d-4a4c915ae817/payments HTTP/1.1
Content-Type: application/json
Po czym otrzymuję taką odpowiedź:
[
{
"id": "1d490418-705b-4e5d-b8a9-1ff0272343ab",
"orderId": "a3f62d36-0d8b-4087-8ed8-f0b650ec8f45",
"clientId": "aa1431d1-65f1-4afd-ab5d-4a4c915ae817",
"amount": 234.5600
}
]
Aby mieć pewność, że wszystko działa poprawnie można wykonywać skrypt bazodanowy wielokrotnie. Rezultat pozostanie ten sam. Wiadomość zostanie przetworzona dokładnie raz.
Podsumowanie
Zapewnienie gwarancji przetworzenia wiadomości dokładnie raz wymaga skoordynowanych działań po stronie nadawczej i odbiorczej. Choć poprawna implementacja pozawala tę właściwość osiągnąć, wiąże się to z wprowadzeniem kilku dodatkowych elementów, które mogą ostatecznie wpływać na sumaryczny czas wykonania całego procesu. Warto mieć to z tyłu głowy gdy zajdzie taka potrzeba. Być może problem można rozwiązać w inny sposób, który nie będzie wymagał implementacji całego złożonego mechanizmu.
Zostaw komentarz