Июнь 2016

Том 31 номер 6

Инфраструктура Reactive - Масштабирование асинхронных связей клиент-сервер с помощью Reactive

Питер Вогел | Июнь 2016

Продукты и технологии:
Rx (Reactive Extensions), ObservableCollection, асинхронное программирование

В статье рассматриваются:

  • получение результатов от длительно выполняемого процесса по мере их появления, используя шаблон Observer (наблюдатель);
  • простой способ создания приложений с помощью ObservableCollection, которые ведут мониторинг длительно выполняемых процессов;
  • обновление до Reactive Extensions, позволяющее асинхронно принимать вывод от процессов, управляемых событиями.

Исходный код можно скачать по ссылке

Асинхронная обработка стала более распространенной в создании приложений благодаря тому, что Microsoft .NET Framework обрела широкий спектр средств, которые поддерживают специфические проектировочные шаблоны асинхронного программирования. Зачастую создание качественно спроектированного асинхронного приложения сводится к выбору подходящего проектировочного шаблона, реализуемого в вашем приложении, и последующему подбору правильного набора .NET-компонентов.

В некоторых случаях это требует интеграции нескольких .NET-компонентов. В статье Стивена Клири (Stephen Cleary) «Patterns for Asynchronous MVVM Applications: Commands» (bit.ly/233Kocr) поясняется, как обеспечить полную поддержку шаблона Model-View-ViewModel (MVVM) в асинхронном стиле. В других случаях для поддержки достаточно одного компонента из .NET Framework. Я обсуждал реализацию шаблона Provider/Consumers с применением BlockingCollection в своих статьях «Create Simple, Reliable Asynchronous Apps with BlockingCollection» (bit.ly/1TuOpE6) и «Create Sophisticated Asynchronous Applications with BlockingCollection» (bit.ly/1SpYyD4) на VisualStudioMagazine.com.

Другой пример реализации проектировочного шаблона Observer (наблюдатель) — асинхронный мониторинг длительно выполняемой операции. В этом сценарии асинхронный метод, который просто возвращает объект Task, не работает, поскольку клиент нередко возвращает поток результатов. Для таких сценариев можно задействовать минимум два средства из .NET Framework: ObservableCollection и Reactive Extensions (Rx). В простых решениях достаточно ObservableCollection (наряду с ключевыми словами async и await). Но в более интересных случаях, особенно в тех, где процессы управляются событиями, Rx обеспечивает больший контроль над процессом.

Определение шаблона

Хотя шаблон Observer часто применяется в проектировочных шаблонах UI, включая Model-View-Controller (MVC), Model-View-Presenter (MVP) и MVVM, UI-интерфейсы следует рассматривать как всего один из случаев большего набора ситуаций, где используется шаблон Observer. Определение этого шаблона (взятое из английской версии Википедии) таково: «Объект, называемый субъектом (subject), [который] поддерживает список своих зависимых объектов (dependents), называемых наблюдателями (observers), и автоматически уведомляет их о любых изменениях состояния — обычно вызовом одного из их методов».

На практике суть шаблона Observer сводится к получению клиентом результатов от длительно выполняемых процессов по мере того, как эти результаты становятся доступны. Без какой-либо версии шаблона Observer клиенту пришлось бы ждать до тех пор, пока не стал бы доступен последний результат, а затем он получил бы сразу все результаты одним пакетом. В связи с постоянно растущей долей асинхронных операций вы предпочтете, чтобы наблюдатели обрабатывали результаты параллельно с клиентом по мере того, как они становятся доступны. Чтобы подчеркнуть, что при использовании шаблона Observer речь идет о гораздо большем, чем UI, я буду употреблять в остальной части статьи термины «клиент» и «сервер» вместо «наблюдатель» и «субъект».

Проблемы и возможности

При использовании шаблона Observer возникают минимум три проблемы и две возможности. Первая — проблема с удалением слушателей (lapsed-listener problem): многие реализации этого шаблона требуют, чтобы сервер удерживал ссылки на все свои клиенты. В итоге клиенты сохраняются в памяти, пока сервер не завершает работу. Очевидно, что это не оптимальное решение для длительно выполняемого процесса в динамичной системе, где клиенты часто подключаются и отключаются.

Однако эта проблема является всего лишь симптомом второй, более крупной проблемы: многие реализации шаблона Observer требуют жесткого связывания сервера и клиента, заставляя и сервер, и клиент постоянно присутствовать. Как минимум, клиент должен иметь возможность определять, присутствует ли сервер, и выбирать, подключаться или нет; кроме того, сервер должен быть в состоянии работать, даже если нет никаких клиентов, принимающих результаты.

