【Node.js】Socket.IO@1.0 を読んでみた

socket.io@1.0で大きくアーキテクチャが変わるみたいなので調べてみました。
binaryをサポートしたことで, 例えば node-canvas でサーバ側で rendering した canvasをbinary で効率良く送信もできるとの事。

socketio

socket.io@0.9の構成が以下です。

/lib$ tree
.
├── logger.js
├── manager.js
├── namespace.js
├── parser.js
├── socket.io.js
├── socket.js
├── static.js
├── store.js
├── stores
│   ├── memory.js
│   └── redis.js
├── transport.js
├── transports
│   ├── flashsocket.js
│   ├── htmlfile.js
│   ├── http-polling.js
│   ├── http.js
│   ├── index.js
│   ├── jsonp-polling.js
│   ├── websocket
│   │   ├── default.js
│   │   ├── hybi-07-12.js
│   │   ├── hybi-16.js
│   │   └── index.js
│   ├── websocket.js
│   └── xhr-polling.js
└── util.js

socket.io@1.0の構成です。

/lib$ tree
.
├── adapter.js
├── client.js
├── index.js
├── namespace.js
└── socket.js

1.0では従来より接続開始時の高速化が行われているようです。
socket.ioはクライアント側で Websocket (リアルタイム全二重通信プロトコル, RFC6455)が使えない場合に xhr-polling, jsonp-polling などでまずは確実に接続を確立する方式が取られています。
そして, xhr, jsonp から Websocket に Upgrade する仕組みを取っているようです。

従来から, transports層を意識せずにアプリ開発に専念できる透過的な設計になっていました。
このtransports部分がengine.ioというモジュールに委譲され独立性が高くなりました。
engine.ioのpackage.json(“The realtime engine behind Socket.IO. Provides the foundation of a bidirectional connection between client and server”)にもあるようにsocket.ioの派生プロジェクトの扱いになっています。

engine.io@0.7.14の構成は以下です。

/lib$ tree
.
├── engine.io.js
├── index.js
├── server.js
├── socket.js
├── transport.js
└── transports
    ├── flashsocket.js
    ├── flashsocket.xml
    ├── index.js
    ├── polling-jsonp.js
    ├── polling-xhr.js
    ├── polling.js
    └── websocket.js

flushメソッド等, call/applyの仕組みを使っている場面が多いようです。

* flush : ログ、キャッシュを全て消す。または流れているデータを一気に押し流して何もない状態にする

ソースコード

Socket.IOはインストールは以下です。

$ npm install socket.io -g

socket.io@1.0の全体観を簡単に書いてみました。ちなみに, socket.io-clientは1500行程のサイズです。

index.js

index.jsからです。

module.exports = process.env.SIO_COV
? require('./lib-cov/')
: require('./lib/');

./lib/index.jsでengine.ioモジュールを呼んでいます。

var http = require('http');
var send = require('send');
var parse = require('url').parse;
var engine = require('engine.io');
var Client = require('./client');
var Namespace = require('./namespace');
var Adapter = require('./adapter');
var debug = require('debug')('socket.io:server');

Serverクラスは以下です。

function Server(srv, opts){
  if (!(this instanceof Server)) return new Server(srv, opts);
  if ('object' == typeof srv && !srv.listen) {
    opts = srv;
    srv = null;
  }
  opts = opts || {};
  this.nsps = {};
  this.path(opts.path || '/socket.io');
  this.serveClient(false !== opts.serveClient);
  this.adapter(opts.adapter || Adapter);
  this.sockets = this.of('/');
  if (srv) this.attach(srv, opts);
}

注目すべきは attach(srv, opts)でsocket.ioをengine.ioにbindしています。

// engine.io.js
function attach(server, options) {
  var engine = new exports.Server(options);
  engine.attach(server, options);
  return engine;
};

ofメソッドでnamespaceを参照しています。

Server.prototype.of = function(name, fn){
  if (!this.nsps[name]) {
    debug('initializing namespace %s', name);
    var nsp = new Namespace(this, name);
    this.nsps[name] = nsp;
  }
  if (fn) this.nsps[name].on('connect', fn);
  return this.nsps[name];
};

this.nsps[name].on(‘connect’, fn) でnamespace(nsp)にイベントを登録することもできるようになっています。

namespace.js

room はクライアントを部屋ごとに分割することができる機能です。チャット部屋のイメージですね。

function Namespace(server, name){
  this.name = name;
  this.server = server;
  this.adapter = new (server.adapter())(this);
  this.sockets = [];
  this.connected = {};
  this.fns = [];
  this.ids = 0;
  this.acks = {};
}

NameSpaceクラスのaddメソッド内, runでクライアント別に処理を開始し, 状態判定(‘open’ == client.conn.readyState)して connect, connectionイベントをemitしています。

self.emit('connect', socket);
self.emit('connection', socket);

emitメソッドで this.adapter.broadcast していています。
渡している opt の rooms には this.rooms がセットされていて, この rooms の中の room の id ごとに socket.packet() で packet を write しています。

