无责任 Azure SDK .NET 开发入门篇 - 使用 ServiceBus Queue 服务


王豫翔


2015年9月

服务总线队列支持中转消息通信模型。在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介的队列交换消息。消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。消息使用方(接收方)以异步方式从队列中提取消息并处理它。创建方不必等待使用方的答复即可继续处理并发送更多消息。队列为一个或多个竞争使用方提供"先入先出 (FIFO)"消息传递方式。也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。下图可以了解到主要服务组件

服务总线队列是一种可用于 各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信

有意思的是,服务总线是通过NamespaceManager类进行操作的,但是Azure SDK中我没有发现可以管理创建Namespace的类(RESTAPI有),所以命名空间我们还是需要通过管理门户去建立。然后将命名空间的密钥保存到配置文件中

<add key="Microsoft.ServiceBus.Namespace" value="Endpoint=sb: " />

现在我们建立ServiceBusQueueController控制器,基础代码为

[Authorize]
public class ServiceBusQueueController : Controller
{
    string connectionString = Microsoft.Azure.CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.Namespace");
    NamespaceManager namespaceManager = null;

    public ServiceBusQueueController()
    {
        namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
    }
}

该控制器有如下方法

  • Index
  • Create
  • Delete
  • Details
  • Send
  • List

8.1 Index获取当前命名空间中队列集合

代码比较简单

public async Task<ActionResult> Index()
{
    var queues = await namespaceManager.GetQueuesAsync();

    return View(queues);
}

对应的View说明了常用的队列信息

@model IEnumerable<Microsoft.ServiceBus.Messaging.QueueDescription>

@{
    ViewBag.Title = "Index";
}

<h2>Index</h2>

<p>
    @Html.ActionLink("创建新的队列", "Create")
</p>
<table class="table">
    <tr>
        <th>队列的名称</th>
        <th>当前状态</th>
        <th>大小<br />(以字节为单位)</th>
        <th>队列中的消息数</th>
        <th>是否可匿名访问</th>
        <th>锁定消息时间</th>
        <th>最大大小<br />(以 MB 为单位)</th>
        <th>支持排序</th>
        <th>更新时间</th>
    </tr>

    @foreach (var item in Model)
    {
        <tr>
            <td>
                @Html.ActionLink(item.Path, "Details", new { path = item.Path }) |
            </td>
            <td>
                @Html.Label(item.Status.ToString())
            </td>
            <td>
                @Html.Label(item.SizeInBytes.ToString())
            </td>
            <td>
                @Html.Label(item.MessageCount.ToString())
            </td>
            <td>
                @Html.Label(item.IsAnonymousAccessible.ToString())
            </td>
            <td>
                @Html.Label(item.LockDuration.ToString())
            </td>
            <td>
                @Html.Label(item.MaxSizeInMegabytes.ToString())
            </td>
            <td>
                @Html.Label(item.SupportOrdering.ToString())
            </td>
            <td>
                @Html.Label(item.UpdatedAt.ToString())
            </td>
            <td>
                @Html.ActionLink("发消息", "Send", new { path = item.Path }) |
                @Html.ActionLink("删除", "Delete", new { path = item.Path }) |
                @Html.ActionLink("查看消息", "List", new { path = item.Path })|
                @*@Html.ActionLink("数据可视化", "Chart", new { path = item.Path })*@
            </td>
        </tr>
    }

</table>

运行的结果如下

8.2 Create创建队列

由于SDK封装出色,所以代码非常清晰

[HttpPost]
public ActionResult Create(FormCollection values)
{
    if (!namespaceManager.QueueExists(values["path"]))
    {
        var queue = new Microsoft.ServiceBus.Messaging.QueueDescription(values["path"])
        {
            AutoDeleteOnIdle = new TimeSpan(0, int.Parse(values["autoDeleteOnIdle"]), 0),
            DefaultMessageTimeToLive = new TimeSpan(0, int.Parse(values["defaultMessageTimeToLive"]), 0),
            MaxSizeInMegabytes = int.Parse(values["maxSizeInMegabytes"]) * 1024,
        };
        namespaceManager.CreateQueue(queue);
    }
    return RedirectToAction("Index");
}

对应的View代码为

@model Microsoft.ServiceBus.Messaging.QueueDescription

@{
    ViewBag.Title = "Create";
}

<h2>Create Service Bus Queue</h2>


@using (Html.BeginForm())
{
    @Html.AntiForgeryToken()

    <div class="form-horizontal">
        <h4>QueueDescription</h4>
        <hr />
        @Html.ValidationSummary(true, "", new { @class = "text-danger" })
        <div class="form-group">
            <p>设置队列的名称</p>
            @Html.TextBox("path")
        </div>
        <div class="form-group">
            <p>设置在自动删除队列之前需经过的 TimeSpan 空闲时间间隔。最短持续时间为 5 分钟。</p>
            @Html.TextBox("autoDeleteOnIdle", "5")
        </div>
        <div class="form-group">
            <p>设置默认的消息生存时间值。这是未对消息本身设置 TimeToLive 时使用的默认值。</p>
            @Html.TextBox("defaultMessageTimeToLive", "10")
        </div>
        <div class="form-group">
            <p>该队列的总大小。最大为5GB</p>
            @Html.TextBox("maxSizeInMegabytes")
        </div>
        <div class="form-group">
            <div class="col-md-offset-2 col-md-10">
                <input type="submit" value="创建" class="btn btn-default" />
            </div>
        </div>
    </div>
}

