CQRS w Azure z wykorzystaniem Service Bus, Azure Tables oraz Azure Blob, część II ![Udostępnij na: Facebook](images/gg670867.udostepnij_fb(pl-pl,MSDN.10).png \"Udostępnij na: Facebook\")

Autor: Piotr Zieliński

Opublikowano: 2014-05-21

Wstęp

W poprzedniej części opisałem podstawy CQRS bez konkretnej implementacji. W zależności od systemu istnieje wiele implementacji CQRS, chciałbym więc teraz pokazać przykładową – bazującą na platformie Azure. Myślę, że zanim sięgnie się po zewnętrzne frameworki, warto spróbować zaimplementować CQRS samodzielnie. Pozwoli to w pełni zrozumieć zasadę działania CQRS.

Architektura

Przedstawiona architektura będzie się składała z następujących elementów:

  • Web Roles – aplikacje ASP.NET MVC, działające na zasadzie klienta. To one będą wysyłać komendy i zapytania.
  • Service Bus – implementacja Azure pozwala zaimplementować wiele scenariuszy, takich jak np. komunikacja aplikacji działających w chmurze z innymi usługami znajdującymi się poza nią. Ponadto Azure Service Bus wspiera kolejki, które można wykorzystać do przechowywania komend. Web Role będzie zatem wysyłał komendę do Service Bus, a ten umieści ją w kolejce.
  • Worker Role – procesy działające w tle będą zdejmowały komendy z kolejki, a następnie wykonywały daną logikę.
  • Azure Tables –repozytorium dla zdarzeń (Event Sourcing).
  • Azure Blob – będzie przechowywać aktualny stan danych (ReadModel). Wszelkie zapytania będą operować bezpośrednio na Blob.

Wysyłanie i wykonywanie komend

Rys. 1. Wysyłanie i wykonywanie komend.

Przedstawiona architektura zapewnia wysoką skalowalność – dzięki równoważeniu obciążenia. Wielu klientów może jednocześnie wysyłać komendy i wszystkie one będą wykonywanie współbieżnie przez dostępne workery. Jeśli w danym czasie pojawi się zbyt dużo komend, będą po prostu przechowywane w kolejce – aż do momentu, gdy któryś z workerów będzie mógł obsłużyć żądanie.

Aktualizacja ReadModel\ViewStore wygląda podobnie – odbywa się za pomocą worker role. Po wykonaniu komendy handler wysyła do Service Bus zdarzenie, które (tak samo jak inne komendy) może zostać zdjęte przez jednego z workerów i odpowiednio obsłużone. W przypadku zdarzeń obsługa polega na aktualizacji ReadModel i zapisie sekwencji zdarzeń w repozytorium.

Aktualizacja ViewStore\ReadModel

Rys. 2. Aktualizacja ViewStore\ReadModel.

Service Bus

Podstawami Service Bus nie będę się tutaj zajmował. Generalnie implementacja Service Bus dostarczona przez Azure umożliwia obsługę następujących scenariuszy:

  • Relay – łączenie systemów, które znajdują się poza NAT. Bardzo często jest to wykorzystywane do komunikacji między usługą w chmurze a tą poza nią. Ponadto, gdy dany serwis nie ma publicznego IP lub jest za firewallem, można posłużyć się Service Bus, aby nawiązać połączenie.
  • Kolejki – technologia zawiera również kolejkę danych, która służy do rozproszonej komunikacji między różnymi aplikacjami lub systemami. W sam raz nadaje się to do obsługi komend CQRS.
  • Topics – to również sposób komunikacji. Subskrybenci mogą podłączyć się do różnych „tematów” i odbierać wiadomości w zależności od zdefiniowanych filtrów. Typowy scenariusz to kilku różnych subskrybentów ze zdefiniowanymi filtrami, a więc odbierających tylko te wiadomości, które są w stanie obsłużyć.

Dokumentacja powyższych scenariuszy znajduje się tutaj.

Implementacja obsługi komend: konfiguracja Service Bus

W projekcie (w naszym przypadku – Web Role i Worker Role), w którym chce się wykorzystać Azure Service Bus, należy najpierw zainstalować odpowiedni pakiet z NuGet:

Dzięki temu do dyspozycji będzie klasa QueueClient, niezbędna do wysłania jakiejkolwiek wiadomości.

Kolejnym krokiem jest skonfigurowanie Service Bus. Wystarczy zalogować się do portalu Azure i wykonać następujące kroki:

  1. Kliknąć Service Bus w głównym panelu:

  2. Wprowadzić nazwę przestrzeni:

  3. Stworzyć kolejkę w ramach właśnie stworzonej przestrzeni:

W celu połączenia się z Service Bus będzie potrzebny connection string. Można go uzyskać z panelu Azure, klikając na „View Connection String” w zakładce Queues.

Implementacja obsługi komend: wysyłanie komend

Na tym etapie usługa Service Bus powinna być skonfigurowana, a connection string jest już dostępny. Czas przejść do strony klienckiej, odpowiedzialnej za wysyłanie komend. Warto jednak najpierw zdefiniować kilka interfejsów, a mianowicie:

public interface ICommandBus
{
    void Send(object command);
}
public interface ICommand
{
    Guid Id { get; }
}

ICommand powinien być znany z poprzedniej części artykułu. Z kolei ICommandBus reprezentuje magistralę, po której będzie wysyłać się komendy, aby następnie zostały odebrane i przetworzone przez jednego z workerów. Implementacja ICommandBus za pomocą Azure Service Bus wygląda tak:

public class AzureCommandBus:ICommandBus
{
    private readonly QueueClient _queueClient;

    public AzureCommandBus(string connectionString,string queueName)
    {
        _queueClient=QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    public void Send(object message)
    {
        using (var memoryStream = new MemoryStream())
        {
            var binaryFormatter = new BinaryFormatter();
            binaryFormatter.Serialize(memoryStream, message);
            memoryStream.Position = 0;

            _queueClient.Send(new BrokeredMessage(memoryStream));
        }          
    }
}

Przykładowa definicja komendy:

[Serializable]
sealed public class BookSeatCommand : ICommand
{     
    public Guid Id { get; private set; }
    public DateTime Date { get; private set; }
    public int SeatNumber { get; private set; }
    public string FirstName { get; private set; }
    public string LastName { get; private set; }

    public BookSeatCommand(DateTime date, int seatNumber, string firstName, string lastName)
    {
        Date = date;
        SeatNumber = seatNumber;
        FirstName = firstName;
        LastName = lastName;
    }
}

W celu wysłania komend wystarczy stworzyć instancję CommandBus i wywołać metodę Send:

_commandBus.Send(new BookSeatCommand(Guid.NewGuid(),1,"Piotr","Zielinski"));

Kod powinien zostać umieszczony w jednym z kontrolerów. Dane takie jak imię, nazwisko czy numer siedzenia mogą zostać pobrane z formularza – nie ma to znaczenia dla zagadnień omawianych w tym artykule. Warto rozważyć jakiś framework IoC, np. Autofac, aby implementacja ICommandBus została wstrzyknięta w konstruktorze kontrolera. Innymi słowy, nie powinno się przechowywać silnej referencji do AzureCommandBus w kontrolerze.

Jeśli komenda została wysłana poprawnie, powinno być to widoczne w panelu Azure, a konkretnie – w kolumnie Queue Length:

Implementacja obsługi komend: odbieranie i przetwarzanie

Poprzednia sekcja dotyczyła w istocie strony klienckiej – wysyłania komend. Prawdopodobnie będą one wysyłane przez Web Role, np. za pomocą aplikacji ASP.NET MVC. Kolejna kwestia do przemyślenia to obsługa komend, która zostanie zaimplementowana za pomocą Worker Role. Klasa będzie odbierać w pętli przychodzące wiadomości i wykonywać skojarzoną logikę. Główny interfejs może zatem wyglądać tak:

public interface IMessageReceiver
{
    void Run(CancellationToken cancellationToken);
}

Warto zaznaczyć, że ten sam mechanizm do obsługi komend zostanie w dalszej części artykułu przeznaczony również do obsługi zdarzeń domenowych.

Najprostsza implementacja oparta na Azure Service Bus to natomiast:

public class AzureMessageReceiver : IMessageReceiver
{
    private readonly QueueClient _queueClient;
    private readonly Dictionary<Type, Func<object>> _handlers = new Dictionary<Type, Func<object>>();

    public AzureMessageReceiver(Dictionary<Type, Func<object>> handlers,string connectionString, string queueName)
    {
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
        _handlers = handlers;            
    }

    public void Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = _queueClient.Receive();
            
            try
            {
                ProcessMessage(message);
            }
            catch (Exception e)
            {
                Trace.TraceError(e.ToString());

                if (!IsNotCritical(e))
                    throw e;
            }
        }
    }

    private bool IsNotCritical(Exception exception)
    {
        // TODO: To jest tylko przykład. Zawsze trzeba zastanowic sie, ktore wyjatki moga byc ignorowane.
        return true;
    }

    private void ProcessMessage(BrokeredMessage brokeredMessage)
    {
        using (var stream = brokeredMessage.GetBody<Stream>())
        {
            var binaryFormatter = new BinaryFormatter();
            dynamic message = binaryFormatter.Deserialize(stream);           

            Func<object> handlerFactory;
            if (_handlers.TryGetValue(message.GetType(), out handlerFactory))
            {
                dynamic handler = handlerFactory();
                handler.Handle(message);
            }

            brokeredMessage.Complete();
        }
    }
}

