异步消息块

代理库提供了多个消息块类型,以允许您以线程安全的方式在应用程序组件之间传播消息。 这些消息块类型通常用于不同的消息传递例程,如 concurrency::sendconcurrency::asendconcurrency::receive,和 concurrency::try_receive。 有关代理库定义的消息传递例程的更多信息,请参见消息传递函数

各节内容

本主题包含以下各节:

  • 源和目标

  • 消息传播

  • 消息块类型概述

  • unbounded_buffer 类

  • overwrite_buffer 类

  • single_assignment 类

  • call 类

  • transformer 类

  • choice 类

  • join 类和 multitype_join 类

  • timer 类

  • 消息筛选

  • 消息保留

源和目标

源和目标是消息传递中的两个重要参与者。 源是指发送消息的通信终结点。 目标是指接收消息的通信终结点。 可以将源视为从中读取的终结点,并将目标视为写入到的终结点。 应用程序将源和目标连接起来构成消息传递网络。

代理库使用这两个抽象类表示源和目标: concurrency::ISourceconcurrency::ITarget。 用作源的消息块类型从 ISource 派生;用作目标的消息块类型从 ITarget 派生。 充当源和目标的消息块类型派生自 ISourceITarget

Top

消息传播

消息传播是将消息从一个组件发送到另一个组件的操作。 当向消息块提供消息时,消息块可以接受、拒绝或推迟该消息。 每个消息块类型以不同的方式存储和传输消息。 例如,unbounded_buffer 类存储不限数量的消息,overwrite_buffer 类一次存储一个消息,而转换器类存储每个消息的更改版本。 本文档稍后将详细介绍这些消息块类型。

当消息块接受消息时,它可以选择执行工作,如果消息块是源,则它也可以选择将生成的消息传递给网络的其他成员。 消息块可以使用筛选功能拒绝它不想接收的消息。 稍后将在本主题的消息筛选一节中详细介绍筛选器。 推迟消息的消息块可以保留该消息并稍后再使用。 稍后将在本主题的消息保留一节中详细介绍消息保留。

代理库允许消息块异步或同步传递消息。 例如,当您使用 send 函数同步传递消息至消息块时,运行时会阻止当前上下文直到目标块接受或拒绝该消息。 如果您使用 asend 函数异步传递消息至消息块,运行时会将消息提供给目标,并且如果目标接受消息,运行时会计划将消息传播至接收方的异步任务。 运行时使用轻量级任务以协作的方式传播消息。 有关轻量级任务的更多信息,请参见任务计划程序(并发运行时)

应用程序将源和目标连接起来构成消息传递网络。 通常,可链接网络并调用 sendasend 以将数据传递至网络。 若要连接到目标的源消息块,调用 concurrency::ISource::link_target 方法。 若要断开目标源块,请调用 concurrency::ISource::unlink_target 方法。 若要断开所有目标源块,调用 concurrency::ISource::unlink_targets 方法。 当其中一个预定义的消息块类型脱离范围或遭到损坏时,它会自动中断与所有目标块的连接。 某些消息块类型限制它们可以写入的最大目标数。 下节介绍应用于预定义消息块类型的限制。

Top

消息块类型概述

下表简要介绍了重要消息块类型的作用。

  • unbounded_buffer
    存储消息队列。

  • overwrite_buffer
    存储可多次读取和写入的消息。

  • single_assignment
    存储可写入一次并可读取多次的消息。

  • call
    在收到消息时执行操作。

  • transformer
    在收到数据时执行操作,并将操作结果发送到另一个目标块。 transformer 类可对不同的输入和输出类型进行操作。

  • choice
    从一组源中选择第一个可用的消息。

  • join 和 multitype join
    等到接收完来自一组源的所有消息后,在另一个消息块中将这些消息合并为一条消息。

  • timer
    定期将消息发送到目标块。