<div>
    @Html.ActionLink("Back to List", "Index")
</div>

@section Scripts {
    @Scripts.Render("~/bundles/jqueryval")
}

运行结果为

创建成功后

门户中同样可以看到

8.3 Details了解队列中消息的情况

控制器代码

public ActionResult Details(string path)
{
    ViewBag.Path = path;
    return View(namespaceManager.GetQueue(path).MessageCountDetails);
}

对应的View中可以看到队列中的消息被分成不同情况

@model Microsoft.ServiceBus.Messaging.MessageCountDetails

@{
    ViewBag.Title = "Details";
}

<h2>Details</h2>

<div>
    <h4>@ViewBag.Path</h4>
    <hr />
    <dl class="dl-horizontal">
        <dt>
            队列、主题或订阅中的活动消息数
        </dt>
        <dd>
            @Html.DisplayFor(model => model.ActiveMessageCount)
        </dd>

        <dt>
            死信消息数
        </dt>
        <dd>
            @Html.DisplayFor(model => model.DeadLetterMessageCount)
        </dd>

        <dt>
            预定消息数
        </dt>
        <dd>
            @Html.DisplayFor(model => model.ScheduledMessageCount)
        </dd>

        <dt>
            传输到死信队列的消息数
        </dt>
        <dd>
            @Html.DisplayFor(model => model.TransferDeadLetterMessageCount)
        </dd>

        <dt>
            传输到其他队列、主题或订阅的消息数
        </dt>
        <dd>
            @Html.DisplayFor(model => model.TransferMessageCount)
        </dd>
    </dl>
</div>
<p>
    @Html.ActionLink("Back to List", "Index")
</p>

运行后效果大致如下

8.4 Send向队列发送消息

控制器的代码

[HttpPost]
public ActionResult Send(string path, int count)
{
    QueueClient myQueueClient = QueueClient.CreateFromConnectionString(connectionString, path);

    List<BrokeredMessage> messageList = new List<BrokeredMessage>();

    Random r = new Random();
    var msglist = new List<BrokeredMessage>();


    for (var i = 0; i < count; i++)
    {

        var msg = new BrokeredMessage(string.Format("test:{0}", i))
        {
            CorrelationId = Guid.NewGuid().ToString(),
            MessageId = Guid.NewGuid().ToString(),
            ReplyToSessionId = Guid.NewGuid().ToString(),
            TimeToLive = new TimeSpan(42, 0, 0),
            Label = string.Format("test:{0}-{1}", i, DateTime.Now.ToLongDateString()),
        };
        msg.Properties.Add("city", ChinaCity.GetCityRandom());
        msg.Properties.Add("job", Source.GetSourceRandom());
        msg.Properties.Add("point", r.Next(1, 100));
        msg.Properties.Add("sex", r.Next(1, 3) == 1 ? "男" : "女");

        msglist.Add(msg);
    }
    myQueueClient.SendBatch(msglist);

    ViewBag.Path = path;
    return RedirectToAction("Details", new { path = path });
}

8.5 List 获取队列中的消息

控制器的代码为

public ActionResult List(string path)
{
    ViewBag.Path = path;
    return View();
}

对应的View代码

@{
    ViewBag.Title = "Get";
}


@section scripts{
    <script>
        $(function () {
            $("#getmessage").click(function () {
                $.ajax(
                    {
                        method: "POST",
                        url: "@Url.Action("Receive", "ServiceBusQueue")",
                        data: { path: "@ViewBag.Path" ,count:$("#count").val()}
                    }).done(function (msglist) {
                        msglist.forEach(function(msg)
                        {
                            var trhtml = "<tr><td>" + msg.Label + "</td><td>" + msg.Properties.sex + "</td><td>" + msg.Properties.city + "</td><td>" + msg.Properties.job + "</td></tr>";
                            $("tbody tr:last").after(trhtml);
                        });
                    });
            });
        })
    </script>
}

<h2>@ViewBag.Path</h2>

<input type="text" id="count"  value="10"/>
<input type="button" value="获取消息" id="getmessage" />
<table class="table">
    <tr>
        <th>标签</th>
        <th>性别</th>
        <th>城市</th>
        <th>职业</th>
    </tr>
    <tbody></tbody>

</table>

View中对应的Ajax的控制器代码为

[HttpPost]
public JsonResult Receive(string path, int count)
{

    QueueClient client = QueueClient.CreateFromConnectionString(connectionString, path);
    var msgList = new List<BrokeredMessage>();

    for (int i = 0; i < count; i++)
    {
        BrokeredMessage message = client.Receive();
        if (message == null)
        {
            break;
        }
        try
        {
            msgList.Add(message);
            message.Complete();
        }
        catch (Exception)
        {
            message.Abandon();
        }
    }
    return Json(msgList);
}

运行结果如图

8.6 Delete删除队列

 

public ActionResult Delete(string path)
{
    namespaceManager.DeleteQueue(path);
    return RedirectToAction("Index");
}


相关文章 |  无责任 Azure SDK .NET 开发入门篇 - Azure 开发前准备工作 | 使用 Azure AD 进行身份验证 | 使用 Azure AD 管理用户信息 | 创建管理云服务 | 使用 Blob Storage 服务 | 使用 Table Storage 服务 | 使用 Queue Storage 服务 | 使用 ServiceBus Queue 服务 | 使用 ServiceBus Topic 服务 | 使用 Azure SQL 数据库