Erlang和其他语言的交互

Erlang和其他语言(如C和Java)的交互手段一直是我很感兴趣的主题,周末看了下OTP文档,终于大致理清楚了思路。这里先简单总结五种交互手段(端口、Erl_Interface、端口驱动、C节点和NIF),也是更进一步学习Erlang的开始。

端口

最简单的方式是调用Erlang模块的 open_port/2 ,创建一个端口。Erlang端口可以被认为是一个外部Erlang进程,交互手段是通过标准IO输入输出,对应C语言里的read和write函数。

%% Type specification of open_port/2: takes a port name and a list of
%% port settings and returns a port().
%% PortName selects how the port is created (spawn a command, a driver,
%% an executable, or use a pair of file descriptors); PortSettings is a
%% list of the options enumerated under Opt.
-spec open_port(PortName, PortSettings) -> port() when
      PortName :: {spawn, Command :: string() | binary()} |
                  {spawn_driver, Command :: string() | binary()} |
                  {spawn_executable, FileName :: file:name() } |
                  {fd, In :: non_neg_integer(), Out :: non_neg_integer()},
      PortSettings :: [Opt],
      Opt :: {packet, N :: 1 | 2 | 4}
           | stream
           | {line, L :: non_neg_integer()}
           | {cd, Dir :: string() | binary()}
           | {env, Env :: [{Name :: string(), Val :: string() | false}]}
           | {args, [string() | binary()]}
           | {arg0, string() | binary()}
           | exit_status
           | use_stdio
           | nouse_stdio
           | stderr_to_stdout
           | in
           | out
           | binary
           | eof
       | {parallelism, Boolean :: boolean()}
       | hide.

还没有读源码,但是原理很好理解,只需要重定向标准输入输出,如linux的dup2函数,就可以实现数据交互。具体的数据格式也是简单的二进制串,由{packet, N}指定开头长度字段所占的字节数(N为1、2或4)。

例子如下:

%% erlang
%% simple test for string operation with c
-module(complex1).
-export([start/1, stop/0, init/1]).
-export([strlen/1, strcmp/2]).

%% Spawn the server process that owns the port to the external program
%% Prog. Returns the pid of the spawned process.
start(Prog) ->
    spawn(?MODULE, init, [Prog]).

%% Ask the registered server process to close its port and terminate.
stop() ->
    ?MODULE ! stop.

%% Public API: forward the request to the server and return its reply.
strlen(S) ->
    call_port({strlen, S}).
strcmp(S, T) ->
    call_port({strcmp, S, T}).

%% Send a request to the registered server and block until it answers
%% with {complex1, Result}.
call_port(Msg) ->
    ?MODULE ! {call, self(), Msg},
    receive
        {?MODULE, Result} ->
            Result
    end.

%% Entry point of the server process: register under the module name,
%% trap exits so that a dying port is delivered as a message, open the
%% port with a 2-byte length header and enter the message loop.
init(Prog) ->
    register(?MODULE, self()),
    process_flag(trap_exit, true),
    Port = open_port({spawn, Prog}, [{packet, 2}]),
    loop(Port).

%% Server loop. Handles one request at a time.
%% Because exits are trapped, the port's death arrives as an
%% {'EXIT', Port, Reason} message; it is matched in the nested receives
%% as well, otherwise the server would wait forever for a reply from an
%% external program that has already died.
loop(Port) ->
    receive
        {call, From, Msg} ->
            Port ! {self(), {command, encode(Msg)}},
            receive
                {Port, {data, Data}} ->
                    From ! {?MODULE, decode(Data)};
                {'EXIT', Port, _ReasonDuringCall} ->
                    exit(port_exit_error)
            end,
            loop(Port);
        stop ->
            Port ! {self(), close},
            receive
                {Port, closed} ->
                    exit(normal);
                {'EXIT', Port, _ReasonDuringClose} ->
                    exit(port_exit_error)
            end;
        {'EXIT', Port, _Reason} ->
            exit(port_exit_error)
    end.

%% Wire format: first byte is the function code (1 = strlen, 2 = strcmp).
%% strcmp sends both strings, each followed by a 0 byte.
encode({strlen, X}) -> [1, list_to_binary(X)];
encode({strcmp, X, Y}) -> [2, list_to_binary(X), 0, list_to_binary(Y), 0].

%% The reply is a single byte; the port is opened without the binary
%% option, so it arrives as a one-element list.
decode([Int]) -> Int.

后面的Erlang端程序大同小异,只写关键点。C语言那边也很简单:

