导出 (0) 打印
全部展开

在 Azure 中利用 Node.js 的 libuv

更新时间: 2013年10月

作者: Christian Martinez

Node.js 无疑是当前的热门话题之一,你可以找到很多有关在 上运行它的博文,如 Nathan Totten 所写的此文。尽管这很有趣,此博文的主题是由 Node 的创建者 Ryan Dahl 所写 Node.js 博客上的此文衍生的。尤其是,我对其中的一句话很感兴趣:

由于 Node 是完全无法阻塞的,因此可以认为 libuv 是很有用的一个库:已获得 BSD 许可、最小、高性能、跨平台的网络库。

我一直关注理想的网络库,当前常用的是 0MQ (http://www.zeromq.org/),它的功能齐全,支持多个 MEP,是高性能的通用消息传送库。Libuv 与 0MQ 完全不同,它具有不同的目标但是这两个项目共享相同的开源代码,并且将性能放在首位。

为了开始试用,我下载了 libuv 源代码的 5.7 版本,并尝试运行 vcbuild.bat 文件。之后,我下载了 python,然后设置子版本并签出 gyp 脚本.. 不管怎样,这些操作可能在博客中详细讨论,但是现在我们只知道最终会得到一个 Visual Studio 解决方案,我可以修改它来生成一个 64 位静态库。因为 libuv 和 Node 是很活跃的项目,自从我试用后它可能已更改,因此如果你将它用于自己的项目中,你的体验可能不同。

我有了库后,必须决定要如何将项目部署到 。Libuv 本身就是 C 代码,随测试套件一起提供,从理论上讲,我可以使用 SDK 1.5ProgramEntryPoint 元素部署它。但是,对于要将库用于自己的托管代码的开发人员来说,那可能很乏味,而且没有太大用处。因此,我用 C++/CLI 编写了一个原型,它允许创建以 WorkerRole 运行的服务器,类似于:

public override void Run()
{
     long cnt =0;
     long byteCnt = 0;
     using (var server = new TcpServer())
     {
        var ep = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["listen"];
        //must assign callback before calling listen
        server.OnBytesReceived += (arg) =>      
         {
          Interlocked.Increment(ref cnt);
          Interlocked.Add(ref byteCnt, arg.Length);
          //Trace.WriteLine(cnt);
         };
        server.Listen(ep.IPEndpoint.Address.ToString(), 
                          ep.IPEndpoint.Port); 
       //if this is successful libuv uv_run blocks exit
      }  
}

这对任何托管代码开发人员来说应该很常见,我在原型中所做的是递增字节,对于接收的调用数,你可以随意做在公开的委托中选择的任何有趣的事情。

因此,我如何从直接的 C 代码获得好的托管体验?我要做的第一件事是决定是要使用 P/Invoke 还是 C++/CLI。在了解 libuv 源代码并知道所有事情是回调驱动的后,我决定继续采用 C++/CLI 编程。尽管指向成员函数语法的指针很容易处理,我仍喜欢管理 DLLImport 并使用 P/Invoke 封送和映射一个指针。此资源对了解回调语法有很大帮助。本质上你最终需要的是一个 Delegate 声明、一个 Delegate 实例变量和一个 GCHandle。我的头文件最终类似于:

// LibuvWrapper.h
#pragma once
#include "include\uv.h"
using namespace System;
using namespace System::Collections::Concurrent;
using namespace System::Runtime::InteropServices;

namespace LibuvWrapper {
    
    public delegate void BytesReceived(array<Byte>^ stuff);
    
    public ref class TcpServer sealed 
    {
        public:
            
            TcpServer();
            ~TcpServer();
            !TcpServer();
            void Listen(String^ address, int port);
            //public delegate for passing bits sent 
            //from client onto implementation code
            BytesReceived^ OnBytesReceived; 
            private:
            //delegate declarations. to convert to native callbacks you
            //need the delegate declaration, delegate instance, 
            //gchandle and preferably a typedef
            delegate void AfterRead(uv_stream_t* handle, ssize_t nread, 
                                                        uv_buf_t buf);
            delegate void OnConnection(uv_stream_t* server, 
                                                        int status);
            delegate void AfterWrite(uv_write_t* req, int status);
            delegate void OnClose(uv_handle_t* peer);
            delegate void OnServerClose(uv_handle_t* handle);

            //callback methods
            void AfterReadImpl(uv_stream_t* handle, ssize_t nread, 
                                                uv_buf_t buf);
            void OnConnectionImpl(uv_stream_t* server, int status);
            void AfterWriteImpl(uv_write_t* req, int status);
            void OnCloseImpl(uv_handle_t* peer);
            void OnServerCloseImpl(uv_handle_t* handle);
            

            //handles and delegate members that will be 
            //converted to native callbacks
            AfterRead^ afterReadFunc;
            GCHandle afterReadHandle;
            OnConnection^ onConnectionFunc;
            GCHandle onConnectionHandle;
            AfterWrite^ afterWriteFunc;
            GCHandle afterWriteHandle;
            OnClose^ onCloseFunc;
            GCHandle onCloseHandle;
            OnServerClose^ onServerCloseFunc;
            GCHandle onServerCloseHandle;
            
            
            //libuv members
            uv_tcp_t * server;
            uv_loop_t * loop;
            uv_buf_t * ack; 
            //this is the same response sent to all 
            //clients as ack. created once and never 
            //deleted until object destruction
    };
}

通过以下方式设置委托:

afterReadFunc = gcnew AfterRead(this, &LibuvWrapper::TcpServer::AfterReadImpl);
afterReadHandle = GCHandle::Alloc(afterReadFunc);

The typedef and passing the callback look like this:
typedef void (__stdcall * AFTERREAD_CB)(uv_stream_t* handle,
                         ssize_t nread, uv_buf_t buf);

...

if(uv_read_start(stream, echo_alloc,static_cast<AFTERREAD_CB>
  (Marshal::GetFunctionPointerForDelegate(afterReadFunc).ToPointer())))
{
    throw gcnew System::Exception(
                 "Error on setting up after read\n");}

它并不是最好的,但是完成了所要做的工作!这些内容的其余部分包含在发布的源代码中。

我有服务器后,使用旧的纯 C# 生成了一个简单的测试客户端 TcpClient 并从另一 WorkerRole 来运行它:

var ep = RoleEnvironment.Roles["LibuvServer"].Instances.Select(
               i => i.InstanceEndpoints ["listen"]).ToArray();
var cl = new TcpClient();
byte[] msg = Encoding.ASCII.GetBytes(new string('x',msgSize));
byte[] gbye = Encoding.ASCII.GetBytes("|^|");
byte[] ackBuf = new byte[1];

//it's easy for the client to start before server so wait 
//forever until we connect. 
//for something more sophisticated AKA "the greatest retry library 
//in the history of man" see 
//http://code.msdn.microsoft.com/Transient-Fault-Handling-b209151fwhile (!cl.Connected)
{
    try
    {
        cl.Connect(ep[0].IPEndpoint);
    }
    catch (Exception)
    {
        Thread.Sleep(100);
    }
    
}
using(var stream = cl.GetStream())
{
    var watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < numTimes ; i++)
    {

    stream.Write(msg, 0, msg.Length);
    stream.Read(ackBuf, 0, ackBuf.Length);
}
watch.Stop();
log = string.Format("Sent {0} messages of size {1} and received  {0} acks in {2} milliseconds", 
                                            numTimes, msgSize, watch.ElapsedMilliseconds);
Trace.WriteLine(log, "Information");
stream.Write(gbye,0,gbye.Length);
}

接下来,我向我的“服务器”项目添加了简单的脚本和启动任务,以确保 C 运行时可用:

"%~dps0vcredist_x64.exe" /q /norestart

最后,我部署了该项目并在 中运行它。

可以在此处找到项目的完整源代码。

显示:
© 2014 Microsoft