W kodzie produkcyjnym lepiej oczywiście zastosować jakiś framework IoC. Tutaj dla uproszczenia został użyty prosty słownik. ProcessMessage jest wykonywany w pętli aż do momentu, gdy przekaże się token do anulowania przetwarzania. ProcessMessage odbiera wiadomość, która powinna być komendą lub zdarzeniem. Dla uproszczenia kod nie zawiera obsługi błędu, która powinna również rozważyć ewentualność niepoprawnego sformatowania wiadomości. Po odczytaniu wiadomości tworzony jest handler. Przykładowy CommandHandler:

public interface IMessageHandler<T>
{
    void Handle(T command);
}

public sealed class BookSeatCommandHandler : IMessageHandler<BookSeatCommand>
{

    public void Handle(BookSeatCommand command)
    {
    }
}

Zdarzenia domenowe

Kolejne wzywanie to zdarzenia – zarówno ich wysyłanie, jak i odbieranie. Zdarzenia mogą posłużyć m.in. do aktualizacji ReadModel. W poprzedniej części artykułu wyjaśniłem zasadę działania Event Sourcing.  W Azure doskonałym miejscem do ich przechowywania są po prostu tabele (Azure Table). Zdarzenia zwykle przechowuje się w bazach NoSql, dlatego Azure Table jest dobrym wyborem.