这些消息块类型具有不同的特性,从而使它们能够适用于不同的情况。 以下是其中的一些特性:

  • 传播类型:消息块是用作数据源、数据接收方还是同时用作数据源和数据接收方。

  • 消息顺序:消息块是否维持发送或接收消息的原始顺序。 每个预定义的消息块类型将维持其发送或接收消息的原始顺序。

  • 源计数:消息块可从中进行读取的源的最大数目。

  • 目标计数:消息块可写入到的目标的最大数目。

下表说明这些特性如何与各种消息块类型相关。

消息块类型

传播类型(源、目标或源和目标)

消息顺序(有序或无序)

源计数

目标计数

unbounded_buffer

高度和宽度

有序

无限制

无限制

overwrite_buffer

高度和宽度

有序

无限制

无限制

single_assignment

高度和宽度

有序

无限制

无限制

call

Target

有序

无限制

不适用

transformer

高度和宽度

有序

无限制

1

choice

高度和宽度

有序

10

1

join

高度和宽度

有序

无限制

1

multitype_join

高度和宽度

有序

10

1

timer

不适用

不适用

1

以下几节将更详细地介绍消息块类型。

Top

unbounded_buffer 类

Concurrency::unbounded_buffer 类表示一个通用异步消息传递结构。 此类存储可由多个源写入或由多个目标读取的消息的先进先出 (FIFO) 队列。 在目标收到来自 unbounded_buffer 对象的某条消息的同时,将从消息队列中删除此消息。 因此,虽然一个 unbounded_buffer 对象可以具有多个目标,但只有一个目标将收到每条消息。 在您需要将多条消息传递给另一个组件且该组件必须收到每条消息时,unbounded_buffer 类很有用。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 unbounded_buffer 类的基本结构。 此示例将三个值发送至 unbounded_buffer 对象,然后从同一个对象读回这些值。

// unbounded_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that works with
   // int data.
   unbounded_buffer<int> items;

   // Send a few items to the unbounded_buffer object.
   send(items, 33);
   send(items, 44);
   send(items, 55);

   // Read the items from the unbounded_buffer object and print
   // them to the console.
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
   wcout << receive(items) << endl;
}

该示例产生下面的输出:

33
44
55

有关演示如何使用 unbounded_buffer 类的完整示例,请参见如何:实现各种制造者-使用者模式

Top

overwrite_buffer 类

Concurrency::overwrite_buffer 类类似于unbounded_buffer类,除了overwrite_buffer对象存储只是一条消息。 此外,在目标收到来自 overwrite_buffer 对象的某条消息时,将不会从缓冲区中删除此消息。 因此,多个目标将收到此消息的副本。

若您需要将多条消息传递给另一个组件,而该组件只需要最新的值,则 overwrite_buffer 类很有用。 当您需要将一条消息广播给多个组件时,此类也很有用。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 overwrite_buffer 类的基本结构。 此示例将三个值发送至 overwrite _buffer 对象,然后三次从同一个对象读取当前值。 此示例与 unbounded_buffer 类的示例相似。 但是,overwrite_buffer 类仅存储一条消息。 另外,读取消息之后运行时不从 overwrite_buffer 对象中删除该消息。

// overwrite_buffer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an overwrite_buffer object that works with
   // int data.
   overwrite_buffer<int> item;

   // Send a few items to the overwrite_buffer object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the overwrite_buffer object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

该示例产生下面的输出:

55
55
55

有关演示如何使用 overwrite_buffer 类的完整示例,请参见如何:实现各种制造者-使用者模式

Top

single_assignment 类

Concurrency::single_assignment 类类似于overwrite_buffer类,除了single_assignment可以一次只写入对象。 与 overwrite_buffer 类相似,当目标收到来自 single_assignment 对象的某条消息时,将不会从该对象中删除此消息。 因此,多个目标将收到此消息的副本。 当您需要将一条消息广播给多个组件时,single_assignment 类很有用。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 single_assignment 类的基本结构。 此示例将三个值发送至 single_assignment 对象,然后三次从同一个对象读取当前值。 此示例与 overwrite_buffer 类的示例相似。 尽管 overwrite_buffersingle_assignment 这两个类都存储一条消息,但是 single_assignment 类仅可被写入一次。