#include <unistd.h>
#include "comm.h"

int read_exact(byte*, int);
int write_exact(byte*, int);

/*
 * Read one length-prefixed command from standard input into buf.
 *
 * The first two bytes are a big-endian payload length (the {packet, 2}
 * framing used on the Erlang side); the payload is then read into the
 * same buffer, overwriting the header.
 *
 * Returns -1 if the header could not be read completely, otherwise the
 * result of read_exact() for the payload. The payload length is not
 * checked against the size of buf; the caller must provide enough room.
 */
int read_cmd(byte* buf)
{
    int header_bytes = read_exact(buf, 2);

    if (header_bytes != 2)
        return -1;

    return read_exact(buf, (buf[0] << 8) | buf[1]);
}

/*
 * Write one length-prefixed reply to standard output.
 *
 * Emits a two-byte big-endian length header followed by len bytes of
 * buf. Unlike the previous version, a failed header write is reported
 * instead of being ignored, so a payload is never sent without its
 * header.
 *
 * Returns -1 if the header could not be written, otherwise the result
 * of write_exact() for the payload.
 */
int write_cmd(byte* buf, int len)
{
    byte header[2];

    header[0] = (len >> 8) & 0xff;
    header[1] = len & 0xff;

    if (write_exact(header, 2) != 2)
        return -1;

    return write_exact(buf, len);
}

/*
 * Read exactly len bytes from file descriptor 0 into buf, looping over
 * short reads.
 *
 * Returns len on success. If read() returns 0 or a negative value, that
 * value is returned immediately.
 */
int read_exact(byte* buf, int len)
{
    int total = 0;

    for (;;) {
        int n = read(0, buf + total, len - total);

        if (n <= 0)
            return n;

        total += n;
        if (total >= len)
            break;
    }

    return len;
}

/*
 * Write exactly len bytes from buf to file descriptor 1, looping over
 * short writes.
 *
 * Returns len on success. If write() returns 0 or a negative value,
 * that value is returned immediately.
 */
int write_exact(byte* buf, int len)
{
    int done = 0;

    while (1) {
        int n = write(1, buf + done, len - done);

        if (n <= 0)
            return n;

        done += n;
        if (done >= len)
            break;
    }

    return len;
}
// C
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include "comm.h"

#define LOG_FILE "einterface.log"

void split_string(char* s, char** s1, char** s2)
{
    *s1 = s;
    *s2 = s + strlen(s) + 1;    
}

/*
 * External port program: reads commands from the Erlang port, logs
 * them to LOG_FILE and answers each one with a single byte.
 *
 * Command layout: buf[0] is the function code (1 = strlen, 2 = strcmp),
 * the remaining bytes are the string argument(s). The buffer is zeroed
 * after every command, so a strlen argument sent without a terminator
 * still ends at a 0 byte as long as the command is shorter than the
 * buffer. read_cmd() does not know the buffer size, so commands longer
 * than the buffer are not supported.
 *
 * The reply is truncated to one byte, so a negative strcmp result is
 * seen as a value in 128..255 on the Erlang side.
 *
 * Returns -1 if the log file cannot be opened, 0 when input ends.
 */
int main()
{
    int fn, res, len, i;
    char *s1, *s2;

    byte buf[100] = {0};

    FILE* f = fopen(LOG_FILE, "w");
    if (f == NULL)
        return -1;

    while ( (len=read_cmd(buf)) > 0) {
        fn = buf[0];
        /* Always start from a defined reply; previously an unknown
           function code sent back an uninitialised value. */
        res = 0;

        fprintf(f, "Data: ");
        for (i = 0; i < len; ++i)
            fprintf(f, "0x%02x ", buf[i]);
        fprintf(f, "\n");

        if (fn == 1) {
            res = strlen((char*)buf + 1);
        } else if (fn == 2) {
            split_string((char*)buf + 1, &s1, &s2);
            fprintf(f, "s1: %s, s2: %s\n", s1, s2);
            res = strcmp(s1,s2);
        } else {
            /* Still reply, because the Erlang side waits for an answer. */
            fprintf(f, "unknown function code: %d\n", fn);
        }

        buf[0] = res;
        write_cmd(buf, 1);
        memset(buf, 0, sizeof(buf));
    }

    fclose(f);

    return 0;
}

端口方式最简单,但是缺点也很明显,数据量大的时候效率很低。

Erl_Interface