Najpierw należy jednak zdefiniować ogólną klasę POCO, reprezentującą konkretne zdarzenie np. „miejsce w samolocie zostało zarezerwowane”:

[Serializable]
public class Event 
{
    public Event(Guid aggregateId)
    {
        AggregateId = aggregateId;
    }
    public int Version { get; set; }
    public Guid AggregateId { get; set; }
}

[Serializable]
public class SeatBookedEvent : Event
{
    public SeatBookedEvent(Guid aggregateId) : base(aggregateId)
    {
    }

    public DateTime Date
    {
        get; set;
    }

    public string FirstName
    {
        get; set;
    }

    public string LastName
    {
        get; set;
    }

    public int SeatNumber
    {
        get; set;
    }
}

Powyższa klasa to wyłącznie obiekt POCO. Nie ma on nic wspólnego z lokalizacją, w której zdarzenie zostanie zapisane. W Azure Tables można zdefiniować konkretną encję. Następnie da się ją zapisać już w bazie:

public class SeatReservationEventEntity : TableEntity
{
    public int Version { get; set; }        
    public Guid AggregateId { get; set; }
    public DateTime Date { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public int SeatNumber { get; set; }
}

Kolejny krok to zdefiniowanie repozytorium (tak jak w poprzedniej części artykułu). Interfejs bazowy:

public interface ISeatRepository
{
    void Book(SeatBookedEvent seatBookedEvent,int expectedVersion);
}

Konkretna implementacja, przechowująca zdarzenia w Azure Table:

public class SeatRepository : ISeatRepository
{
    private readonly CloudTable _eventsTable;

    public SeatRepository(CloudTable cloudTable)
    {
        _eventsTable = cloudTable;
    }

    private SeatReservationEventEntity[] GetLastEventVersion(Guid aggregateId)
    {
        var events=_eventsTable.CreateQuery<SeatReservationEventEntity>().Where(r => r.AggregateId == aggregateId).ToArray();

        return events.ToArray();
    }