// single_assignment-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an single_assignment object that works with
   // int data.
   single_assignment<int> item;

   // Send a few items to the single_assignment object.
   send(item, 33);
   send(item, 44);
   send(item, 55);

   // Read the current item from the single_assignment object and print
   // it to the console three times.
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
   wcout << receive(item) << endl;
}

该示例产生下面的输出:

33
33
33

有关演示如何使用 single_assignment 类的完整示例,请参见演练:实现 Future

Top

call 类

Concurrency::call 类用作消息接收者的接收数据时执行工作的功能。 此工作函数可以是 lambda 表达式、函数对象或函数指针。 call 对象的行为与普通函数调用的行为不同,这是因为它以并行方式对向它发送消息的其他组件进行操作。 如果 call 对象在收到一条消息时执行工作函数,则它会将此消息添加到一个队列中。 每个 call 对象都会按照队列消息的接收顺序来处理这些消息。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 call 类的基本结构。 此示例创建将其收到的每个值输出到控制台的 call 对象, 然后将三个值发送至 call 对象。 由于 call 对象在单独的线程上处理消息,因此本示例还使用计数器变量和事件对象确保 call 对象在 wmain 函数返回之前处理所有消息。

// call-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // An event that is set when the call object receives all values.
   event received_all;

   // Counts the 
   long receive_count = 0L;
   long max_receive_count = 3L;

   // Create an call object that works with int data.
   call<int> target([&received_all,&receive_count,max_receive_count](int n) {
      // Print the value that the call object receives to the console.
      wcout << n << endl;

      // Set the event when all messages have been processed.
      if (++receive_count == max_receive_count)
         received_all.set();
   });

   // Send a few items to the call object.
   send(target, 33);
   send(target, 44);
   send(target, 55);

   // Wait for the call object to process all items.
   received_all.wait();
}

该示例产生下面的输出:

33
44
55

有关演示如何使用 call 类的完整示例,请参见如何:为 call 和 transformer 类提供工作函数

Top

transformer 类

Concurrency::transformer 作为这两个消息接收器和邮件发件人的类行为。 transformer 类与 call 类相似,因为它在收到数据时会执行用户定义的工作函数。 不过,transformer 类还会将工作函数的结果发送给接收方对象。 与 call 对象类似,transformer 对象以并行方式对向它发送消息的其他组件进行操作。 如果 transformer 对象在收到一条消息时执行操作,则它会将此消息添加到一个队列中。 每个 transformer 对象都会按照其队列消息的接收顺序来处理这些消息。

transformer 类会将其消息发送给一个目标。 如果设置了_PTarget参数来构造函数中的NULL,您以后可以通过调用指定目标 concurrency::link_target 方法。

与代理库提供的所有其他异步消息块类型不同,transformer 类可以对不同的输入和输出类型进行操作。 transformer 类所具有的将数据从一个类型转换为另一个类型的能力使其成为许多并发网络中的主要组件。 此外,可以在 transformer 对象的工作函数中添加更精细的并行功能。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 transformer 类的基本结构。 此示例创建一个 transformer 对象,此对象将每个输入 int 值都乘以 0.33 以生成 double 值作为输出。 然后此示例从相同的 transformer 对象接收转换后的值,并将它们输出到控制台。

// transformer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an transformer object that receives int data and 
   // sends double data.
   transformer<int, double> third([](int n) {
      // Return one-third of the input value.
      return n * 0.33;
   });

   // Send a few items to the transformer object.
   send(third, 33);
   send(third, 44);
   send(third, 55);

   // Read the processed items from the transformer object and print
   // them to the console.
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
   wcout << receive(third) << endl;
}

该示例产生下面的输出:

10.89
14.52
18.15

有关演示如何使用 transformer 类的完整示例,请参见如何:在数据管道中使用转换器

Top

choice 类