准确的说,这和上一种方式一样,都是利用输入输出。Erl_Interface是Erlang官方提供的数据编码手段,用来替换我们的decode和encode。能把所有Erlang项式编码成二进制。

    {call, From, Msg} ->
        Port ! {self(), {command, term_to_binary(Msg)}},
        receive
            {Port, {data, Data}} ->
                From ! {?MODULE, binary_to_term(Data)}
        end,

C结构体的定义在erl_interface.h,缺少文档说明的接口也能在这里找到。用到的关键结构ETERM是所有Erlang基本数据结构的union。

/*
 * ETERM: one struct wrapping a union with a member for each Erlang
 * term representation. The accessor macros (ERL_INT_VALUE,
 * ERL_ATOM_PTR, ERL_PID_NODE, ...) read from the matching uval member.
 */
typedef struct _eterm {
  union {
    Erl_Integer    ival;     /* read by ERL_INT_VALUE */
    Erl_Uinteger   uival;    /* read by ERL_INT_UVALUE */
    Erl_LLInteger  llval;    /* read by ERL_LL_VALUE */
    Erl_ULLInteger ullval;   /* read by ERL_LL_UVALUE */
    Erl_Float      fval;     /* read by ERL_FLOAT_VALUE */
    Erl_Atom       aval;     /* read by ERL_ATOM_PTR / ERL_ATOM_SIZE */
    Erl_Pid        pidval;   /* read by the ERL_PID_* macros */
    Erl_Port       portval;    
    Erl_Ref        refval;   
    Erl_List       lval;
    Erl_EmptyList  nval;
    Erl_Tuple      tval;
    Erl_Binary     bval;
    Erl_Variable   vval;
    Erl_Function   funcval;
    Erl_Big        bigval;
  } uval;
} ETERM;

常用的转换函数也都封装成了宏:

/* Integer accessors: signed/unsigned int and long long members of an ETERM. */
#define ERL_INT_VALUE(x)  ((x)->uval.ival.i)
#define ERL_INT_UVALUE(x) ((x)->uval.uival.u)
#define ERL_LL_VALUE(x)   ((x)->uval.llval.i)
#define ERL_LL_UVALUE(x)  ((x)->uval.ullval.u)

/* Float accessor. */
#define ERL_FLOAT_VALUE(x) ((x)->uval.fval.f)

/* Atom accessors: name pointer and size, in Latin-1 or UTF-8 form,
   obtained through the erl_atom_* helper functions. */
#define ERL_ATOM_PTR(x) erl_atom_ptr_latin1((Erl_Atom_data*) &(x)->uval.aval.d)
#define ERL_ATOM_PTR_UTF8(x) erl_atom_ptr_utf8((Erl_Atom_data*) &(x)->uval.aval.d)
#define ERL_ATOM_SIZE(x) erl_atom_size_latin1((Erl_Atom_data*) &(x)->uval.aval.d)
#define ERL_ATOM_SIZE_UTF8(x) erl_atom_size_utf8((Erl_Atom_data*) &(x)->uval.aval.d)

/* Pid accessors: node name (Latin-1 or UTF-8) plus the number, serial
   and creation fields of the pid. */
#define ERL_PID_NODE(x) erl_atom_ptr_latin1((Erl_Atom_data*) &(x)->uval.pidval.node)
#define ERL_PID_NODE_UTF8(x) erl_atom_ptr_utf8((Erl_Atom_data*) &(x)->uval.pidval.node)
#define ERL_PID_NUMBER(x) ((x)->uval.pidval.number)
#define ERL_PID_SERIAL(x) ((x)->uval.pidval.serial)
#define ERL_PID_CREATION(x) ((x)->uval.pidval.creation)

使用erl_interface的C程序如下:

#include <unistd.h>
#include <string.h>
#include "comm.h"
#include "erl_interface.h"
#include "ei.h"


/*
 * External port program using erl_interface for term encoding.
 *
 * Each command is an encoded tuple {strlen, S} or {strcmp, S, T}; the
 * reply is the encoded integer result. If an argument cannot be
 * converted to a C string, or the function atom is unknown, 0 is sent
 * back so the Erlang side is never left waiting.
 *
 * Compared with the previous version this frees exactly what was
 * allocated: the element terms sp1/sp2 (instead of the never-assigned
 * argp) and the C strings returned by erl_iolist_to_string().
 */
