【Node.js】node-cassandra-clientでアクセス


photo by Apache

4年間頑張ってくれた自作機の Windows XP を Windows 8 でクリーンインストールしました。

今回は CLI で簡単なデータモデルを作成し iOS クライアント側のコードを書いて, Node.jsから node-cassandra-client でデータを取得するまでです。

データモデルを作るまで

データモデルの構造はクラスタが一番外枠です。大きな単位から見ていくと以下のようになります。

  • Key Space : 最も大きな単位。
  • Column Family : 似たようなデータの論理的な集まり。PersonとかAnimalとかです。
  • Row(行): カラムの集合です。Personの中にmamerootとかAbeSinzoとかIchiroなどの識別子です。
  • Column(列): name(カラム名)とValueを持ちます。例えば、RowKeyがmamerootの場合, birth:1987, state:tired みたいなプロパティのディクショナリというイメージです。この最小単位nameとvalueと, さらにtimestampを持っています。timestampはデータ衝突回避のために用いられます。

以上を整理すると以下のようなデータモデル例になります。


keySpace: ExampleModel
  columfamily: Person
    Row : mameroot
      birth : 1987
      state : tired
    Row : AbeSinzo
      .......
    Row : Ichiro
      .......

それでは実際にデータモデルを作成するためにcassandraを起動します。デフォルトは9160portです。


$ $CASSANDRA_HOME/bin/cassandra
$ sudo lsof -i:9160
COMMAND  PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    2313 root   44u  IPv4 4143453      0t0  TCP localhost:apani1 (LISTEN)

CLIモードで操作します。versionが古いと昔のコマンドが使えないときもあります。


$ $CASSANDRA_HOME/bin/cassandra-cli
Connected to: "Test Cluster" on 127.0.0.1/9160
Welcome to Cassandra CLI version 1.1.7

起動portは /usr/local/cassandra/conf/cassandra.yamlに書かれています。

クラスタ名の確認


$ show cluster name;

ResultGroupというkeyspaceを作成します。


[default@unknown] create keyspace ResultGroup;

keyspacesを確認します。カラムファミリーも同時に表示されます。


[default@unknown] show keyspaces;
Keyspace: ResultGroup:

作成したResultGroupに切り替えます。


[default@unknown] use ResultGroup;
Authenticated to keyspace: ResultGroup
[default@ResultGroup] 

カラムファミリーを作成。


[default@ResultGroup] create column family Category;
14b91be7-f92c-33de-b667-36b787b34855
Waiting for schema agreement...
... schemas agree across the cluster

org.apache.cassandra.db.marshal.MarshalException: cannot parse 'first' as hex bytes

とでたら、UTF8に設定しましょう。


[default@ResultGroup] create column family Category with comparator=UTF8Type and default_validation_class=UTF8Type and key_validation_class=UTF8Type;  

カラムをセットします。


[default@ResultGroup] set Category['life']['quiznumbers']='0'; 
Value inserted.
Elapsed time: 46 msec(s).

カラムの内容を確認します。


[default@ResultGroup] get Category[life];
=> (column=quiznumbers, value=0, timestamp=1354576888986000)
Returned 1 results.
Elapsed time: 47 msec(s).

カラムを削除します。


[default@ResultGroup] del Category['Life']['Rate'];

キースペースが持つカラムファミリーを確認する場合は以下のようにします。


[default@ResultGroup] describe ResultGroup;

カラムファミリーを削除する場合はdropを使います。


[default@ResultGroup] drop column family Category01;
6a2600d0-8514-35ad-b8c1-c1ee5888d64d
Waiting for schema agreement...
... schemas agree across the cluster

iOSクライアント

iOS5から JsonFrameworkが使えるみたいですが, そのことには気づけなかった私は SBJsonと ASIHTTPRequestを使いました。
まず, Get/PostするJsonControllerクラスをつくります。実際は NSArrayをJSONにして Postしますが, 今は試しに直書き。


#import "ASIFormDataRequest.h"
#import "SBJson.h"
#import "JsonController.h"

-(void)GetData{
    recieveDataEnd = NO;
    NSURLRequest *request = [NSURLRequest requestWithURL:[NSURL URLWithString:@"https://ec2-****.ap-northeast-1.compute.amazonaws.com:****/response.json"]];
    NSData *jsonData = [NSURLConnection sendSynchronousRequest:request returningResponse:nil error:nil];
    SBJsonParser *parser = [[SBJsonParser alloc] init];
    recieveData = [parser objectWithData: jsonData];// NSData to NSMutable
    NSLog(@"JSON NSMutableArray=%@", [recieveData description]);
}
-(void)PostData{
    NSURL *url = [NSURL URLWithString:@"https://ec2-****.ap-northeast-1.compute.amazonaws.com:****/post.json"];
    ASIFormDataRequest *request = [ASIFormDataRequest requestWithURL:url];
    [request setRequestMethod:@"POST"];
    [request setPostValue:@"1" forKey:@"quiznumbers"];
    [request setDelegate:self];
    [request startAsynchronous]; 
}