Namespace.prototype.emit = function(ev){
  if (~exports.events.indexOf(ev)) {
    emit.apply(this, arguments);
  } else {
    // set up packet object
    var args = Array.prototype.slice.call(arguments);
    var packet = { type: parser.EVENT, data: args };

    if ('function' == typeof args[args.length - 1]) {
      throw new Error('Callbacks are not supported when broadcasting');
    }

    this.adapter.broadcast(packet, {
      rooms: this.rooms,
      flags: this.flags
    });

    delete this.rooms;
    delete this.flags;
  }
  return this;
};

broadcast送信か, rooms に対しての broadcast送信 (toメソッド)か, socket.id別送信かはこちらが参考になりました。

動的に NameSpace を追加する hook-point の話はこちらを参考にしました。

socket.js

Nodeの設計指針は基本的に非同期処理前提で速度重視なので、callbackを受け取る設計は大変です。(ES6以前)
実際プログラムを書いていて気づくとネストが深った結果、読みづらくなるというケースがあります。
そこで,EventEmitterというListenerをアタッチすることができる仕組みを使います。
EventEmitterはおおまかにはonメソッドでイベント登録(クライアントでいうaddEventListener)して、emitメソッドで実行することができます。ちなみに一度だけ実行の場合はOnce、登録解除はoffではなくremoveListenerです。

EventEmitterについてはevents.jsを見よう。

// EventEmitter methods
emitter.on(event, listener)
emitter.once(event, listener)
emitter.emit(event, [arg1], [arg2], [...])
emitter.removeListener(event, listener)
emitter.removeAllListeners([event])

socket.ioもEventEmitterを継承(prototype-chain)しています。

Socket.prototype.__proto__ = Emitter.prototype;

Socketクラスは Namespaceクラス と Clientクラス のインターフェイス的な位置づけ。

function Socket(nsp, client){
  this.nsp = nsp;
  this.server = nsp.server;
  this.adapter = this.nsp.adapter;
  this.id = client.id;
  this.request = client.request;
  this.client = client;
  this.conn = client.conn;
  this.rooms = [];
  this.acks = {};
  this.connected = true;
  this.disconnected = false;
}

Socketオブジェクトのメソッドは書き込み系の emit, to, in, send, writeや,接続系の onevent, ack, onack, ondisconnect, onclose, disconnect,そしてroom管理系のjoin, leave, leaveAllがあります。

ちなみにEventEmitter2なるものもあるようです。

adapter.js

プロセス間の共有のためのクラス(?)でしょうか。

client.js

Clientクラスは主にクライアントの Connection管理 をしています。

function Client(server, conn){
  this.server = server;
  this.conn = conn;
  this.id = conn.id;
  this.request = conn.request;
  this.setup();
  this.sockets = [];
  this.nsps = {};
}

connectメソッドで namespace に追加してあげたり, ondataメソッドで data の parse を開始したり, disconnect で socket を閉じたり, Socketオブジェクト側から remove要求 があったら namespace を delete というようなことをしています。

socket.io-stream

Shell Scriptorにとってはお馴染みのstreamですが, nodejsでもpipe()でデータの流れを制御できたら楽しいですね。
nodejsではI/Oを抽象化したインターフェイスがStream APIのようです。
(間違っていたらすみません)

npm install socket.io -g 
npm install socket.io-stream -g
npm install socket.io-client -g

server側のコードです。受け取ったstreamをfoo.txtにwriteします。

"use strict";
// Server
var fs = require('fs');
var io = require('socket.io').listen(3002);
var ss = require('socket.io-stream');

io.on('connection', function(socket) {
  ss(socket).on('foo', function(stream) {
    stream.pipe(fs.createWriteStream('foo.txt'));
  });
});

client側のコードです。bar.txtをreadしてwsで送信します。

"use strict";
// Client
var fs = require('fs');
var io = require('socket.io-client');
var ss = require('socket.io-stream');

var socket = io.connect('https://localhost:3002');
var stream = ss.createStream();

ss(socket).emit('foo', stream);
fs.createReadStream('bar.txt').pipe(stream);

browser版のclient側コード。ローカルでreadしたファイルを送信します。


<input id="file" type="file" />

<script src="https://cdn.socket.io/socket.io-1.2.1.js"></script>
<script src="https://raw.githubusercontent.com/nkzawa/socket.io-stream/master/socket.io-stream.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
<script>
"use strict";
$(function() {
  var socket = io.connect('https://localhost:3002');

  $('#file').change(function(e) {
    var file = e.target.files[0];
    var stream = ss.createStream();
    console.log(file)

    // upload a file to the server.
    ss(socket).emit('foo', stream, {size: file.size});
    ss.createBlobReadStream(file).pipe(stream);
  });
});
</script>

Nodeでネットワークのモジュールを書く時はSocket.IOの書き方を参考にすると良さそうです。
とりあえず、今後もsocket.ioに注目です。