int main()
{
    ETERM *tuplep, *intp;
    ETERM *fnp, *sp1, *sp2;
    char *str1, *str2;

    byte buf[100] = {0};
    int res;

    erl_init(NULL, 0);

    while (read_cmd(buf) > 0) {
        tuplep = erl_decode(buf);
        fnp = erl_element(1, tuplep);
        sp1 = erl_element(2, tuplep);
        sp2 = NULL;
        str1 = erl_iolist_to_string(sp1);
        str2 = NULL;
        res = 0;

        if (str1 != NULL) {
            if (strncmp((const char*)ERL_ATOM_PTR(fnp), "strlen", 6) == 0) {
                res = strlen(str1);
            } else if (strncmp((const char*)ERL_ATOM_PTR(fnp), "strcmp", 6) == 0) {
                sp2 = erl_element(3, tuplep);
                str2 = erl_iolist_to_string(sp2);
                if (str2 != NULL)
                    res = strcmp(str1, str2);
            }
        }

        intp = erl_mk_int(res);
        erl_encode(intp, buf);
        write_cmd(buf, erl_term_len(intp));

        /* Strings from erl_iolist_to_string() are released with erl_free(). */
        if (str1 != NULL)
            erl_free(str1);
        if (str2 != NULL)
            erl_free(str2);

        erl_free_compound(tuplep);
        erl_free_term(fnp);
        erl_free_term(sp1);
        if (sp2 != NULL)
            erl_free_term(sp2);
        erl_free_term(intp);
    }
    return 0;
}

erl_init 用于初始化内存管理等。

嵌入式端口(端口驱动)

和前面两种方式不同,嵌入式端口作为动态模块直接加载到Erlang虚拟机。Erlang端需要先加载模块:

%% Load the driver library ProgLib from the current directory, then
%% spawn the server process. An already loaded driver counts as success;
%% any other result is printed and the caller exits with
%% {error, could_not_load_driver}.
start(ProgLib) ->
    LoadResult = erl_ddll:load_driver("./", ProgLib),
    case LoadResult of
        ok -> ok;
        {error, already_loaded} -> ok;
        Other ->
            io:format("error: ~p~n", [Other]),
            exit({error, could_not_load_driver})
    end,
    spawn(?MODULE, init, [ProgLib]).

C程序做的变动比较大,主要是完善ErlDrvEntry接口:

/*
 * This structure defines a driver: a table of callbacks plus a few
 * data fields. A driver fills in the callbacks it implements and leaves
 * the others NULL.
 */

typedef struct erl_drv_entry {
    int (*init)(void);      /* called at system start up for statically
                               linked drivers, and after loading for
                               dynamically loaded drivers */ 

#ifndef ERL_SYS_DRV
    ErlDrvData (*start)(ErlDrvPort port, char *command);
                            /* called when open_port/2 is invoked.
                               return value -1 means failure. */
#else
    ErlDrvData (*start)(ErlDrvPort port, char *command, SysDriverOpts* opts);
                            /* special options, only for system driver */
#endif
    void (*stop)(ErlDrvData drv_data);
                            /* called when port is closed, and when the
                               emulator is halted. */
    void (*output)(ErlDrvData drv_data, char *buf, ErlDrvSizeT len);
                            /* called when we have output from erlang to
                               the port */
    void (*ready_input)(ErlDrvData drv_data, ErlDrvEvent event); 
                            /* called when we have input from one of
                               the driver's handles */
    void (*ready_output)(ErlDrvData drv_data, ErlDrvEvent event);  
                            /* called when output is possible to one of
                               the driver's handles */
    char *driver_name;      /* name supplied as command
                               in open_port XXX ? */
    void (*finish)(void);   /* called before unloading the driver -
                               DYNAMIC DRIVERS ONLY */
    void *handle;           /* Reserved -- Used by emulator internally */
    ErlDrvSSizeT (*control)(ErlDrvData drv_data, unsigned int command,
                char *buf, ErlDrvSizeT len, char **rbuf,
                ErlDrvSizeT rlen); /* "ioctl" for drivers - invoked by
                                      port_control/3 */
    void (*timeout)(ErlDrvData drv_data);   /* Handling of timeout in driver */
    void (*outputv)(ErlDrvData drv_data, ErlIOVec *ev);
                            /* called when we have output from erlang
                               to the port */
    void (*ready_async)(ErlDrvData drv_data, ErlDrvThreadData thread_data);
    void (*flush)(ErlDrvData drv_data);
                            /* called when the port is about to be
                               closed, and there is data in the
                               driver queue that needs to be flushed
                               before 'stop' can be called */
    ErlDrvSSizeT (*call)(ErlDrvData drv_data,
             unsigned int command, char *buf, ErlDrvSizeT len,
             char **rbuf, ErlDrvSizeT rlen,
             unsigned int *flags); /* Works mostly like 'control',
                                      a synchronous
                                      call into the driver. */
    void (*event)(ErlDrvData drv_data, ErlDrvEvent event,
          ErlDrvEventData event_data);
                            /* Called when an event selected by
                               driver_event() has occurred */
    int extended_marker;    /* ERL_DRV_EXTENDED_MARKER */
    int major_version;      /* ERL_DRV_EXTENDED_MAJOR_VERSION */
    int minor_version;      /* ERL_DRV_EXTENDED_MINOR_VERSION */
    int driver_flags;       /* ERL_DRV_FLAGs */
    void *handle2;              /* Reserved -- Used by emulator internally */
    void (*process_exit)(ErlDrvData drv_data, ErlDrvMonitor *monitor);
                            /* Called when a process monitor fires */
    void (*stop_select)(ErlDrvEvent event, void* reserved);
                            /* Called on behalf of driver_select when
                               it is safe to release 'event'. A typical
                               unix driver would call close(event) */
    void (*emergency_close)(ErlDrvData drv_data);
                            /* called when the port is closed abruptly.
                               specifically when erl_crash_dump is called. */
    /* When adding entries here, dont forget to pad in obsolete/driver.h */
} ErlDrvEntry;