続いて, それぞれのリクエストに対応したコードをNodejsに書きます。

Node.jsからcassandra-nodeでDBアクセス

Expressを使いました。

まず, 起動時にcassandraへの接続の確認をします。成功時には Keyspace is Existが表示されます。


// Keyspace確認
var System = require('cassandra-client').System;
var sys = new System('127.0.0.1:9160');

sys.describeKeyspace('ResultGroup', function(err, ksDef) {
  if (err) {
            console.log("Keyspace is not exist");
  } else {
            console.log("Keyspace is Exist ");
  }
});

app.get('/getData.json', function(request, response) {
  response.contentType('application/json');
  ...
}

POSTはexpressのpaserを使います。


app.use(express.bodyParser());// parse機能

app.post('/setData.json', function(req, res){
 console.log(req.body); // for logging
}

cassandraへのクエリは CQL (Cassandra Query Language)で行います。こちらを参考にしました。


var param = ['**','**','**','**','**'];
con.execute('UPDATE Category SET ?=?,?=? WHERE KEY=?', param, function (err, rows) {
   if (err) {
          response.write("response err:"+err); 
   } else {
          // 成功時のコード
   }response.end();
}

サーバ起動の手順は下記


$ $CASSANDRA_HOME/bin/cassandra
$ node app.js
Express server listening on port 3000
Keyspace is Exist 
Success Access

iOSシミュレータからアクセスしてカラムを作成して, CLIモードで前後のデータを確認します。


// before
[default@ResultGroup] get Category[life];
=> (column=collectRate, value=60, timestamp=1355545828872000)
=> (column=quiznumbers, value=0, timestamp=1354576888986000)
// after
[default@ResultGroup] get Category[life];
=> (column=collectRate, value=80, timestamp=1355697336068000)
=> (column=quiznumbers, value=2, timestamp=1355697336068001)

サーバで3000portのサービスを確認します。


$ lsof -i:3000
COMMAND   PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
node    21555  fis    7u  IPv4 8255834      0t0  TCP *:hbci (LISTEN)

次にブラウザからアクセスしてJSONが返されるかテストします。


https://ec2-****.ap-northeast-1.compute.amazonaws.com:3000/response.json

[{"name":"collectRate","value":"80"},{"name":"quiznumbers","value":"2"}]

下記でも意味は同じですので, 本番時は効率の良いデータモデルにしたいですね。


[{"collectRate":"80"},{"quiznumbers","2"}]

cassandraをJSONでバックアップ

DBの中身をJSONでバックアップします。JSONならスクリプト言語での加工も簡単です。


$CASSANDRA_HOME/bin/sstable2json /var/lib/cassandra/data/ResultGroup/Category/*****.db > Category.json

cassandraのdemonize

補足で, cassandraをデーモンとして起動するコマンドをメモります。


$ sudo $CASSANDRA_HOME/bin/cassandra -p /var/run/cassandra.pid

デーモンの停止は


$ kill -KILL `cat /var/run/cassandra.pid`

cassandraのPIDを監視するpythonスクリプトです。


#-*- coding: utf-8 -*-
import time
import commands
import daemon

dc = daemon.DaemonContext()

with dc:
 while 1:
  cmd = "ps -ax -ef | grep 'java -ea -javaagent:/opt/cassandra/apache-cassandra/b' | grep -v grep | wc -l"
  ret = commands.getoutput(cmd)

  if ret == 0:
   start = '$CASSANDRA_HOME/bin/cassandra -p /var/run/cassandra.pid'
   commands.getoutput(start)
  time.sleep(60)

凡ミス集

起動しようとしたら, すでにportが使用されていたパターン。


エラー: エージェントが例外をスローしました。 : java.rmi.server.ExportException: Port already in use: 7199;
nested exception is: java.net.BindException

$CASSANDRA_HOME/bin/cassandra.bat のportを編集して解決しました。

ポートがあいてなかったり, ホスト名が間違ってたりと大抵は凡ミスです。


Error: getaddrinfo ENOENT
Error: connect ECONNREFUSED

以下、参考まで。


host:        cassandra host
port:        cassandra port
keyspace:    cassandra keyspace
user:        [optional] cassandra user
pass:        [optional] cassandra password
use_bigints: [optional] boolean. toggles whether or not BigInteger or Number instances are in results.

$ ps -a
  PID TTY          TIME CMD
14352 pts/0    00:00:00 node
14893 pts/1    00:00:00 ps
$ kill 14352

killでダメなときは killall -9 を自己責任で試してください。オプションの数字はシグナルの番号で9は強制終了です。

続いて, 適当に動作させてから1週間後にcassandraが落ちた問題。


org.apache.thrift.transport.TTransportException: java.net.ConnectException: 接続を拒否されました

forerverでNodeは動いてもcassandraが自動で再起動されてないためサービスがストップしてしまいました。JVMが亡くなったみたい。