2018 年 9 月

33 卷,第 9

此文章由机器翻译

Azure-管理使用 Azure 事件网格的事件交付

通过David Barkol

事件网格是提供一种创新方法进行路由事件在云中和更高版本的 Microsoft Azure 中完全托管的消息传递服务。它具有新解锁和如何事件驱动的解决方案的唯一模式旨在使用功能强大且灵活的发布-订阅模型。

我提供了 2018 年 2 月期刊中的 Azure 事件网格简介 (aka.ms/eventgridarticle),探讨了该服务以及如何使用它来发布和使用各种方法中的事件的基础知识。在这篇新文章,我将深入如何传递事件和选项将重试策略、 无效事件和未成功发送的事件。我深入探讨之前,它将通过事件网格在高级别中的工作方式很有用。

事件网格的简要概述

与 Azure 事件网格事件源可来自越来越多的服务在 Azure 中,例如事件中心和媒体服务中,甚至在本地运行的自定义应用程序或其他云提供商或数据中心内。使用和管理的事件网格中,这是负责消息的引入,并以其分发给每个事件订阅这些事件。事件处理程序用于对传入事件执行操作,并且可以在 Azure 中的服务。非常常见的方案是指利用 Functions 和逻辑应用等其他无服务器技术。在一起,这些高度可扩展且高度灵活的解决方案可以包含非常快速且经济而无需管理任何基础结构的负担。

事件处理程序也可以是简单的 WebHook,就像一个事件源,这意味着它可以位于任何位置,只要它支持 HTTPS,并可以接受的 POST 请求。此平台和语言无关的方法是使才刚刚开始在云中的新解决方案打开的非常特殊服务的事件网格的多个选项之一。图 1说明了如何使用事件网格连接多个源和处理程序。在 Azure 中将与事件网格集成的服务列表中不断增长和子集仅会在关系图描述。

Azure 事件网格概述
图 1 Azure 事件网格概述

事件交付和响应代码

事件网格单独将每个事件。这意味着没有保证的顺序的事件并在某些情况下,可以发送事件超过一次。因此,它是要编写防御性代码和是幂等的事件处理程序的责任。反复发送相同的事件应生成相同的结果。如果事件的排序是一项要求,这应托管任意一侧计算 (在事件处理程序的逻辑) 或使用另一个服务,如服务总线或事件中心来保留其顺序。

事件网格的事件处理程序返回的 HTTP 响应将确定如何将继续事件的管理。状态代码 200 确定和 202 已接受被视为已成功传送事件的确认。

有表明失败的传递尝试以下故障代码:400 错误的请求,401 未经授权,找不到 404,408 请求超时,414 URI 太长,500 内部服务器错误、 503 服务不可用和 504 网关超时。具体取决于故障代码中,事件网格可能会将该事件发送到终结点重试。我将深入考虑到这只是有点。

重试策略

如果未收到确认,或者返回的错误代码,将进行另一次尝试将事件发送到终结点。重试策略使用指数备份然后放置到位,以尝试事件过期之前的最后一个传递。默认情况下,事件将在未成功传送的 24 小时后过期。在后首次尝试传递计划会将关闭,使用以下时间线:10 秒,30 秒,1 分钟、 5 分钟,30 分钟和 1 小时 10 分钟数。在第一次的一小时尝试之后, 每小时,直至达到生存时间 (TTL) 一次由每个后续请求。

事件网格中的新功能,可通过设置两个可能的值来配置事件订阅的重试策略:

最大传递尝试是设置事件订阅的最大重试尝试的可配置值。其默认值和允许,最大值为 30。

事件 TTL 是对应于事件的生存时间设置的值。其默认值,以及其最大值设置为 1,440 分钟 (24 小时)。

是的这些属性可用于管理事件订阅级别的重试策略和仅为短时间内,事件是您感兴趣或有意引发错误,如 503 在高 loa 过程中保护自己时很有用d。

无效和死信通道

消息传送服务的主要职责之一是确保正确传递消息。但是,它不能保证该消息的接收方将成功它的处理。