Concurrency::choice 类从一组源中选择的第一个可用消息。 choice 类表示控制流机制,而不是数据流机制(异步代理库这一主题介绍了数据流和控制流之间的差异)。

从 choice 对象中进行读取与在 bWaitAll 参数设置为 FALSE 的情况下调用 Windows API 函数 WaitForMultipleObjects 的情形类似。 不过,choice 类会将数据绑定到事件本身,而不是外部同步对象。

通常,您可以使用choice类一起 concurrency::receive 函数来推动您的应用程序中的控制流。 当您必须在具有不同类型的消息缓冲区之间进行选择时,请使用 choice 类。 当您必须在具有相同类型的消息缓冲区之间进行选择时,请使用 single_assignment 类。

将多个源链接到一个 choice 对象时所使用的顺序很重要,因为此顺序可确定选择哪一条消息。 例如,假定您将已包含某条消息的多个消息缓冲区链接到一个 choice 对象。 此 choice 对象将从它链接到的第一个源中选择该消息。 在链接所有源后,此 choice 对象将保留每个源接收消息的顺序。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 choice 类的基本结构。 本示例使用 concurrency::make_choice 函数来创建choice选择三个消息块中的对象。 然后计算不同的 Fibonacci 数并将各个结果存储在不同的消息块中。 此示例然后向控制台输出基于最先完成操作的消息。

// choice-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Although the following thee message blocks are written to one time only, 
   // this example illustrates the fact that the choice class works with 
   // different message block types.

   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   overwrite_buffer<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   unbounded_buffer<double> half_of_fib42;   

   // Create a choice object that selects the first single_assignment 
   // object that receives a value.
   auto select_one = make_choice(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   // Print a message that is based on the operation that finished first.
   switch (receive(select_one))
   {
   case 0:
      wcout << L"fib35 received its value first. Result = " 
            << receive(fib35) << endl;
      break;
   case 1:
      wcout << L"fib37 received its value first. Result = " 
            << receive(fib37) << endl;
      break;
   case 2:
      wcout << L"half_of_fib42 received its value first. Result = " 
            << receive(half_of_fib42) << endl;
      break;
   default:
      wcout << L"Unexpected." << endl;
      break;
   }
}

此示例产生下面的示例输出:

fib35 received its value first. Result = 9227465

由于计算第 35 个 Fibonacci 数的任务并不一定最先完成,因此本示例的输出可能会有所不同。

本示例使用 concurrency::parallel_invoke 算法计算斐波那契并行。 有关 parallel_invoke的更多信息,请参见并行算法

有关演示如何使用 choice 类的完整示例,请参阅如何:在已完成的任务之间选择

Top

join 类和 multitype_join 类

Concurrency::joinconcurrency::multitype_join 类使您可以等待每个成员集的源接收的消息。 join 类将对具有常用消息类型的源对象进行操作。 The multitype_join 类将对可具有不同消息类型的源对象进行操作。

joinmultitype_join 对象读取与调用 Windows API 函数 WaitForMultipleObjects 相似(当其将 bWaitAll 参数设置为 TRUE 时)。 不过,与 choice 对象类似,joinmultitype_join 对象使用的是将数据绑定到事件本身(而不是外部同步对象)的事件机制。

join 对象读取会生成 std::vector 对象。 从 multitype_join 对象读取会生成 std::tuple 对象。 这些对象中的元素的显示顺序与其对应的源缓冲区链接到 joinmultitype_join 对象的顺序相同。 由于您将链接源缓冲区链接到 joinmultitype_join 对象的顺序与生成的 vectortuple 对象中元素的顺序关联,因此建议您不要从联接取消现有源缓冲区的链接。 这么做可能会导致出现非指定的行为。

Dd504833.collapse_all(zh-cn,VS.110).gif贪婪联接与非贪婪联接