作为样例,简单的初始化stop、start和output就行:

/*
 * Driver table for the example. The initializer is positional, so each
 * value is labelled with the ErlDrvEntry member it fills. Only start,
 * stop and output are implemented; the trailing emergency_close member
 * is not listed and is therefore zero-initialized.
 */
ErlDrvEntry driver_entry = {
    NULL,                           /* init */
    drv_start,                      /* start */
    drv_stop,                       /* stop */
    drv_output,                     /* output */
    NULL,                           /* ready_input */
    NULL,                           /* ready_output */
    "port_driver",                  /* driver_name */
    NULL,                           /* finish */
    NULL,                           /* handle */
    NULL,                           /* control */
    NULL,                           /* timeout */
    NULL,                           /* outputv */
    NULL,                           /* ready_async */
    NULL,                           /* flush */
    NULL,                           /* call */
    NULL,                           /* event */
    ERL_DRV_EXTENDED_MARKER,        /* extended_marker */
    ERL_DRV_EXTENDED_MAJOR_VERSION, /* major_version */
    ERL_DRV_EXTENDED_MINOR_VERSION, /* minor_version */
    0,                              /* driver_flags */
    NULL,                           /* handle2 */
    NULL,                           /* process_exit */
    NULL                            /* stop_select */
};

利用宏 DRIVER_INIT 完成初始化:

/*
 * Name and linkage of the driver's init function.
 * Static builds name it <NAME>_driver_init and need no export
 * attribute. Otherwise it is always called driver_init and is exported
 * with default visibility on GCC >= 4, __global on Sun Pro C >= 0x550,
 * or nothing on other compilers.
 */
#ifdef STATIC_ERLANG_DRIVER
#  define ERLANG_DRIVER_NAME(NAME) NAME ## _driver_init
#  define ERL_DRIVER_EXPORT
#else
#  define ERLANG_DRIVER_NAME(NAME) driver_init
#  if defined(__GNUC__) && __GNUC__ >= 4
#    define ERL_DRIVER_EXPORT __attribute__ ((visibility("default")))
#  elif defined (__SUNPRO_C) && (__SUNPRO_C >= 0x550)
#    define ERL_DRIVER_EXPORT __global
#  else
#    define ERL_DRIVER_EXPORT
#  endif
#endif

/* NOTE(review): the matching #endif for this guard is not part of the excerpt. */
#ifndef ERL_DRIVER_TYPES_ONLY

/*
 * DRIVER_INIT(name) emits a prototype for the init function and then
 * the head of its definition; the function body is written directly
 * after the macro invocation and returns an ErlDrvEntry*.
 */
#define DRIVER_INIT(DRIVER_NAME) \
    ERL_DRIVER_EXPORT ErlDrvEntry* ERLANG_DRIVER_NAME(DRIVER_NAME)(void); \
    ERL_DRIVER_EXPORT ErlDrvEntry* ERLANG_DRIVER_NAME(DRIVER_NAME)(void)

三个回调如下:

/* Per-port driver state: allocated in drv_start, released in drv_stop. */
typedef struct {
    ErlDrvPort port;   /* port handle used by drv_output to send replies */
} data;