    public void Book(SeatBookedEvent seatBookedEvent,int expectedVersion)
    {
        SeatReservationEventEntity[] events = GetLastEventVersion(seatBookedEvent.AggregateId);
      
        if (events.Any(e=>e.Version>=expectedVersion))
        {
            throw new DBConcurrencyException();
        }           

        var entity = new SeatReservationEventEntity();
        entity.PartitionKey = seatBookedEvent.Date.Year.ToString(CultureInfo.InvariantCulture);
        entity.RowKey = Guid.NewGuid().ToString();
        entity.Date = seatBookedEvent.Date;
        entity.FirstName = seatBookedEvent.FirstName;
        entity.LastName = seatBookedEvent.LastName;
        entity.SeatNumber = seatBookedEvent.SeatNumber;
        entity.Version = seatBookedEvent.Version + 1;

        TableOperation insert = TableOperation.Insert(entity);
        _eventsTable.Execute(insert);
    }
}

Wysłanie zdarzenia domenowego nie będzie się niczym różnić od wysłania komendy. Przykład:

var bookedEvent = new SeatBookedEvent(Guid.NewGuid());
bookedEvent.FirstName = command.FirstName;
bookedEvent.LastName = command.LastName;
bookedEvent.Date = command.Date;
bookedEvent.SeatNumber = command.SeatNumber;
bookedEvent.Version = -1;
_commandBus.Send(bookedEvent);

CommandBus został zaimplementowany w dość generyczny sposób, a zatem nic nie szkodzi na przeszkodzie, aby obsługa zdarzeń implementowała również IMessageHandler:

public sealed class BookSeatEventHandler : IMessageHandler<SeatBookedEvent>
{
    private readonly ISeatRepository _seatRepository;

    public BookSeatEventHandler(ISeatRepository seatRepository)
    {
        _seatRepository = seatRepository;        
    }

    public void Handle(SeatBookedEvent command)
    {  
        // obsługa zdarzenia tutaj
    }
}

Aktualizacja modelu do odczytu

W systemach opartych na CQRS bardzo często korzysta się z dodatkowego modelu, który służy wyłącznie do odczytu. Jakakolwiek operacja (komenda) generuje zdarzenia, a ich efektem jest aktualizacja ReadModel. W poprzedniej części artykułu przedstawiono następujący scenariusz:

  1. Użytkownik wysyła komendę.
  2. Command Handler obsługuje zdarzenie – wykonuje logikę biznesową i generuje serie zdarzeń.
  3. Zdarzenia są zapisywane do bazy danych (Event Sourcing).
  4. Wysłanie zdarzenia powoduje aktualizację ReadModel.

Kroki 1, 2, 3 zostały już zaimplementowane. Pokazałem, jak wysłać i obsłużyć komendę oraz jak działają zdarzenia.

Kolejnym krokiem jest aktualizacja ReadModel. W zależności od konkretnego systemu może on mieć formę relacyjnej bazy danych, NoSQL albo po prostu blob. W tym artykule to Azure Blob będzie przechowywać listę zarezerwowanych miejsc.

W systemie potrzebować będziemy klas do odczytu i zapisu blob. Na początek warto zdefiniować interfejsy:

public interface IReservationReader
{
    ReservationInfo[] GetReservations();
}
public interface IReservationWriter
{
    void AddReservation(ReservationInfo reservation);
}

Klasa ReservationInfo to wartość przechowywana w blob, a konkretnie:

[Serializable]
public class ReservationInfo
{
    public string FullName { get; set; }
    public int SeatNumber { get; set; }
}

Dla uproszczenia jedna klasa zaimplementuje oba interfejsy:

public class ReservationManager : IReservationReader, IReservationWriter
{
    private readonly CloudBlockBlob _cloudBlock;

    public ReservationManager(CloudBlockBlob cloudBlock)
    {
        _cloudBlock = cloudBlock;
    }

    public ReservationInfo[] GetReservations()
    {
        using (var memoryStream = new MemoryStream())
        {
           if(!_cloudBlock.Exists())
                return new ReservationInfo[0];         
   
            _cloudBlock.DownloadToStream(memoryStream);

            if(memoryStream.Length==0)
                return new ReservationInfo[0];

            memoryStream.Position = 0;

            var binaryFormatter = new BinaryFormatter();
            return (ReservationInfo[]) binaryFormatter.Deserialize(memoryStream);
        }
    }

    public void AddReservation(ReservationInfo reservation)
    {
        ReservationInfo[] currentResevations = GetReservations();

        IEnumerable<ReservationInfo> allReservations = currentResevations.Union(new[] {reservation});

        using (var memoryStream = new MemoryStream())
        {                
            var binaryFormatter = new BinaryFormatter();
            binaryFormatter.Serialize(memoryStream,allReservations.ToArray());
            memoryStream.Position = 0;
            _cloudBlock.UploadFromStream(memoryStream);
        }
    }
}

W powyższym kodzie nie ma chyba nic nadzwyczajnego. Została użyta binarna serializacja w celu zapisu i odczytu danych do Azure Blob. Implementacja zależeć będzie od tego, gdzie chcemy przechowywać aktualny stan systemu. Nic nie szkodzi na przeszkodzie, aby skorzystać np. z Azure SQL Server.

Nie została jeszcze w pełni zaimplementowana metoda BookSeatCommandHandler:Handle. Teraz należy obsłużyć dwa scenariusze: gdy można złożyć rezerwację, wysłane jest zdarzenie SeatBookedEvent; w przeciwnym razie generuje się zdarzenie domenowe ReservationRejectedEvent:

public sealed class BookSeatCommandHandler : IMessageHandler<BookSeatCommand>
{
    private readonly ICommandBus _commandBus;
    private readonly IReservationReader _reservationReader;