在某些情况下,事件接收方可能会拒绝该消息,如果它不能满足特定的预期。这可能会在无效的数据类型或有效负载或未经授权的消息的窗体中。在此情况下,消息通常会移动到指定位置,通常被视为无效消息通道。

另一个常见方案是一条消息成功发送,但在接收端错误导致无法处理。这些通常是 500 级状态代码,以表明服务错误或服务不可用。通常情况下,消息传送服务将重新发送这些消息,直到达到阈值,并认为消息是未送达。死信 channelis 使用,类似于无效消息通道,以存储这些消息以及任何相关的元数据。

在这两种方案中的愿望是,将存在一些实用程序或服务,用于监视通道并知道要执行的操作的消息。这可能会在计划的报表或另一可能的事件驱动的服务,例如,逻辑应用,可以选取该消息并通知最终用户或其他系统和服务呈现。

使用事件网格时视为死信

极高的功能的 Azure 事件网格很快就已释放后是提供用于捕获无法发送的事件的机制。此功能具有最近附带趋向成熟了配置的重试策略和每个事件订阅的死信位置的功能。使用此功能,我现在可以配置到 Azure 存储帐户的消息和相关详细信息的传递,将捕获的死信通道。此相同的终结点将充当一个死信和无效的消息的通道。图 2说明了如何死信事件现在支持 Azure 事件网格中。

死信事件
图 2 死信事件

我想要调出几个重要详细信息之前将组合在一起的一个示例使用重试策略和死信传递。

请注意,来自事件网格主题的箭头将处理程序的方向。我想指出这一点为了强调这一概念事件网格是一种推送推送模型。事件处理程序提供时就可以发送事件时,事件网格会调用 webhook。此重要的设计决策强调事件驱动的服务的性质。不同于其他消息传送服务是不再需要求助于长时间轮询或 hammer 轮询以检查消息是否有可用的技术。相反,处理程序可以依赖于此模型以新接收事件通知,这是为什么事件处理程序函数和逻辑应用等无服务器技术是吸引人的另一个原因。让我们将这放入的做法。

设置

现在我将指导您完成设置和配置的重试策略和死信终结点的步骤。此外,将会非常棒,还检查死信事件并变得可用后立即对其做出反应。

我将在 Azure 中设置的所有内容将从 Azure Cloud Shell 中完成。这将确保你可以从任何计算机执行此操作而不会存留完全取决于任何特定工具或其他依赖项。有关 Azure Cloud Shell 的详细信息,请参阅bit.ly/2CsFtQB

在撰写本文时,此功能处于预览状态,但它很可能将被释放之前或在文章发布后不久。在预览时,必须将安装要使用命令行界面 (CLI) 中的事件网格扩展:

az extension add -–name eventgrid

考虑到这种,我将初始化将在整个练习中重复使用的几个本地变量:

rgname=msdndemo
topicname=<your-unique-grid-topic-name>
storagename=<your-unique-storage-account-name>
containername=deadletterevents

我将创建一个资源组、 存储帐户和一个容器,它将用于接收死信事件,如中所示图 3

图 3: 创建资源组、 存储帐户和容器

# create resource group
az group create --name $rgname -l westus2
# create storage account
az storage account create \
  -–name $storagename \
  --location westus2 \
  --resource-group $rgname \
  --sku Standard_LRS \
  --kind StorageV2 \
  --access-tier Hot
# create storage container
export AZURE_STORAGE_ACCOUNT=$storagename
export AZURE_STORAGE_ACCESS_KEY=”$(az storage account keys list -–account-name $storagename
  --resource-group $rgname –-query “[0].value” –-output tsv)”
az storage container create –-name $containername

接下来,我需要存储帐户的资源 id。这将用于,容器名称,以及定义死信终结点:

# storage ID
storageid=$(az storage account show –-name $storagename
  –-resource-group $rgname –-query id --output tsv)  
# container ID for dead letter channel
containerid="$storageid/blobservices/default/containers/$containername"