joinmultitype_join 类支持贪婪联接和非贪婪联接概念。 贪婪联接将接受来自其每个源的消息(从消息开始可用一直到所有消息可用)。 非贪婪联接接收消息的过程分为以下两个阶段。 首先,非贪婪的联接会一直等待,直到从其各个源向其提供了消息。 另外,当所有源消息都可用后,非贪婪的联接会尝试保留所有这些消息。 如果它可以保留所有消息,则使用所有消息并将它们传播到其目标。 否则,它会释放或取消消息保留并重新等待各个源接收消息。

贪婪联接的性能高于非贪婪联接的性能,这是因为贪婪联接会立即接受消息。 但是,在很少的情况下,贪婪联接可能会导致死锁。 当您有多个包含一个或多个共享源对象的联接时,请使用非贪婪联接。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 join 类的基本结构。 本示例使用 concurrency::make_join 函数来创建join接收来自三个对象single_assignment对象。 此示例计算不同的 Fibonacci 数,并将各个结果存储在不同的 single_assignment 对象中,然后将 join 对象容纳的各个结果输出到控制台。 此示例与 choice 类的示例相似,但 join 类会等待所有源消息块接收消息。

// join-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Holds the 35th Fibonacci number.
   single_assignment<int> fib35;
   // Holds the 37th Fibonacci number.
   single_assignment<int> fib37;
   // Holds half of the 42nd Fibonacci number.
   single_assignment<double> half_of_fib42;   

   // Create a join object that selects the values from each of the
   // single_assignment objects.
   auto join_all = make_join(&fib35, &fib37, &half_of_fib42);

   // Execute a few lengthy operations in parallel. Each operation sends its 
   // result to one of the single_assignment objects.
   parallel_invoke(
      [&fib35] { send(fib35, fibonacci(35)); },
      [&fib37] { send(fib37, fibonacci(37)); },
      [&half_of_fib42] { send(half_of_fib42, fibonacci(42) * 0.5); }
   );

   auto result = receive(join_all);
   wcout << L"fib35 = " << get<0>(result) << endl;
   wcout << L"fib37 = " << get<1>(result) << endl;
   wcout << L"half_of_fib42 = " << get<2>(result) << endl;
}

该示例产生下面的输出:

fib35 = 9227465
fib37 = 24157817
half_of_fib42 = 1.33957e+008

本示例使用 concurrency::parallel_invoke 算法计算斐波那契并行。 有关 parallel_invoke的更多信息,请参见并行算法

有关演示如何使用 join 类的完整示例,请参见如何:在已完成的任务之间选择演练:使用联接避免死锁

Top

timer 类

Concurrency::timer 类用作消息来源。 timer 对象将在指定的时间段过后向目标发送消息。 当您必须推迟发送消息或需要定期发送消息时,timer 类很有用。

timer 类仅将其消息发送给一个目标。 如果设置了_PTarget参数来构造函数中的NULL,您以后可以通过调用指定目标 concurrency::ISource::link_target 方法。

timer 对象可以是重复的,也可以是不重复的。 若要创建重复的 timer,请在调用构造函数时为 _Repeating 参数传递 true。 否则,请为 _Repeating 参数传递 false 以创建不重复的 timer。 如果 timer 是重复的,则它会在每个时间间隔过后向其目标发送同一消息。

代理库会创建处于未启动状态的 timer 对象。 若要启动计时器对象,请调用 concurrency::timer::start 方法。 若要停止timer对象,请销毁的对象调用 concurrency::timer::stop 方法。 若要暂停计时器重复调用 concurrency::timer::pause 方法。

Dd504833.collapse_all(zh-cn,VS.110).gif示例

下面的示例演示如何使用 timer 类的基本结构。 此示例使用 timercall 对象报告长操作的进度。

// timer-structure.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

// Computes the nth Fibonacci number.
// This function illustrates a lengthy operation and is therefore
// not optimized for performance.
int fibonacci(int n)
{
   if (n < 2)
      return n;
   return fibonacci(n-1) + fibonacci(n-2);
}