Третья проблема связана с производительностью: сколько времени уйдет у сервера на уведомление всех клиентов? На производительность шаблона Observer прямо влияет количество уведомляемых клиентов. Поэтому есть возможность улучшить производительность этого шаблона, позволив клиенту с упреждением фильтровать результаты, которые поступают обратно от сервера. Это также улучшает ситуацию там, где сервер генерирует больше результатов, чем в этом заинтересован клиент: клиент может указать, что его следует уведомлять только в специфических случаях. Вторая возможность повысить производительность связана с распознаванием того, что у сервера нет никаких результатов или что он закончил их генерацию. Тогда клиенты не будут запрашивать ресурсы, необходимые для обработки событий на сервере, пока гарантированно не появится что-то, что нужно обработать, и клиенты смогут освобождать эти ресурсы сразу после того, как им становится известно, что обработан последний результат.

От Observer к Publish/Subscribe

Учет этих соображений ведет нас от простых реализаций шаблона Observer к соответствующей модели publish/subscribe (публикация/подписка). Publish/subscribe реализует шаблон Observer в свободно связанном стиле, что позволяет серверам и клиентам работать, даже если одна из сторон в данный момент недоступна. Publish/subscribe также реализует (как правило) фильтрацию на клиентской стороне, позволяя клиенту подписываться либо на конкретные темы (topics) и каналы (channels) («Уведомлять меня о заказах на покупку»), либо на атрибуты, сопоставленные с различными видами контента («Уведомлять меня о любых срочных запросах»).

Но одна проблема остается. Во всех реализациях шаблона Observer наблюдается тенденция к жесткому связыванию клиентов и серверов с конкретным форматом сообщений. Изменение формата сообщения в большинстве реализаций publish/subscribe может оказаться затруднительным, так как на использование нового формата нужно обновить все клиенты.

Во многих отношениях это аналогично описанию серверного курсора в базе данных. Чтобы свести к минимуму издержки передачи данных, сервер базы данных не возвращает результаты по мере выборки каждой записи. Однако в случае больших наборов записей база данных не возвращает все записи одним пакетом. Вместо этого сервер базы данных обычно возвращает подмножества из курсора, хранящегося на сервере, по мере того, как эти подмножества становятся доступны. В случае базы данных клиент и сервер не обязательно должны присутствовать одновременно: сервер базы данных может выполняться, когда никаких клиентов нет; клиент может проверять, доступен ли сервер, и, если нет, заняться чем-то другим (если в том есть необходимость). Процесс фильтрации (в SQL) тоже очень гибкий. Но, если в СУБД изменяется формат, используемый для возврата записей, все клиенты придется, как минимум, перекомпилировать.

Обработка кеша объектов

Поскольку я рассматриваю простую реализацию шаблона Observer, в качестве сервера я использую класс, который ищет в памяти кеш счетов на оплату (invoices). Этот сервер мог бы в конце обработки возвращать набор всех счетов. Однако я предпочел бы, чтобы клиент обрабатывал счета индивидуально и параллельно с процессом поиска, выполняемым сервером. Это означает, что я выбираю такую версию процесса, которая возвращает каждый счет по мере его нахождения и позволяет клиенту обрабатывать каждый счет параллельно с поиском следующего счета.

Простая реализация сервера может выглядеть так:

private List<Invoice> foundInvoices = new List<Invoice>();
public List<Invoice> FindInvoices(decimal Amount)
{
  foundInvoices.Clear();
  Invoice inv;
    // ...логика поиска для добавления счетов в набор
     foundInvoices.Add(inv);
    // ...повторяем, пока не будут найдены все счета
    return foundInvoices;
}

Более изощренные решения могли бы использовать yield return для возврата каждого счета по мере его нахождения вместо того, чтобы собирать счета в список. Так или иначе, клиент, вызывающий метод FindInvoices, захочет выполнить некоторые важные операции до и после обработки. Например, как только найден первый элемент, клиенту может понадобиться список MatchingInvoices для хранения счетов на своей стороне или для запроса/инициализации любых ресурсов, требуемых для обработки счета. По мере добавления дополнительных счетов клиенту понадобилось бы обрабатывать каждый счет и, когда сервер оповещает о выборке последнего счета, освобождать любые ресурсы, поскольку обрабатывать больше нечего.

При выборке из базы данных, например, операция чтения будет блокировать остальную работу, пока не будет возвращена первая запись. После возврата первой записи клиент инициализирует любые ресурсы, необходимые для обработки этой записи. Кроме того, операция чтения возвращает false, когда извлекается последняя запись, позволяя клиенту освободить занятые ресурсы, так как больше нет записей для обработки.