我将会将事件发送到自定义的事件网格主题。让我们来创建主题并保存的终结点地址和访问密钥。发布事件时,将为这些值可使用更高版本:

# create custom topic
az eventgrid topic create -g $rgname –-name $topicname -l westus2
# save topic endpoint
topicendpoint=$(az eventgrid topic show –-name $topicname -g $rgname
  –-query “endpoint” –-output tsv) 
# save topic key
topickey=$(az eventgrid topic key list --name $topicname -g $rgname
  --query "key1" --output tsv)

Azure Functions 事件处理程序

在此方案中,我将假设用户将歌曲请求发送到虚构的广播电台,致力于为的布鲁斯音乐。因为请求进入时,它们是将其放到工作站的播放列表上。但是,如果歌曲的类型不匹配广播电台的目标音乐,它将是拒绝并被置于死信通道上。

事件处理程序是在版本 2 运行时生成一个 Azure 函数。它将检查该事件的主题字段并批准或相应地拒绝歌曲请求。Azure 函数的代码所示图 4

图 4 Azure 函数的事件处理程序

using System.IO;
using System.Linq;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging; 

namespace WeWantTheFunc
{
  public static class SongRequestHandler
  {
    [FunctionName("SongRequestHandler")]
    public static IActionResult Run(
      [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]
      HttpRequest req, ILogger log)
    {
      // Get the body of the request
      var requestBody = new StreamReader(req.Body).ReadToEnd();
      // Check the header for the event type           
      if (!req.Headers.TryGetValue("Aeg-Event-Type", out var headerValues))
        return new BadRequestObjectResult("Not a valid request");
      var eventTypeHeaderValue = headerValues.FirstOrDefault();
      if (eventTypeHeaderValue == "SubscriptionValidation")
      {
        // Validate the subscription
        var events = JsonConvert.DeserializeObject<EventGridEvent[]>(requestBody);
        dynamic data = events[0].Data;
        var validationCode = data["validationCode"];
        return new JsonResult(new
        {
          validationResponse = validationCode
        });
      }
      else if (eventTypeHeaderValue == "Notification")
      {
        // Handle the song request
        log.LogInformation(requestBody);
        var events = JsonConvert.DeserializeObject<EventGridEvent[]>(requestBody);
        // Reject the request if it does not
        // match the genre for the station.
        if (events[0].Subject != "genre/blues")
          return new BadRequestObjectResult("Sorry, this is a Blues station");
        return new OkObjectResult("");
      }
      return new BadRequestObjectResult("Not a valid request");
    }
  }
}

此函数具有两个部分。首先会查看是否传入事件旨在用于验证终结点。如果验证请求,该函数将返回验证代码来证明所有权和接受传入的消息。

该函数的第二部分是为从事件网格的事件通知。通过返回的错误的请求响应 (401),进行显式语句以不能将事件发送到处理程序。这样就完成了过程的最重要的部分-创建事件订阅。

订阅事件

与所有准备就绪后,它具有的最终时刻创建演示重试策略和时视为死信功能的事件订阅。假设是,中的 Azure 函数图 4是否已部署并在 Azure 中当前正在运行。要创建订阅的命令如下所示:

az eventgrid event-subscription create \
  --endpoint <your-azure-function-url> \
  --topic-name $topicname \
  -g $rgname \
  --name song-request-sub \
  --deadletter-endpoint $containerid \
  --max-delivery-attempts 2
  --event-ttl 1

请注意,最大传递尝试和事件的 ttl 设置可。这不是要求以包括这两个设置,但它们可用于一起配置的重试策略。我设置最大传递尝试次数为 2,为一分钟的时间。另一个称为死信终结点的新自变量初始化为使用此前创建的变量的存储帐户容器。就可以发送一些事件,请参阅此工作端到端。

发送事件

从 CLI 中,我可以将复制包含歌曲请求的示例正文。此请求将实际包含的事件处理程序将拒绝无效音乐流派 (摇滚)。

