Экспорт (0) Печать
Развернуть все

Использование библиотеки Libuv из Node.js в Azure

Обновлено: Октябрь 2013 г.

Автор: Кристиан Мартинез (Christian Martinez)

Node.js определенно является одним из наиболее популярных инструментов в данный момент. Можно найти блоги о запуске этой платформы в , например, посмотрите этот журнал Натана Тоттена. Это определенно интересно и достойно внимания, но моя запись была вдохновлена вот этой статьей в блоге Node.js автором этой платформы Райаном Дахлом. Точнее, меня заинтересовала вот эта фраза:

«Поскольку Node — это платформа без блокировки, оказывается, что libuv — это довольно полезная библиотека сама по себе: минимальная, высокопроизводительная, кросс-платформенная сетевая библиотека с лицензией BSD.

Меня всегда интересовали хорошие сетевые библиотеки, и в данный момент мой фаворит ― это 0MQ ( http://www.zeromq.org/ ), которая является полнофункциональной, высокопроизводительной и универсальной библиотекой обмена сообщения с поддержкой нескольких MEP. Библиотека Libuv сильно отличается от 0MQ и предназначена для других целей, но оба проекта сходны в том, что являются библиотеками с открытым исходным кодом, которые ставят производительность во главу угла.

Итак, чтобы приступить к исследованию библиотеки, я загрузил исходники libuv версии 5.7 и попытался запустить файл vcbuild.bat. Сделав это, я загрузил python, затем subversion, проверил gyp-скрипт и... в общем проведенные мной махинации тянут на отдельную запись в блоге. В конечном итоге было сформировано решение Visual Studio, поколдовав над которым мне удалось создать статическую 64-разрядную библиотеку. Поскольку над проектами libuv и Node ведется активная работа, то, скорее всего, с момента моего исследования их код изменился, так что, если вы попробуете повторить мой путь, все может пройти несколько иначе.

После того как я получил библиотеку, мне нужно было решить, как развернуть решение в . Libuv написана на собственном коде C и имеет набор тестов, который я мог теоретически развернуть с помощью элемента ProgramEntryPointSDK 1.5. Однако это было скучно и не слишком интересно разработчикам, которые хотят использовать библиотеку с собственным управляемым кодом, так что вместо этого я решил разработать прототип на C++/CLI, который позволил бы мне создать сервер, выполняемый в рабочей роли. Вот этот прототип:

public override void Run()
{
     long cnt =0;
     long byteCnt = 0;
     using (var server = new TcpServer())
     {
        var ep = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["listen"];
        //must assign callback before calling listen
        server.OnBytesReceived += (arg) =>      
         {
          Interlocked.Increment(ref cnt);
          Interlocked.Add(ref byteCnt, arg.Length);
          //Trace.WriteLine(cnt);
         };
        server.Listen(ep.IPEndpoint.Address.ToString(), 
                          ep.IPEndpoint.Port); 
       //if this is successful libuv uv_run blocks exit
      }  
}

Это должно быть понятно любому разработчику, работающему с управляемым кодом, и, хотя прототип лишь увеличивает байты и количество полученных вызовов, в предоставленном делегате вы можете делать все, что пожелаете нужным.

Итак, как же я перевел проект с прямых С-рельс на удобный управляемый код? Сначала пришлось решить, следует мне использовать P/Invoke или C++/CLI. После просмотра исходников libuv и осознания того, что библиотека работает на обратных вызовах, я решил применить C++/CLI. Несмотря на то что работа с указателями на функции-члены всегда полна подводных камней, я все же предпочту их, чем возню с DLLImport, упаковкой и сопоставлением этих функций с помощью P/Invoke. Этот материал сильно помог понять синтаксис обратных вызовов. Фактически вам понадобится объявление Delegate, переменная экземпляра Delegate и GCHandle. Мой итоговый файл заголовка выглядит так:

// LibuvWrapper.h
#pragma once
#include "include\uv.h"
using namespace System;
using namespace System::Collections::Concurrent;
using namespace System::Runtime::InteropServices;

namespace LibuvWrapper {
    
    public delegate void BytesReceived(array<Byte>^ stuff);
    
    public ref class TcpServer sealed 
    {
        public:
            
            TcpServer();
            ~TcpServer();
            !TcpServer();
            void Listen(String^ address, int port);
            //public delegate for passing bits sent 
            //from client onto implementation code
            BytesReceived^ OnBytesReceived; 
            private:
            //delegate declarations. to convert to native callbacks you
            //need the delegate declaration, delegate instance, 
            //gchandle and preferably a typedef
            delegate void AfterRead(uv_stream_t* handle, ssize_t nread, 
                                                        uv_buf_t buf);
            delegate void OnConnection(uv_stream_t* server, 
                                                        int status);
            delegate void AfterWrite(uv_write_t* req, int status);
            delegate void OnClose(uv_handle_t* peer);
            delegate void OnServerClose(uv_handle_t* handle);

            //callback methods
            void AfterReadImpl(uv_stream_t* handle, ssize_t nread, 
                                                uv_buf_t buf);
            void OnConnectionImpl(uv_stream_t* server, int status);
            void AfterWriteImpl(uv_write_t* req, int status);
            void OnCloseImpl(uv_handle_t* peer);
            void OnServerCloseImpl(uv_handle_t* handle);
            

            //handles and delegate members that will be 
            //converted to native callbacks
            AfterRead^ afterReadFunc;
            GCHandle afterReadHandle;
            OnConnection^ onConnectionFunc;
            GCHandle onConnectionHandle;
            AfterWrite^ afterWriteFunc;
            GCHandle afterWriteHandle;
            OnClose^ onCloseFunc;
            GCHandle onCloseHandle;
            OnServerClose^ onServerCloseFunc;
            GCHandle onServerCloseHandle;
            
            
            //libuv members
            uv_tcp_t * server;
            uv_loop_t * loop;
            uv_buf_t * ack; 
            //this is the same response sent to all 
            //clients as ack. created once and never 
            //deleted until object destruction
    };
}

Подключение делегатов производится так:

afterReadFunc = gcnew AfterRead(this, &LibuvWrapper::TcpServer::AfterReadImpl);
afterReadHandle = GCHandle::Alloc(afterReadFunc);

The typedef and passing the callback look like this:
typedef void (__stdcall * AFTERREAD_CB)(uv_stream_t* handle,
                         ssize_t nread, uv_buf_t buf);

...

if(uv_read_start(stream, echo_alloc,static_cast<AFTERREAD_CB>
  (Marshal::GetFunctionPointerForDelegate(afterReadFunc).ToPointer())))
{
    throw gcnew System::Exception(
                 "Error on setting up after read\n");}

Не самый аккуратный код, но он делает свое дело! Оставшаяся часть реализации доступна в опубликованном исходном коде.

После создания сервера с помощью старого доброго C# я сделал простое клиентское приложение TcpClient и запустил его в другой рабочей роли.

var ep = RoleEnvironment.Roles["LibuvServer"].Instances.Select(
               i => i.InstanceEndpoints ["listen"]).ToArray();
var cl = new TcpClient();
byte[] msg = Encoding.ASCII.GetBytes(new string('x',msgSize));
byte[] gbye = Encoding.ASCII.GetBytes("|^|");
byte[] ackBuf = new byte[1];

//it's easy for the client to start before server so wait 
//forever until we connect. 
//for something more sophisticated AKA "the greatest retry library 
//in the history of man" see 
//http://code.msdn.microsoft.com/Transient-Fault-Handling-b209151fwhile (!cl.Connected)
{
    try
    {
        cl.Connect(ep[0].IPEndpoint);
    }
    catch (Exception)
    {
        Thread.Sleep(100);
    }
    
}
using(var stream = cl.GetStream())
{
    var watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < numTimes ; i++)
    {

    stream.Write(msg, 0, msg.Length);
    stream.Read(ackBuf, 0, ackBuf.Length);
}
watch.Stop();
log = string.Format("Sent {0} messages of size {1} and received  {0} acks in {2} milliseconds", 
                                            numTimes, msgSize, watch.ElapsedMilliseconds);
Trace.WriteLine(log, "Information");
stream.Write(gbye,0,gbye.Length);
}

Затем добавил простой скрипт и задачу запуска в проект моего «сервера», чтобы проверить доступность С-среды выполнения:

"%~dps0vcredist_x64.exe" /q /norestart

И наконец, я развернул проект и запустил его в .

Полный исходный код проекта можно найти здесь.

Показ:
© 2014 Microsoft