匯出 (0) 列印
全部展開

在 Azure 中利用 Node.js 的 libuv

更新日期: 2013年10月

作者:Christian Martinez

Node.js 無疑目前正當紅,您可以在 上找到有關如何執行的部落格,例如 Nathan Totten 所寫的這篇部落格。這當然很有趣,也很酷,可是本文的靈感卻是來自 Node 的創作者 Ryan Dahl 在 Node.js 部落格上的這篇文章。具體的說,引我感興趣的是這句引言:

因為 Node 是完全未封鎖的,所以原來 libuv 本身就是相當實用的程式庫:有 BSD 授權、最基本、高可用性、跨平台的網路程式庫。

我一向對超酷的網路程式庫有興趣,目前最愛的是 0MQ ( http://www.zeromq.org/ ),它是功能完備、支援多 MEP、高效能、一般用途的訊息程式庫。Libuv 是與 0MQ 截然不同的產品,雖然目標不同,但兩個專案的共通點是兩者都是開放原始碼,而且效能一級棒。

為了展開我的探索之旅,我下載了 5.7 版的 libuv 原始檔,並嘗試執行 vcbuild.bat 檔案。結果失敗,之後我下載了 Python,然後是 Subversion,又試過 gyp 指令碼和......總之光是這樣的波折就夠寫一篇部落格,可是言歸正傳,想不到所產生的就是個 Visual Studio 方案,接著我終於想辦法根據它產生了一個 64 位元靜態程式庫。因為 libuv 和 Node 都是高度進展中的專案,很可能在我上次練習之後,狀況已經改變,所以如果您想自行嘗試,您的經驗或許會也或許不會一樣。

當我拿到程式庫時,我必須決定如何在 上進行部署。Libuv 是原生 C 程式碼,隨附有測試套件,理論上可以使用 SDK 1.5ProgramEntryPoint 項目進行部署。可是這樣很無趣,而且對於想在 Managed 程式碼中使用此程式庫的開發人員,這樣也可能不太有用。所以我改成用 C++/CLI 開發原型,讓我可以建立以 WorkerRole 執行的伺服器。結果類似:

public override void Run()
{
     long cnt =0;
     long byteCnt = 0;
     using (var server = new TcpServer())
     {
        var ep = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["listen"];
        //must assign callback before calling listen
        server.OnBytesReceived += (arg) =>      
         {
          Interlocked.Increment(ref cnt);
          Interlocked.Add(ref byteCnt, arg.Length);
          //Trace.WriteLine(cnt);
         };
        server.Listen(ep.IPEndpoint.Address.ToString(), 
                          ep.IPEndpoint.Port); 
       //if this is successful libuv uv_run blocks exit
      }  
}

這對任何 Managed 程式碼的開發人員而言應該看起來很正常,我只是在原型中增加位元組數和收到的呼叫數而已,您可以隨意在公開的委派中做任何有趣的作業。

所以,我是如何從單純的 C 程式碼轉換到美好的 Managed 經驗呢?首先,我必須決定要使用 P/Invoke 或 C++/CLI。在研究過 libuv 原始碼,了解整體是由回呼驅動之後,我決定採用 C++/CLI。雖然成員函數指標語法很好用,我還是寧願使用 C++/CLI,而不希望像使用 P/Invoke 一樣要管理 DLLImport 以及封送處理和對應。這個資源對於了解回呼語法有很大的助益。基本上,您需要的是 Delegate 宣告、Delegate 執行個體變數和 GCHandle。我的標頭檔最後是:

// LibuvWrapper.h
#pragma once
#include "include\uv.h"
using namespace System;
using namespace System::Collections::Concurrent;
using namespace System::Runtime::InteropServices;

namespace LibuvWrapper {
    
    public delegate void BytesReceived(array<Byte>^ stuff);
    
    public ref class TcpServer sealed 
    {
        public:
            
            TcpServer();
            ~TcpServer();
            !TcpServer();
            void Listen(String^ address, int port);
            //public delegate for passing bits sent 
            //from client onto implementation code
            BytesReceived^ OnBytesReceived; 
            private:
            //delegate declarations. to convert to native callbacks you
            //need the delegate declaration, delegate instance, 
            //gchandle and preferably a typedef
            delegate void AfterRead(uv_stream_t* handle, ssize_t nread, 
                                                        uv_buf_t buf);
            delegate void OnConnection(uv_stream_t* server, 
                                                        int status);
            delegate void AfterWrite(uv_write_t* req, int status);
            delegate void OnClose(uv_handle_t* peer);
            delegate void OnServerClose(uv_handle_t* handle);

            //callback methods
            void AfterReadImpl(uv_stream_t* handle, ssize_t nread, 
                                                uv_buf_t buf);
            void OnConnectionImpl(uv_stream_t* server, int status);
            void AfterWriteImpl(uv_write_t* req, int status);
            void OnCloseImpl(uv_handle_t* peer);
            void OnServerCloseImpl(uv_handle_t* handle);
            

            //handles and delegate members that will be 
            //converted to native callbacks
            AfterRead^ afterReadFunc;
            GCHandle afterReadHandle;
            OnConnection^ onConnectionFunc;
            GCHandle onConnectionHandle;
            AfterWrite^ afterWriteFunc;
            GCHandle afterWriteHandle;
            OnClose^ onCloseFunc;
            GCHandle onCloseHandle;
            OnServerClose^ onServerCloseFunc;
            GCHandle onServerCloseHandle;
            
            
            //libuv members
            uv_tcp_t * server;
            uv_loop_t * loop;
            uv_buf_t * ack; 
            //this is the same response sent to all 
            //clients as ack. created once and never 
            //deleted until object destruction
    };
}

連通委派的方式是:

afterReadFunc = gcnew AfterRead(this, &LibuvWrapper::TcpServer::AfterReadImpl);
afterReadHandle = GCHandle::Alloc(afterReadFunc);

The typedef and passing the callback look like this:
typedef void (__stdcall * AFTERREAD_CB)(uv_stream_t* handle,
                         ssize_t nread, uv_buf_t buf);

...

if(uv_read_start(stream, echo_alloc,static_cast<AFTERREAD_CB>
  (Marshal::GetFunctionPointerForDelegate(afterReadFunc).ToPointer())))
{
    throw gcnew System::Exception(
                 "Error on setting up after read\n");}

雖然不是十分完美,但是的確可以完成任務!內部其餘部分隨附在張貼的原始檔中。

備妥伺服器之後,我用單純的老式 C#、TcpClient 建立了一個簡單的測試用戶端,並從另一個 WorkerRole 執行:

var ep = RoleEnvironment.Roles["LibuvServer"].Instances.Select(
               i => i.InstanceEndpoints ["listen"]).ToArray();
var cl = new TcpClient();
byte[] msg = Encoding.ASCII.GetBytes(new string('x',msgSize));
byte[] gbye = Encoding.ASCII.GetBytes("|^|");
byte[] ackBuf = new byte[1];

//it's easy for the client to start before server so wait 
//forever until we connect. 
//for something more sophisticated AKA "the greatest retry library 
//in the history of man" see 
//http://code.msdn.microsoft.com/Transient-Fault-Handling-b209151fwhile (!cl.Connected)
{
    try
    {
        cl.Connect(ep[0].IPEndpoint);
    }
    catch (Exception)
    {
        Thread.Sleep(100);
    }
    
}
using(var stream = cl.GetStream())
{
    var watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < numTimes ; i++)
    {

    stream.Write(msg, 0, msg.Length);
    stream.Read(ackBuf, 0, ackBuf.Length);
}
watch.Stop();
log = string.Format("Sent {0} messages of size {1} and received  {0} acks in {2} milliseconds", 
                                            numTimes, msgSize, watch.ElapsedMilliseconds);
Trace.WriteLine(log, "Information");
stream.Write(gbye,0,gbye.Length);
}

接下來,我為我的「伺服器」專案加入一個簡單的指令碼和啟動工作,以確保可以使用 C 執行階段:

"%~dps0vcredist_x64.exe" /q /norestart

最後,我在 中部署專案並執行此專案。

此專案的完整程式碼在這裡

顯示:
© 2015 Microsoft