/*
 * start callback: invoked when the port is opened.
 *
 * Allocates the per-port state and stores the port handle in it.
 * buf (the command string) is not used.
 *
 * Returns the state cast to ErlDrvData, or ERL_DRV_ERROR_GENERAL if the
 * allocation fails (previously a NULL result was dereferenced).
 */
static ErlDrvData drv_start(ErlDrvPort port, char* buf)
{
    data* d = (data*)driver_alloc(sizeof(data));

    (void)buf;
    if (d == NULL)
        return ERL_DRV_ERROR_GENERAL;

    d->port = port;
    return (ErlDrvData)d;
}

/* stop callback: releases the per-port state created by drv_start. */
static void drv_stop(ErlDrvData handle)
{
    data* state = (data*)handle;

    driver_free(state);
}

void split_string(char* s, char** s1, char** s2)
{
    *s1 = s;
    *s2 = s + strlen(s) + 1;    
}

/*
 * output callback: handles one command sent to the port and replies
 * with a single byte.
 *
 * buf[0] is the function code (1 = strlen, 2 = strcmp); the rest of the
 * buffer holds the argument(s). The buffer is only len bytes long and
 * is not guaranteed to be NUL-terminated, so every string operation is
 * bounded by len:
 *   - strlen: counts up to the first 0 byte, or to the end of the
 *     buffer when no terminator was sent;
 *   - strcmp: performed only when both strings are terminated inside
 *     the buffer.
 * Empty, malformed or unknown commands are answered with 0.
 *
 * The reply is the low byte of the result, sent from a char so that it
 * does not depend on the byte order of int.
 */
static void drv_output(ErlDrvData handle, char* buf,
        ErlDrvSizeT len)
{
    data* d = (data*)handle;
    char reply = 0;

    if (len > 0) {
        char* payload = buf + 1;
        ErlDrvSizeT payload_len = len - 1;
        int fn = buf[0];

        if (fn == 1) {
            char* end = (char*)memchr(payload, 0, payload_len);

            reply = (char)(end != NULL ? (ErlDrvSizeT)(end - payload)
                                       : payload_len);
        } else if (fn == 2) {
            char* end1 = (char*)memchr(payload, 0, payload_len);

            if (end1 != NULL) {
                char* second = end1 + 1;
                ErlDrvSizeT rest = payload_len - (ErlDrvSizeT)(second - payload);

                if (memchr(second, 0, rest) != NULL)
                    reply = (char)strcmp(payload, second);
            }
        }
    }

    driver_output(d->port, &reply, 1);
}

依然是利用端口交互。

分布式Erlang节点

分布式方式用C创建一个Erlang节点,以分布式的方式和Erlang交互。很独特的一种方法,详细的工作原理和数据格式以后会分析。

%% Erlang
-module(complex4).
-export([strlen/1, strcmp/2]).

-define(CNODE, 'cnode@192.168.0.4').

%% Public API: both calls are forwarded to the C node.
strlen(S) -> call_port({strlen, S}).
strcmp(S, T) -> call_port({strcmp, S, T}).

