Leveraging Node.js’ libuv in Azure

Updated: October 1, 2013

Author: Christian Martinez

Node.js is definitely one of the hottest things going right now, and you can find blogs about running it in Azure, such as this one by Nathan Totten. While that is certainly interesting and cool, the subject of this post was inspired by this post on the Node.js blog by Node’s creator, Ryan Dahl. More specifically, it was this quote that got me interested:

Since Node is totally non-blocking, libuv turns out to be a rather useful library itself: a BSD-licensed, minimal, high-performance, cross-platform networking library.

I am always interested in cool networking libraries, with my current favorite being 0MQ (http://www.zeromq.org/), which is a full-featured, high-performance, general-purpose messaging library that supports multiple MEPs (message exchange patterns). Libuv is a very different beast from 0MQ and has different goals, but both projects share the common bond of being open source and of making performance a first-class citizen.

To get started on my adventure, I downloaded the 5.7 version of the libuv source and attempted to run the vcbuild.bat file. After that tanked, I downloaded Python, then Subversion, and checked out the gyp script and... anyway, these shenanigans could probably be a blog post in and of themselves. For now, just know that what ended up being generated was a Visual Studio solution that I was then able to tweak to generate a 64-bit static library. Since libuv and Node are highly active projects, it’s likely that things have changed since I went through my exercise, so if you take this on yourself, your experience may differ.

Once I had the library, I had to decide how I wanted to deploy things to Azure. Libuv is native C code and comes with a test suite that I could have theoretically deployed using the SDK 1.5 ProgramEntryPoint element. But that would have been boring and probably not seen as too useful by developers who want to use the library with their own managed code, so what I did instead was develop a prototype in C++/CLI that allowed me to create a server that runs in a WorkerRole and looks like this:

public override void Run()
{
    long cnt = 0;
    long byteCnt = 0;
    using (var server = new TcpServer())
    {
        var ep = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["listen"];
        //must assign callback before calling listen
        server.OnBytesReceived += (arg) =>
        {
            Interlocked.Increment(ref cnt);
            Interlocked.Add(ref byteCnt, arg.Length);
            //Trace.WriteLine(cnt);
        };
        server.Listen(ep.IPEndpoint.Address.ToString(),
                      ep.IPEndpoint.Port);
        //if this is successful libuv uv_run blocks exit
    }
}

That should look very normal to any managed developer, and while all I do in the prototype is count the bytes and the number of calls received, you’re free to do any interesting thing you choose in the exposed delegate.

So, how did I get from straight C code to a nice managed experience? Well, the first thing I had to do was decide whether I wanted to use P/Invoke or C++/CLI. After perusing the libuv source and realizing the whole thing was callback driven, I decided to go with C++/CLI. While pointer-to-member-function syntax is always a joy to deal with, I still prefer it to managing the DllImport declarations, marshalling and mapping that P/Invoke requires. This resource was a great help in figuring out the callback syntax. Essentially, what you end up needing is a delegate declaration, a delegate instance variable and a GCHandle. My header file ended up looking like this:

// LibuvWrapper.h
#pragma once
#include "include\uv.h"
using namespace System;
using namespace System::Collections::Concurrent;
using namespace System::Runtime::InteropServices;

namespace LibuvWrapper {
    
    public delegate void BytesReceived(array<Byte>^ stuff);
    
    public ref class TcpServer sealed 
    {
        public:
            
            TcpServer();
            ~TcpServer();
            !TcpServer();
            void Listen(String^ address, int port);
            //public delegate for passing bits sent 
            //from client onto implementation code
            BytesReceived^ OnBytesReceived; 
        private:
            //delegate declarations. to convert to native callbacks you
            //need the delegate declaration, delegate instance, 
            //gchandle and preferably a typedef
            delegate void AfterRead(uv_stream_t* handle, ssize_t nread, 
                                                        uv_buf_t buf);
            delegate void OnConnection(uv_stream_t* server, 
                                                        int status);
            delegate void AfterWrite(uv_write_t* req, int status);
            delegate void OnClose(uv_handle_t* peer);
            delegate void OnServerClose(uv_handle_t* handle);

            //callback methods
            void AfterReadImpl(uv_stream_t* handle, ssize_t nread, 
                                                uv_buf_t buf);
            void OnConnectionImpl(uv_stream_t* server, int status);
            void AfterWriteImpl(uv_write_t* req, int status);
            void OnCloseImpl(uv_handle_t* peer);
            void OnServerCloseImpl(uv_handle_t* handle);
            

            //handles and delegate members that will be 
            //converted to native callbacks
            AfterRead^ afterReadFunc;
            GCHandle afterReadHandle;
            OnConnection^ onConnectionFunc;
            GCHandle onConnectionHandle;
            AfterWrite^ afterWriteFunc;
            GCHandle afterWriteHandle;
            OnClose^ onCloseFunc;
            GCHandle onCloseHandle;
            OnServerClose^ onServerCloseFunc;
            GCHandle onServerCloseHandle;
            
            
            //libuv members
            uv_tcp_t * server;
            uv_loop_t * loop;
            uv_buf_t * ack; 
            //this is the same response sent to all 
            //clients as ack. created once and never 
            //deleted until object destruction
    };
}

Wiring up the delegates looks like this:

afterReadFunc = gcnew AfterRead(this, &LibuvWrapper::TcpServer::AfterReadImpl);
afterReadHandle = GCHandle::Alloc(afterReadFunc);

The typedef and passing the callback look like this:

typedef void (__stdcall * AFTERREAD_CB)(uv_stream_t* handle,
                         ssize_t nread, uv_buf_t buf);

...

if (uv_read_start(stream, echo_alloc, static_cast<AFTERREAD_CB>
  (Marshal::GetFunctionPointerForDelegate(afterReadFunc).ToPointer())))
{
    throw gcnew System::Exception(
                 "Error on setting up after read\n");
}

It is not super pretty but it got the job done! The rest of the internals are available with the posted source.
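For comparison, plain native C++ runs into the same problem: a C library can only call an ordinary function pointer, not a member function. The following is a minimal sketch of the usual native workaround, not the wrapper's actual code. It assumes a hypothetical C-style API (fake_stream_t, fake_read_cb and fake_read_start are stand-ins invented for illustration, not libuv types) and stashes the object pointer in a user data field, similar in spirit to the data member that libuv handles carry.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a C library handle; NOT a libuv type.
struct fake_stream_t {
    void* data;  // user pointer, similar in spirit to the data field on libuv handles
};

// C-style callback signature that the hypothetical library expects.
typedef void (*fake_read_cb)(fake_stream_t* handle, const char* bytes,
                             std::size_t nread);

// Hypothetical library entry point: it simply delivers one 5-byte buffer
// synchronously so the sketch can run without a real event loop.
inline void fake_read_start(fake_stream_t* handle, fake_read_cb cb) {
    static const char payload[] = "hello";
    cb(handle, payload, sizeof(payload) - 1);
}

class ByteCounter {
public:
    ByteCounter() : calls_(0), bytes_(0) { stream_.data = this; }

    // The handle stores `this`, so copying would leave a stale pointer.
    ByteCounter(const ByteCounter&) = delete;
    ByteCounter& operator=(const ByteCounter&) = delete;

    void Start() { fake_read_start(&stream_, &ByteCounter::AfterReadThunk); }

    std::size_t calls() const { return calls_; }
    std::size_t bytes() const { return bytes_; }

private:
    // A static member function has ordinary function-pointer type, so the
    // C-style API accepts it; it recovers the object from handle->data.
    static void AfterReadThunk(fake_stream_t* handle, const char* bytes,
                               std::size_t nread) {
        static_cast<ByteCounter*>(handle->data)->AfterReadImpl(bytes, nread);
    }

    // The real per-object callback: count calls and bytes received.
    void AfterReadImpl(const char* /*bytes*/, std::size_t nread) {
        ++calls_;
        bytes_ += nread;
    }

    fake_stream_t stream_;
    std::size_t calls_;
    std::size_t bytes_;
};
```

In C++/CLI the managed object cannot be handed to native code as a raw pointer this way, which is why the wrapper goes through a delegate, a GCHandle to keep the delegate alive, and Marshal::GetFunctionPointerForDelegate instead.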

Once I had a server, I built a simple test client using plain old C# and TcpClient, and ran it from another WorkerRole:

var ep = RoleEnvironment.Roles["LibuvServer"].Instances.Select(
               i => i.InstanceEndpoints ["listen"]).ToArray();
var cl = new TcpClient();
byte[] msg = Encoding.ASCII.GetBytes(new string('x',msgSize));
byte[] gbye = Encoding.ASCII.GetBytes("|^|");
byte[] ackBuf = new byte[1];

//it's easy for the client to start before server so wait 
//forever until we connect. 
//for something more sophisticated AKA "the greatest retry library 
//in the history of man" see 
//http://code.msdn.microsoft.com/Transient-Fault-Handling-b209151f
while (!cl.Connected)
{
    try
    {
        cl.Connect(ep[0].IPEndpoint);
    }
    catch (Exception)
    {
        Thread.Sleep(100);
    }
}
using (var stream = cl.GetStream())
{
    var watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < numTimes; i++)
    {
        stream.Write(msg, 0, msg.Length);
        stream.Read(ackBuf, 0, ackBuf.Length);
    }
    watch.Stop();
    log = string.Format("Sent {0} messages of size {1} and received  {0} acks in {2} milliseconds",
                        numTimes, msgSize, watch.ElapsedMilliseconds);
    Trace.WriteLine(log, "Information");
    stream.Write(gbye, 0, gbye.Length);
}
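The connect loop above waits forever with a fixed 100 ms pause. As a rough illustration of a bounded alternative, here is a minimal C++ sketch that gives up after a set number of attempts and doubles the pause between them. RetryWithBackoff and its parameters are hypothetical names chosen for this example; they are not part of the posted source or of the Transient Fault Handling library mentioned in the comment.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <thread>

// Calls op() until it returns true or max_attempts have been made.
// Sleeps between attempts, doubling the delay each time up to max_delay.
// Returns true if op() eventually succeeded, false otherwise.
// (Hypothetical helper for illustration only.)
template <typename Op>
bool RetryWithBackoff(Op op, int max_attempts,
                      std::chrono::milliseconds delay,
                      std::chrono::milliseconds max_delay) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (op()) {
            return true;  // success, stop retrying
        }
        if (attempt == max_attempts) {
            break;  // no point sleeping after the final failure
        }
        std::this_thread::sleep_for(delay);
        delay = std::min<std::chrono::milliseconds>(delay * 2, max_delay);
    }
    return false;
}
```

A caller would wrap the connect attempt in a callable that returns true on success and false on failure, and then decide what to do if the helper reports that every attempt failed.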

Next, I added a simple script and Startup Task to my “server” project to ensure the C runtime was available:

"%~dps0vcredist_x64.exe" /q /norestart
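For readers who have not set up a Startup Task before, the registration in ServiceDefinition.csdef might look roughly like the fragment below. This is a sketch under assumptions, not the project's actual file: install.cmd is a hypothetical name for the script shown above, and the role name is taken from the client code. The task is marked elevated on the assumption that the redistributable installer needs administrative rights.

```xml
<WorkerRole name="LibuvServer">
  <Startup>
    <!-- install.cmd is a hypothetical file name for the script shown above -->
    <Task commandLine="install.cmd" executionContext="elevated" taskType="simple" />
  </Startup>
</WorkerRole>
```

With taskType set to simple, the role waits for the script to finish before it starts, so the C runtime is in place before the native library is loaded.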

And finally, I deployed the project and ran it in Azure.

The full source for the project can be found here.