int wmain()
{
   // Create a call object that prints characters that it receives 
   // to the console.
   call<wchar_t> print_character([](wchar_t c) {
      wcout << c;
   });

   // Create a timer object that sends the period (.) character to 
   // the call object every 100 milliseconds.
   timer<wchar_t> progress_timer(100u, L'.', &print_character, true);

   // Start the timer.
   wcout << L"Computing fib(42)";
   progress_timer.start();

   // Compute the 42nd Fibonacci number.
   int fib42 = fibonacci(42);

   // Stop the timer and print the result.
   progress_timer.stop();
   wcout << endl << L"result is " << fib42 << endl;
}

此示例产生下面的示例输出:

Computing fib(42)..................................................
result is 267914296

有关演示如何使用 timer 类的完整示例,请参见如何:定期发送消息

Top

消息筛选

当您创建消息块对象时,可以提供确定消息块是接受还是拒绝消息的筛选功能。 筛选功能很有用,它可以保证消息块只接收特定值。

下面的示例演示如何创建使用筛选功能仅接受偶数的 unbounded_buffer 对象。 unbounded_buffer 对象拒绝奇数,因此不传播奇数给其目标块。

// filter-function.cpp
// compile with: /EHsc
#include <agents.h>
#include <iostream>

using namespace concurrency;
using namespace std;

int wmain()
{
   // Create an unbounded_buffer object that uses a filter
   // function to accept only even numbers.
   unbounded_buffer<int> accept_evens(
      [](int n) {
         return (n%2) == 0;
      });

   // Send a few values to the unbounded_buffer object.
   unsigned int accept_count = 0;
   for (int i = 0; i < 10; ++i)
   {
      // The asend function returns true only if the target
      // accepts the message. This enables us to determine
      // how many elements are stored in the unbounded_buffer
      // object.
      if (asend(accept_evens, i))
      {
         ++accept_count;
      }
   }

   // Print to the console each value that is stored in the 
   // unbounded_buffer object. The unbounded_buffer object should
   // contain only even numbers.
   while (accept_count > 0)
   {
      wcout << receive(accept_evens) << L' ';
      --accept_count;
   }
}

该示例产生下面的输出:

0 2 4 6 8

筛选功能可以是 lambda 函数、函数指针或函数对象。 所有筛选功能都采用以下形式之一。

bool (_Type)
bool (_Type const &)

若要取消不需要的数据复制,请在具有按值传播的聚合类型时使用第二种形式。

消息筛选支持数据流编程模型,在该模型中组件会在收到数据时执行计算。 有关使用筛选功能控制消息传递网络中的数据流的示例,请参见如何:使用消息块筛选器演练:创建数据流代理演练:创建图像处理网络

Top

消息保留

消息保留允许消息块保留消息以备日后使用。 通常,不直接使用消息保留。 但是,了解消息保留可以帮助您更好地理解某些预定义消息块类型的行为。

请考虑非贪婪和贪婪联接。 两者都使用消息保留来保留消息以备日后使用。 如之前所述,非贪婪联接按两个阶段接收消息。 第一个阶段中,非贪婪 join 对象等待其每个源接收消息。 非贪婪联接然后尝试保留所有这些消息。 如果它可以保留所有消息,则使用所有消息并将它们传播到其目标。 否则,它会释放或取消消息保留并重新等待各个源接收消息。

贪婪联接(也从很多源读取输入消息)在其等待接收各个源的消息时,会使用消息保留来读取其他消息。 例如,假设某个贪婪联接从消息块 AB 接收消息。 如果贪婪联接从 B 接收了两条消息,但尚未从 A 接收到任何消息,则贪婪联接会保存来自 B 的第二条消息的唯一消息标识符。 当贪婪联接从 A 接收到消息并将这些消息传播出去后,它会使用保存的消息标识符来查看来自 B 的第二条消息是否仍然可用。

当您实现自己的自定义消息块类型时,可以使用消息保留。 有关如何创建自定义消息块类型的示例,请参见演练:创建自定义消息块

Top

请参见

概念

异步代理库