# copy the request body
body=$(eval echo "'$(curl https://raw.githubusercontent.com/dbarkol/azure-event-grid-patterns/master/badsongrequest.json)'")
# post the request to the custom topic endpoint
curl -X POST -H "aeg-sas-key: $topickey" -d "$body" $topicendpoint

预期结果是,几分钟后将消息发送时,将其作为死信通道我配置的存储帐户容器中。Azure 存储资源管理器之类的工具可用于查看为每个死消息创建新 blob。但是,这不是非常令人兴奋的。作为替代方法,我想要在新消息添加到死信通道时获得通知。

检查死信事件

因为死信事件是只需在容器中创建的新 blob,可以使用事件网格开始时创建的文件,将触发的另一个工作流。图 5演示工作流如何事件可来自存储帐户和最终由逻辑应用接收和发送一封电子邮件。

所处理的逻辑应用的 blob 事件
图 5 所处理的逻辑应用的 Blob 事件

唯一需要整理出这是开始使用事件网格触发器的逻辑应用。在应用程序中的前三个步骤包含以下操作和触发器:

事件网格触发器配置为存储帐户。它使用两个筛选器 — 一个用于事件类型 (Microsoft.Storage.BlobCreated),第二个容器使用前缀筛选器选项。这些筛选器确保专用的死信容器中创建一个新文件时才会激活应用程序。

初始化变量是从事件网格消息的正文中检索该 URL 的下一步操作。其值为以下表达式:

triggerBody()?['data']['url']

获取 blob 内容使用路径是将读取 blob 的内容的操作。设置 blob 的路径使用此字符串操作表达式中删除不需要的部分的值的格式:

replace(variables('DeadLetterUrl'),
  'https://<storage-account-name>.blob.core.windows.net', '')

这些初始步骤所示图 6

事件网格触发逻辑应用
图 6 事件网格触发逻辑应用

下面是逻辑应用的最终操作:

分析 JSON 将计算 blob 的内容,并提供了一组我可以更高版本中引用的变量。内容属性表达式为:

json(body('Get_blob_content_using_path'))

我还需要提供如下所示的示例有效负载:

[{
  "id":"100",
  "eventTime":"2017-08-21T06:42:20.0000000+00:00",
  "eventType":"type",
  "dataVersion":"",
  "metadataVersion":"1",
  "topic":"enpoint",
  "subject":"testsubject",
  "deadLetterReason":"reason",
  "deliveryAttempts":1,
  "lastDeliveryOutcome":"BadRequest",
  "lastHttpStatusCode":400,
  "data":{"something":"data"}
}]

发送一封电子邮件正文和电子邮件主题 (死信原因和使用者) 的事件的项目设置格式的最后一个操作。因为有效负载中实际包含的数组,逻辑应用可以识别和包装中的操作为每个操作。

请注意,每个步骤中的每个循环,而不是按顺序在逻辑应用; 中的并行执行这是可以更改按顺序运行,如果所需的默认行为。最终结果显示在图 7

分析 JSON 和发送通知
图 7 解析 JSON 并发送通知

测试此端到端所做的工作会为每个死信事件发送一封电子邮件。

总结

使用其原始设计和越来越多的 Azure 服务深度集成,事件网格仅开始以显示创新方法来生成事件驱动的解决方案。在本文中,我演示了如何利用新的重试策略和 Azure 事件网格中的功能时视为死信。这些备受瞩目的功能是功能强大的选项来进行更有弹性且可缩放的事件网格上构建的解决方案。这篇文章中的代码,请参阅bit.ly/2LGKjhN


David Barkol是 microsoft 全球黑带团队的 Azure 专家。可以在 Twitter 上与他联系: @dbarkol或通过电子邮件dabarkol@microsoft.com。有关事件网格中的定期他的博客madeofstrings.com

衷心感谢以下 Microsoft 技术专家对本文的审阅:Bahram Banisadr

Bahram Banisadr 是负责 Azure 事件网格的 PM,致力于生成 Azure 服务的连接体系结构。


在 MSDN 杂志论坛讨论这篇文章