Создание простых решений с помощью ObservableCollection

Наиболее очевидный выбор при реализации шаблона Observer в .NET Framework — ObservableCollection. Он будет уведомлять клиент (через какое-либо событие) всякий раз, когда происходит изменение.

Чтобы переписать мой пример сервера под использование ObservableCollection, достаточно всего двух изменений. Во-первых, набор, хранящий результаты, нужно определить как ObservableCollection и сделать его открытым. Во-вторых, больше нет необходимости в методе, который возвращает результат: серверу достаточно добавлять счета в набор.

Новая реализация сервера могла бы выглядеть следующим образом:

public List<Invoice> FindInvoices(decimal Amount)
{
  public ObservableCollection<Invoice> foundInvoices =
    new ObservableCollection<Invoice>();

  public void FindInvoices(decimal Amount)
  {
    foundInvoices.Clear();
    Invoice inv;
    // ...логика поиска для присваивания inv
     foundInvoices.Add(inv);
    // ...повторяем, пока все счета не будут добавлены в набор
  }

Клиенту, использующему эту версию сервера, достаточно подключить обработчик к событию CollectionChanged набора foundInvoices в InvoiceManagement. В следующем коде показан класс, реализующий интерфейс IDisposable для поддержки отключения от события:

public class SearchInvoices: IDisposable
{
  InvoiceManagement invMgmt = new InvoiceManagement();

  public void SearchInvoices()
  {
    invMgmt.foundInvoices.CollectionChanged += InvoicesFound;
  }
  public void Dispose()
  {
    invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged;
  }

В клиенте событие CollectionChanged передает объект NotifyCollectionChangedEventArgs как свой второй параметр. Свойство Action этого объекта указывает, какое изменение произошло в наборе (очистка набора, добавление новых элементов, перемещение/замена/удаление существующих элементов), и сообщает информацию об измененных элементах (набор добавленных элементов, набор элементов, присутствовавших в наборе до добавления новых элементов, позицию перемещенного/удаленного/замененного элемента).

Простой код в клиенте, который будет асинхронно обрабатывать каждый счет по мере его добавления в набор на сервере, приведен на рис. 1.

Рис. 1. Асинхронная обработка счетов с помощью ObservableCollection

private async void InvoicesFound(object sender,
  NotifyCollectionChangedEventArgs e)
{
  switch (e.Action)
  {
    case NotifyCollectionChangedAction.Reset:
      {
        // ...обработка начального элемента
        return;
      }
    case NotifyCollectionChangedAction.Add:
      {
        foreach (Invoice inv in e.NewItems)
        {
          await HandleInvoiceAsync(inv);
        }
        return;
      }
  }
}

Хоть и простой, этот код может оказаться неадекватным для ваших потребностей, особенно если вы обрабатываете длительно выполняемый процесс или работаете в динамической среде. Например, с точки зрения асинхронного дизайна, этот код мог бы захватывать объект Task, возвращаемый HandleInvoiceAsync, чтобы клиент был в состоянии управлять асинхронными задачами. Кроме того, вы должны удостовериться, что событие CollectionChanged генерируется в UI-потоке, даже если FindInvoices выполняется в фоновом потоке.

Благодаря тому, где вызывается метод Clear в классе сервера (перед самым началом поиска первого Invoice), значение Reset свойства Action можно использовать как оповещение о том, что начинается поиск первого элемента. Но, разумеется, может случиться так, что при поиске не будет найдено никаких счетов, поэтому использование Reset Action способно заставить клиент выделить ресурсы, которые на самом деле никогда не будут задействованы. Для реального уведомления о начале обработки первого элемента вам понадобилось бы добавить какой-то флаг, чтобы обработка Add Action выполнялась только при нахождении первого элемента.

Более того, сервер имеет ограниченное количество вариантов для того, чтобы сообщать о нахождении последнего Invoice и чтобы клиент соответственно мог прекратить ожидание очередного элемента. Сервер мог бы, допустим, очищать набор после нахождения последнего элемента, но это привело бы к еще большему усложнению обработки в Reset Action. (Обрабатывал ли я объекты Invoice? Если да, значит, я обработал последний Invoice. Если нет, тогда я приступаю к обработке первого Invoice.)

Хотя для простых задач ObservableCollection отлично подходит, любая достаточно сложная реализация на основе ObservableCollection (и любое приложение, где важна эффективность работы) потребует весьма изощренного кода, особенно на клиентской стороне.

Решения на основе Rx

Если вам нужна асинхронная обработка, то Rx (доступная через NuGet) может предоставить более эффективное решение для реализации шаблона Observer, заимствованное из модели публикации/подписки. Это решение также предоставляет модель фильтрации на основе LINQ, более удобное оповещение о первом/последнем элементе и более эффективную обработку ошибок.

Rx также может обрабатывать более интересные реализации Observer, чем возможные на основе ObservableCollection. В моем случае после возврата начального списка счетов сервер мог бы продолжить проверку на новые счета, которые добавляются в кеш по завершении первого поиска (и которые, разумеется, соответствуют критериям поиска). Когда появляется счет, отвечающий критериям, клиент нужно будет уведомить об этом событии, чтобы новый счет был добавлен в список. Rx поддерживает эти виды расширений шаблона Observer на основе событий, которые более эффективны, чем ObservableCollection.

В Rx есть два ключевых интерфейса для поддержки шаблона Observer. Первый — IObservable<T>, реализуемый сервером и определяющий единственный метод, Subscribe. Серверу, реализующему метод Subscribe, будет передана ссылка на объект от клиента. Для устранения проблемы с удалением слушателя (lapsed listener problem) метод Subscribe возвращает клиенту ссылку на объект, реализующий интерфейс IDisposable. Клиент может использовать этот объект для отключения от сервера. При отключении клиента сервер удалит этот клиент изо всех своих внутренних списков.

Второй интерфейс — IObserver<T>, который должен быть реализован клиентом. Этот интерфейс требует от клиента реализовать и предоставить три метода серверу: OnNext, OnCompleted и OnError. Самый важный здесь метод — OnNext, используемый сервером для передачи сообщения клиенту (в моем случае это были бы новые объекты Invoice, возвращаемые по мере их появления). Метод OnCompleted клиента может быть задействован сервером для оповещения о том, что данных больше нет. Третий метод, OnError, обеспечивает серверу возможность уведомлять клиент о том, что произошло исключение.

Конечно, вы можете сами реализовать интерфейс IObserver (он является частью .NET Framework). Наряду с ObservableCollection этого может оказаться достаточно, если вы создаете синхронное решение. Однако Rx включает несколько пакетов, которые предоставляют асинхронные реализации этих интерфейсов, в том числе реализации для JavaScript и сервисов RESTful. Rx-класс Subject предоставляет реализацию IObservable, упрощающую реализацию асинхронной версии модели publish/subscribe шаблона Observer.

Создание асинхронного решения

Создание сервера для работы с объектом Subject требует минимальных изменений в исходном синхронном коде на серверной стороне. Я заменяю старый ObservableCollection объектом Subject, который будет передавать каждый Invoice по мере его появления всем прослушивающим клиентам. Я объявляю объект Subject открытым (public), чтобы к нему могли обращаться все клиенты:

public class InvoiceManagement
{
  public IObservable<Invoice> foundInvoice =
    new Subject<Invoice>();

В теле метода вместо добавления счета в набор я использую метод OnNext из Subject для передачи клиенту каждого счета по мере нахождения:

public void FindInvoices(decimal Amount)
{
  inv = GetInvoicesForAmount(Amount) // опрос наличия счетов
  foundInvoice.OnNext(inv);
  // ...повторяем...
}

В своем клиенте я сначала объявляю экземпляр класса сервера. Затем в методе, помеченном как async, я вызываю метод Subscribe класса Subject, сообщая, что я хочу получать сообщения:

public class InvoiceManagementTests
{
  InvoiceManagement invMgmt = new InvoiceManagement();

  public async void ProcessInvoices()
  {
    invMgmt.foundInvoice.Subscribe<Invoice>();

Чтобы фильтровать результаты для получения только нужных мне счетов, я могу применить LINQ-выражение к объекту Subject. В примере ниже фильтрация осуществляется так, чтобы я получал только неоплаченные счета (для использования расширений Rx LINQ вам понадобится добавить выражение using для пространства имен System.Reactive.Linq):

invMgmt.foundInvoice.Where(i => i.BackOrder ==
  "BackOrder").Subscribe();

Начав прослушивать субъект, можно указать, какая обработка должна выполняться при получении счета. Я могу, например, использовать FirstAsync для обработки только первого счета, возвращенного сервисом. В этом примере я использую выражение await с вызовом FirstAsync, чтобы была возможность вернуть управление основному телу приложения на время обработки счета. Этот код ждет получения первого счета, затем переходит к тому коду, который используется мной для инициализации процесса обработки счета, и, наконец, обрабатывает счет:

Invoice inv;
inv = await invMgmt.foundInvoice.FirstAsync();
HandleInvoiceAsync(inv);

Один подвох: FirstAsync будет блокировать приложение, если сервер еще не сгенерировал какие-либо результаты. Если вы хотите избежать блокировки, то можете использовать FirstOrDefaultAsync, которая вернет null, если на сервере пока нет результатов.

Более типичный случай — клиент хочет обработать все возвращенные счета (после фильтрации) и делать это асинхронно. Тогда вместо использования комбинации Subscribe и OnNext вы можете просто вызывать метод ForEachAsync. Вы можете передать метод или лямбда-выражение, которое обрабатывает входящие результаты. Если вы передаете метод (который не может быть асинхронным), как я делаю здесь, то этому методу будет передат счет, который инициировал ForEachAsync:

invMgmt.foundInvoice.ForEachAsync(HandleInvoice);

Методу ForEachAsync также можно передать маркер отмены, чтобы позволить клиенту уведомлять о своем отключении. Хорошей практикой считается передача маркера при вызове любого из Rx-методов *Async для поддержки завершения обработки клиентом, не дожидаясь окончания обработки всех объектов.

ForEachAsync не будет обрабатывать результат, уже обработанный методом First (или FirstOrDefaultAsync), поэтому вы можете использовать FirstOrDefaultAsync с ForEachAsync для проверки того, есть ли у сервера что-то для обработки, прежде чем обрабатывать последующие объекты. Однако метод IsEmpty в Subject заметно упрощает ту же проверку. Если клиент должен выделить какие-то ресурсы, необходимые для обработки результатов, IsEmpty позволяет клиенту проверить, есть ли для него работа, до выделения этих ресурсов (альтернативой было бы выделение этих ресурсов при обработке первого элемента в цикле). Применяя IsEmpty с клиентом, проверяющим наличие результатов до выделения ресурсов (и начала обработки), обеспечивает и отмену операций (рис. 2).

Рис. 2. Код для поддержки отмены и откладывания обработки до готовности результатов

CancellationTokenSource cancelSource =
  new CancellationTokenSource();

CancellationToken cancel;
cancel = cancelSource.Token;
if (!await invMgmt.foundInvoice.IsEmpty())
{
  // ...подготовительный код для обработки счетов...
  try
  {
    invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel);
  }
  catch (Exception ex)
  {
    if (ex.GetType() != typeof(CancellationToken))
    {
      // ...выдача сообщения
    }
   }
   // Код очистки после обработки всех счетов
   // или отключения клиента
}

Заключение

Если вам нужна лишь простая реализация шаблона Observer, тогда ObservableCollection обеспечит все, что нужно для обработки потока результатов. Для более эффективного контроля и для приложения на основе событий следует использовать класс Subject и расширения, поставляемые с Rx; это позволит приложению работать в асинхронном режиме с поддержкой мощной реализации модели publish/subscribe (не говоря уже о богатой библиотеке операторов, предлагаемой Rx). Если вы работаете с Rx, стоит скачать Rx Design Guide (bit.ly/1VOPxGS), где даны рекомендации по использованию и генерации наблюдаемых потоков.

Rx также обеспечивает некоторую поддержку преобразования типа сообщений, передаваемых между клиентом и сервером, благодаря использованию интерфейса ISubject<TSource, TResult>. Этот интерфейс определяет два типа данных: in и out. Внутри класса Subject, который реализует этот интерфейс, можно выполнять любые операции, необходимые для преобразования результата от сервера (тип данных in) в результат, требуемый клиентом (тип данных out). Более того, параметр in является ковариантным (принимает указанный тип данных или все, от чего наследует этот тип данных), а параметр out — контравариантным (принимает указанный тип данных или все, что наследует от него), что дает дополнительную гибкость.

Мы живем в мире, где асинхронность применяется все шире и шире, и в этом мире шаблон Observer становится более важным — это полезное средство для любого интерфейса между процессами, где сервер возвращает более одного результата. К счастью, у вас есть несколько вариантов реализации шаблона Observer в .NET Framework, в том числе применение ObservableCollection и Rx.


Питер Вогел*(Peter Vogel) — архитектор систем и руководитель в PH&V Information Services. PH&V предоставляет комплексные консалтинговые услуги в самых разных областях — от дизайна UX до объектного моделирования и проектирования баз данных.*

Выражаю благодарность за рецензирование статьи экспертам Microsoft Стивену Клири (Stephen Cleary), Джеймсу Маккафри (James McCaffrey) и Дэйву Секстону (Dave Sexton).