    public BookSeatCommandHandler(ICommandBus commandBus, IReservationReader reservationReader)
    {
        _commandBus = commandBus;
        _reservationReader = reservationReader;
    }
    public void Handle(BookSeatCommand command)
    {
        if (_reservationReader.GetReservations().Any(r => r.SeatNumber == command.SeatNumber))
        {
            var reservationRejectedEvent = new ReservationRejectedEvent(Guid.NewGuid());
            reservationRejectedEvent.FirstName = command.FirstName;
            reservationRejectedEvent.LastName = command.LastName;
            reservationRejectedEvent.SeatNumber = command.SeatNumber;
            _commandBus.Send(reservationRejectedEvent);
        }
        else
        {
            var bookedEvent = new SeatBookedEvent(Guid.NewGuid());
            bookedEvent.FirstName = command.FirstName;
            bookedEvent.LastName = command.LastName;
            bookedEvent.Date = command.Date;
            bookedEvent.SeatNumber = command.SeatNumber;
            bookedEvent.Version = -1;
            _commandBus.Send(bookedEvent);
        }
    }
}

Obsługa ReservationRejectedEvent może zawierać np. kod odpowiedzialny za wysłanie e-maila. Dużo ciekawszym zdarzeniem jest jednak SeatBookedEvent, które powinno zaktualizować Model:

public sealed class BookSeatEventHandler : IMessageHandler<SeatBookedEvent>
{
    private readonly ISeatRepository _seatRepository;
    private readonly IReservationWriter _reservationWriter;

    public BookSeatEventHandler(ISeatRepository seatRepository, IReservationWriter reservationWriter)
    {
        _seatRepository = seatRepository;
        _reservationWriter = reservationWriter;
    }

    public void Handle(SeatBookedEvent command)
    {  
        _seatRepository.Book(command,-1);
        _reservationWriter.AddReservation(new ReservationInfo
        {
            FullName = string.Format("{0} {1}", command.FirstName, command.LastName),
            SeatNumber = command.SeatNumber
        });
    }
}

Metoda Handle korzysta przede wszystkim z repozytorium, aby zapisać zdarzenie do Azure Tables. Następie za pomocą IReservationWriter aktualizuje ReadModel, oparty na Azure Blob.

Wysyłanie zapytań – odczyt danych

Większość elementów została już zaimplementowana. Ostatnim jest odczyt danych. W powyższej sekcji napisano klasę ReservationManager, odpowiedzialną również za odczyt. W CQRS można z niej bezpośrednio skorzystać, np. w kontrolerze:

public ActionResult Index()
{
    ReservationInfo[] reservations = _reservationReader.GetReservations();
    return View(reservations);
}

Pole _reservationReader to implementacja interfejsu IReservationReader, zdefiniowanego w poprzedniej sekcji.

Podsumowanie

CQRS to wzorzec, który zdecydowanie nadaje się do skalowalnych i rozproszonych systemów. Rozdzielenie logiki odpowiedzialnej za odczyt i zapis jest w systemach rozproszonych dość naturalnym podejściem, co wpisuje się bardzo dobrze w architekturę CQRS.

Przedstawiony przykład jest bardzo sztuczny – rzeczywisty scenariusz zawierałby dużo więcej logiki biznesowej i walidacji danych. Moim zdaniem nie warto jednak komplikować kodu logiką biznesową w artykule, który przedstawia wyłącznie działanie CQRS. Nic nie szkodzi na przeszkodzie, aby rozszerzyć handlery i zaimplementować kilka obiektów domenowych. W środowisku produkcyjnym warto również zastanowić się nad gotowymi frameworkami, przetestowanymi już przez licznych użytkowników.

Produkcyjny kod jest zawsze bardziej skomplikowany niż implementacje przedstawione w artykułach (takie jak powyższa). Moim celem było wyłącznie naszkicowanie CQRS i ćwiczenie polegające na jego implementacji, będące doskonałym sposobem na głębsze zrozumienie wzorca.