%% Send {call, Caller, Request} to the name `any' on the C node and
%% block until a {cnode, Reply} message arrives.
call_port(Msg) ->
    Caller = self(),
    Destination = {any, ?CNODE},
    Destination ! {call, Caller, Msg},
    receive
        {cnode, Reply} -> Reply
    end.

另一端是什么语言编写的完全无所谓。

C程序用ErlMessage封装了消息:

/* Message received from a connected Erlang node. */
typedef struct {
  int type;   /* one of the message type constants in eiext.h */
  ETERM *msg; /* the actual message */
  ETERM *from; /* presumably the sender of the message -- TODO confirm */
  ETERM *to;   /* presumably the addressed recipient -- TODO confirm */
  char to_name[MAXREGLEN+1]; /* presumably the registered name the message
                                was sent to -- TODO confirm */
} ErlMessage;

erl_receive_msg 接收消息,erl_send 发送消息,消息格式用erl_interface封装。可以实现为服务端和客户端,服务端等待Erlang节点连接,客户端节点要在Erlang启动后发起连接:

    /*
     * Variant 1 -- C node as server: initialise, open a listening
     * socket, publish the port and wait for the Erlang node to connect.
     */
    erl_init(NULL, 0);

    addr.s_addr = inet_addr("192.168.0.4");
    if (erl_connect_xinit("192.168.0.4", "cnode", "cnode@192.168.0.4",
                &addr, "123456", 0) == -1)
        erl_err_quit("erl_connect_init error");

    if ((listen = listen_port(port)) <= 0)
        erl_err_quit("listen_port error");

    if (erl_publish(port) == -1)
        erl_err_quit("erl_publish error");

    if ((fd = erl_accept(listen, &conn)) == ERL_ERROR)
        erl_err_quit("erl_accept error");

    /*
     * Variant 2 -- C node as client: initialise the same way, then
     * connect to the already running Erlang node e1@192.168.0.4.
     * This is a separate program from variant 1; the two snippets are
     * alternatives, not consecutive steps.
     */
    erl_init(NULL, 0);

    addr.s_addr = inet_addr("192.168.0.4");
    if (erl_connect_xinit("192.168.0.4", "cnode", "cnode@192.168.0.4",
                &addr, "123456", 0) == -1)
        erl_err_quit("erl_connect_init error");

    if ((fd = erl_connect("e1@192.168.0.4")) < 0)
        erl_err_quit("erl_connect error");

    fprintf(stderr, "Connected to e1@192.168.0.4\n"); 

连接成功后就是数据交互:

    /*
     * Message loop of the C node. Each request is {call, From, Tuple}
     * where Tuple is {strlen, S} or {strcmp, S, T}; the reply sent back
     * to From is {cnode, Result}. Result is 0 when an argument cannot be
     * converted to a C string or the function atom is unknown.
     *
     * s2 is reset for every message and only freed when it was actually
     * extracted (previously the strlen path freed a stale pointer), and
     * the C strings returned by erl_iolist_to_string() are released
     * with erl_free().
     */
    while (loop) {
        got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
        if (got == ERL_ERROR) {
            loop = 0;
        } else if (got == ERL_TICK) {
            /* tick message: nothing to do */
        } else {
            if (emsg.type == ERL_REG_SEND) {
                char *str1, *str2 = NULL;

                fromp = erl_element(2, emsg.msg);
                tuplep = erl_element(3, emsg.msg);
                fnp = erl_element(1, tuplep);
                s1 = erl_element(2, tuplep);
                s2 = NULL;
                res = 0;

                str1 = erl_iolist_to_string(s1);
                if (str1 != NULL) {
                    if (strncmp((const char*)ERL_ATOM_PTR(fnp), "strlen", 6) == 0) {
                        res = strlen(str1);
                    } else if (strncmp((const char*)ERL_ATOM_PTR(fnp), "strcmp", 6) == 0) {
                        s2 = erl_element(3, tuplep);
                        str2 = erl_iolist_to_string(s2);
                        if (str2 != NULL)
                            res = strcmp(str1, str2);
                    }
                }

                resp = erl_format("{cnode, ~i}", res);
                erl_send(fd, fromp, resp);

                if (str1 != NULL)
                    erl_free(str1);
                if (str2 != NULL)
                    erl_free(str2);

                erl_free_term(emsg.from);
                erl_free_term(emsg.msg);
                erl_free_term(fromp);
                erl_free_term(tuplep);
                erl_free_term(fnp);
                erl_free_term(s1);
                if (s2 != NULL)
                    erl_free_term(s2);
                erl_free_term(resp);
            }
        }
    }

NIF

最后一种方式,也是最新的一种,是实现NIF内部函数。具体来说,就是编写一个动态链接库,加载到Erlang虚拟机,导出接口让Erlang调用。

-module(complex5).
-export([cstrlen/1, cstrcmp/2]).

-on_load(init/0).

%% Runs when the module is loaded: loads the NIF library "./cnif" and
%% fails with a badmatch unless load_nif/2 returns ok.
init() ->
    LoadResult = erlang:load_nif("./cnif", 0),
    ok = LoadResult.

%% Erlang-side stubs for the two NIFs. Their bodies only run when the
%% native implementation is not in place, and then exit with
%% nif_library_not_loaded.
cstrlen(_S) -> exit(nif_library_not_loaded).

cstrcmp(_S, _T) -> exit(nif_library_not_loaded).

load_nif 负责加载动态库;Erlang 里定义的同名函数只是桩函数(stub),只有在 NIF 库中对应的实现没有加载成功时才会被执行。

C程序填充ERL_NIF_INIT宏:

/*
 * ERL_NIF_INIT(NAME, FUNCS, LOAD, RELOAD, UPGRADE, UNLOAD)
 *
 * Expands to the declaration and definition of the NIF library's init
 * function. The function holds a static ErlNifEntry filled from the
 * macro arguments (module name as a string, the FUNCS array and its
 * element count, the four lifecycle callbacks) and returns its address.
 * ERL_NIF_INIT_PROLOGUE, ERL_NIF_INIT_GLOB, ERL_NIF_INIT_DECL and
 * ERL_NIF_INIT_BODY are helper macros defined elsewhere in the header.
 */
#define ERL_NIF_INIT(NAME, FUNCS, LOAD, RELOAD, UPGRADE, UNLOAD) \
ERL_NIF_INIT_PROLOGUE                   \
ERL_NIF_INIT_GLOB                       \
ERL_NIF_INIT_DECL(NAME);        \
ERL_NIF_INIT_DECL(NAME)         \
{                   \
    static ErlNifEntry entry =      \
    {                   \
    ERL_NIF_MAJOR_VERSION,      \
    ERL_NIF_MINOR_VERSION,      \
    #NAME,              \
    sizeof(FUNCS) / sizeof(*FUNCS), \
    FUNCS,              \
    LOAD, RELOAD, UPGRADE, UNLOAD,  \
    ERL_NIF_VM_VARIANT,     \
    ERL_NIF_ENTRY_OPTIONS       \
    };                                  \
    ERL_NIF_INIT_BODY;                  \
    return &entry;          \
}     

实际上是初始化了ErlNifEntry结构体。

/*
 * Descriptor of a NIF library. ERL_NIF_INIT fills the members in this
 * order from its arguments and from version/variant constants.
 */
typedef struct enif_entry_t
{
    int major;                  /* ERL_NIF_MAJOR_VERSION */
    int minor;                  /* ERL_NIF_MINOR_VERSION */
    const char* name;           /* module name (stringified NAME argument) */
    int num_of_funcs;           /* number of elements in funcs */
    ErlNifFunc* funcs;          /* table of exported NIFs */
    /* Lifecycle callbacks, taken from the LOAD, RELOAD, UPGRADE and
       UNLOAD arguments of ERL_NIF_INIT. */
    int  (*load)   (ErlNifEnv*, void** priv_data, ERL_NIF_TERM load_info);
    int  (*reload) (ErlNifEnv*, void** priv_data, ERL_NIF_TERM load_info);
    int  (*upgrade)(ErlNifEnv*, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info);
    void (*unload) (ErlNifEnv*, void* priv_data);
    const char* vm_variant;     /* ERL_NIF_VM_VARIANT */
    unsigned options;           /* ERL_NIF_ENTRY_OPTIONS */
}ErlNifEntry;

作为简单例子,这里只关注导出函数:

/* Table of exported NIFs: {Erlang function name, arity, C implementation}. */
static ErlNifFunc nif_func[] = {
    {"cstrlen", 1, strlen_nif},
    {"cstrcmp", 2, strcmp_nif}
};

分别是函数名、元数和实现:

/*
 * NIF behind cstrlen/1: returns the length of the Latin-1 string
 * argv[0] as an integer term.
 *
 * Raises badarg when the argument is not a string, and also when it
 * does not fit into the local buffer. enif_get_string() reports
 * truncation with a negative value, which the previous `!result` test
 * treated as success and then measured the truncated copy.
 */
static ERL_NIF_TERM strlen_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
{
    char s[100];

    (void)argc;

    if (enif_get_string(env, argv[0], s, sizeof(s), ERL_NIF_LATIN1) <= 0) {
        return enif_make_badarg(env);
    }

    return enif_make_int(env, (int)strlen(s));
}

/*
 * NIF behind cstrcmp/2: compares the Latin-1 strings argv[0] and
 * argv[1] with strcmp() and returns the result as an integer term.
 *
 * Raises badarg when either argument is not a string, and also when it
 * does not fit into its local buffer. enif_get_string() reports
 * truncation with a negative value, which the previous `!result` test
 * treated as success and then compared truncated copies.
 */
static ERL_NIF_TERM strcmp_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
{
    char s1[100], s2[100];

    (void)argc;

    if (enif_get_string(env, argv[0], s1, sizeof(s1), ERL_NIF_LATIN1) <= 0 ||
            enif_get_string(env, argv[1], s2, sizeof(s2), ERL_NIF_LATIN1) <= 0) {
        return enif_make_badarg(env);
    }

    return enif_make_int(env, strcmp(s1, s2));
}

最后

完整的代码在这里找到。

对我而言,平时C语言写的最多,而且往往是内核模块。结合C和Erlang是很有必要的,一定要搞清楚交互方式。这几种方式都透露着Erlang的设计哲学和内部实现,接下来需要深入代码来学习了。