【RTP / RTSP】Live555 の使い方と解説

Live555はC++で書かれたRTP/RTCP, RTSP, SIPでメディアをマルチストリーミングするためのライブラリです。
3rdPartyのソフトには VLC, GoodPlayer, MPlayer, OpneRTSP などがあります。
Linuxではカーネル2.6以降で動作します。iOS実装でも実績があります。

* OSSのRTP/RTSPサーバでは同様の機能を持つgstreamerも有名です。gstreamerはC言語で書いたオブジェクト指向ライブラリで勉強になると思います。

live555

Live555を使う

公式はこちらです。
まずはインストールします。

# Mac OS X
$ ./genMakefile macosx
$ make && make install

テスト用のサーバを起動させます。

$ ./testOnDemandRTSPServer

次にRTSPクライアント(openRTSP)でサーバに接続します。

$ openRTSP rtsp://192.168.0.3:8554/h264ESVideoTest
Opening connection to 192.168.0.3, port 8554...
...remote connection opened
Sending request: OPTIONS rtsp://192.168.0.3:8554/h264ESVideoTest RTSP/1.0
CSeq: 2
User-Agent: openRTSP (LIVE555 Streaming Media v2013.09.18)

ソースコードリーディング

ソースのディレクトリごとに簡単に機能を説明します。
全体的にはベースとなる基底クラスを継承していくことで様々なメディアに対応する設計になっています。

$ ls -F | grep /
BasicUsageEnvironment/
UsageEnvironment/
WindowsAudioInputDevice/
groupsock/
liveMedia/
mediaServer/
proxyServer/
testProgs/
wis-streamer/

BasicUsageEnvironment

まずBasicTaskScheduler0.cppではリクエストを待つためのループイベントを呼んでいます。

void BasicTaskScheduler0::doEventLoop(char* watchVariable) {
  // Repeatedly loop, handling readble sockets and timed events:
  while (1) {
    if (watchVariable != NULL && *watchVariable != 0) break;
    SingleStep();
  }
}

SingleStep()はBasicTaskSchedulerクラスにありソケット接続、ファイルディスクリプタ管理をしています。

 void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) 

fdの監視はselect()で行っています。

 
int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);

非同期読み取りイベントのハンドラを割り当てるため、据え置きイベントをスケジュールするために使用されます。
また、セッションクッキー等を生成するhashTableクラスは汎用的なhashテーブルへのインターフェイスです。hashテーブルはUsageEnvironmentクラスにあります。これをサブクラス化することでLLとも連携できます。

UsageEnvironment

hashテーブルや文字処理 (strDup.cpp) のクラスです。

groupsock

Groupsockはmulticastで配信するためのUsageEnvironmentのサブクラスです。
multicastで送信するためにソケットをカプセル化しています。
select()でfdを監視、ハンドリングしています。

inet.cではアドレス操作ルーチン、ServerMediaSession.cではSDPレスポンスの生成、GroupsockHelper.cppはGroupsock.cppの補助クラスで低級関数の処理を担当しています。

* SDPとはSession Description Protocolの事でSessionの告知や招待、他のMultiMediaSessionを開始するために必要な情報を記述することを目的としています。RTSPのDESCRIBEメソッドに付随させて送信します。

IOHandlers.cppではsocketReadHandlerでhandleRead()を呼んでいます。

void socketReadHandler(Socket* sock, int /*mask*/) {
  unsigned bytesRead;
  struct sockaddr_in fromAddress;
  UsageEnvironment& saveEnv = sock->env();
      // because handleRead(), if it fails, may delete "sock"
  if (!sock->handleRead(ioBuffer, ioBufferSize, bytesRead, fromAddress)) {
    saveEnv.reportBackgroundError();
  }
}

handleReadではソケットからストリームを読み込み、SSM groupの場合addressが一致しているか確認しています、
SSMはsource-specific multicastの略で、ソースを特定して運用するマルチキャストを意味していているようです。逆に、ソースを特定しないで、すべてのソースからのマルチキャストパケットを受信する運用をASM (Any Source Multicast)というらしいです。
よくわかりませんが、マルチキャスト対応のルータで効力を発揮するのかな(?)

Boolean Groupsock::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
			      unsigned& bytesRead,
			      struct sockaddr_in& fromAddress) {

TunnelEncaps.hhではトンネリング時の処理で使っているみたいです。

RTPInterfaceクラスではTCPベースの配信の場合の拡張機能を持っています。

void RTPInterface
::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
  // Normal case: Arrange to read UDP packets:
  envir().taskScheduler().
    turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);

  // Also, receive RTP over TCP, on each of our TCP connections:
  fReadHandlerProc = handlerProc;
  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    // Get a socket descriptor for "streams->fStreamSocketNum":
    SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);

    // Tell it about our subChannel:
    socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
  }
}

startNetworkReadingはMultiFramedRTPSourceから呼ばれます。
MultiFramedRTPSourceでは,startNetworkReadingで TCPパケット の処理を行い nextTask() = envir().taskScheduler().scheduleDelayedTask(0,FramedSource::afterGetting) で次のタスクをスケジューラに登録する流れとなっています。

実際にソケットからreadするためのハンドラーがRTPInterfaceのhandleReadメソッドです。
Groupsock::handleRead,RTPInterface::handleReadがそれにあたります。中ではreadSocketを呼んでいます。

int numBytes = readSocket(env(), socketNum(),buffer, maxBytesToRead, fromAddress);

また、tcpReadHandler1で1byteずつソケットから読み込みます。
まず,’$’を探し channel id( 1-byte ), packet size (2-byte) の順で全て取得したら, readSocket で実際のデータサイズを見て1以上の場合にはRTPデータを読みにいきます。データが1以上の場合は AWAITING_PACKET_DATA 状態を継続し次のデータを読みにいきます。詳しくはRFC2326 10.12 Embedded (Interleaved) Binary Dataを参照ください。

int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);

readSocketの中ではrecvfromシステムコールでfromAddressからreadしています。

int bytesRead = recvfrom(socket, (char*)buffer, bufferSize, 0,
			   (struct sockaddr*)&fromAddress,
			   &addressSize);

liveMedia

liveMediaはRTP/RTCP, RTSP, SIPの基本的な機能を担当しています。自分でアプリケーションを作成する場合はliveMedia.hhをincludeする必要があります。

例えば、RTSPServerクラスではRTSPハンドリング、RTSPレスポンス、ストリーミングファイル設定を行っています。
まず以下の長い宣言で、taskSchedulerにRTSPハンドリングの開始を通知しています。

RTSPServer::RTSPServer(UsageEnvironment& env,
		       int ourSocket, Port ourPort,
		       UserAuthenticationDatabase* authDatabase,
		       unsigned reclamationTestSeconds)
  : Medium(env),
    fRTSPServerPort(ourPort), fRTSPServerSocket(ourSocket), fHTTPServerSocket(-1), fHTTPServerPort(0),
    fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
    fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
    fClientConnectionsForHTTPTunneling(NULL), // will get created if needed
    fClientSessions(HashTable::create(STRING_HASH_KEYS)),
    fPendingRegisterRequests(HashTable::create(ONE_WORD_HASH_KEYS)),
    fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds)

リクエストのRTSPメソッド判定やレスポンス生成はRTSPClientConnectionクラスで行っています。
実際にRTSPリクエストをParseしているのがRTSPCommonクラスのparseRTSPRequestStringメソッドです。

Boolean parseRTSPRequestString(char const* reqStr,
			       unsigned reqStrSize,
			       char* resultCmdName,
			       unsigned resultCmdNameMaxSize,
			       char* resultURLPreSuffix,
			       unsigned resultURLPreSuffixMaxSize,
			       char* resultURLSuffix,
			       unsigned resultURLSuffixMaxSize,
			       char* resultCSeq,
			       unsigned resultCSeqMaxSize,
                               char* resultSessionIdStr,
                               unsigned resultSessionIdStrMaxSize,
			       unsigned& contentLength) 

resultCmdNameにはRTSPメソッドが格納され,戻り値のparseSucceededがTrueの場合はresponseを生成していきます。
DESCRIBEのハンドリングの場合にDigest認証のチェックが入ります。

if (authDB == NULL) return True;

authDBの設定がある場合は,以下で認証の処理に入ります。

if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;

authenticationOK内で認証情報を含んだヘッダを探すparseAuthorizationHeaderを呼んでいます。

while (1) {
    if (*buf == '\0') return False; // not found
    if (_strncasecmp(buf, "Authorization: Digest ", 22) == 0) break;
    ++buf;
}

authenticationOKで Digest認証 の情報を取得し MD5 で解読し username から設定された password を参照し判定しています。

char const* password = fOurServer.fAuthDB->lookupPassword(username);

parseRTSPRequestStringの戻り値であるparseSucceededがFalseの場合は通常 400 Bad Requestに流れますが, 一度HTTPトンネリングのチェックが入ります。

HTTPトンネリングであるかはparseHTTPRequestStringメソッドで判定しています。

parseSucceeded = parseHTTPRequestString(cmdName, sizeof cmdName,
					      urlSuffix, sizeof urlPreSuffix,
					      sessionCookie, sizeof sessionCookie,
					      acceptStr, sizeof acceptStr);

sessionCookieとacceptStrはポインタでデータを渡しています。
parseHTTPRequestStringメソッドでは以下のようにヘッダフィールドを検索するlookForHeaderでその値を取得しsessionCookieとacceptStrに代入しています。

lookForHeader("x-sessioncookie", &reqStr[i], reqStrSize-i, sessionCookie, sessionCookieMaxSize);
lookForHeader("Accept", &reqStr[i], reqStrSize-i, acceptStr, acceptStrMaxSize);

基本的には HTTP/*.* の文字列があれば parseSucceeded にTrue が set されます。
ない場合は False が set され 400 Bad Request が返ります。

HTTPトンネリングではRTSPヘッダーはbase64Encodeされてくるのでbase64Decodeします。

unsigned char* decodedBytes = base64Decode((char const*)(ptr-fBase64RemainderCount), numBytesToDecode, decodedSize);

base64Decodeには Boolean trimTrailingZeros を指定できます。

unsigned char* base64Decode(char const* in, unsigned& resultSize,
			    Boolean trimTrailingZeros) {
  if (in == NULL) return NULL; // sanity check
  return base64Decode(in, strlen(in), resultSize, trimTrailingZeros);
}

このオプションはbase64Encode過程で追加されたpadding-bitをトリミングします。

if (trimTrailingZeros) {
    while (paddingCount > 0 && k > 0 && out[k-1] == '\0') { --k; --paddingCount; }
  }

逆にbase64Encodeする場合は入力文字数によって,padding文字”=”を詰めることが必要です。(RFC4648)

続いて,GETハンドリング時を行うhandleHTTPCmd_TunnelingGETでHashTableクラスからsessionCookieを生成しています。

 
fOurServer.fClientConnectionsForHTTPTunneling->Add(sessionCookie, (void*)this);

これをPOST時に参照してoverHTTPの手続きを踏んでいるか確認しているようです。
踏んでいない場合やそれ以外のHTTPメソッドはhandleHTTPCmd_notSupportedを呼んで、405 Method Not Allowedを返しています。

RTCP.cppはRTCPというRTPでデータを送受信するためのセッションを制御するプロトコルの実装クラスです。ここではonReceiveとsendReportが中心的な機能です。

RTSPServer::RTSPClientConnection* prevClientConnection
    = (RTSPServer::RTSPClientConnection*)(fOurServer.fClientConnectionsForHTTPTunneling->Lookup(sessionCookie));

RTCP/RTPポートの設定はOnDemandServerMediaSubsession.cppで行っています。
設定を行うgetStreamParametersメソッドが呼ばれるタイミングはSETUPコマンドのハンドリング中です。

void OnDemandServerMediaSubsession
::getStreamParameters(unsigned clientSessionId,netAddressBits clientAddress, Port const& clientRTPPort,
		      Port const& clientRTCPPort,int tcpSocketNum,unsigned char rtpChannelId,unsigned char rtcpChannelId,
		      netAddressBits& destinationAddress, u_int8_t& /*destinationTTL*/, Boolean& isMulticast,
		      Port& serverRTPPort,Port& serverRTCPPort, void*& streamToken) 

RTCPのポートはRTPの+1で設定しています。

serverRTPPort = serverPortNum;
rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
if (rtpGroupsock->socketNum() < 0) {
  delete rtpGroupsock;
  continue; // try again
}

serverRTCPPort = serverPortNum+1;
rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
if (rtcpGroupsock->socketNum() < 0) {
  delete rtpGroupsock;
  delete rtcpGroupsock;
  continue; // try again
}

RTSPClientConnectionクラス では sessionCookie を削除しています。

fOurServer.fClientConnectionsForHTTPTunneling->Remove(fOurSessionCookie);

RTSPClientSessionではRTSPメソッドのハンドリングをしています。

POSTの中ではBASE64でエンコード化されたRTSPメソッドがクライアントから投げられるので、POST自体のコネクションを閉じずにRTSPレスポンスを返す必要があります。
これをHTTPサーバをベースに実装しようとするとクライアントからエンティティボディが32767という最大サイズで来て、リクエストが終了する前からリクエストパース処理を開始するという仕組みが必要になるので,TCPベースのプロトコルであるHTTPサーバとUDPベースのRTP/RTSPサーバでは思想が違う点で難しいかもしれません。なので,RTSPサーバを改造した方が,HTTPベースの配信はそもそもoptionであるという点からもスマートだと言えるかもしれません。

* RTSPはRFC2326で定義されています。実装を見る前にチェックしておくとHTTPトンネリングのGET > POSTの流れが必要であることの理解が早いと思います。

liveMediaではさらにh264やMPEGなどの様々なフォーマットのハンドリング、TCPStreamSinkなどのストリーム機能、Digest認証の機能があります。

mediaServer

基本となるアプリケーションの雛形が含まれています。

testProgs

サンプル用のテストプログラム群です。
通信プロトコルの手続き確認や自分でアプリケーションを作るときの参考になります。

wis-streamer

Live555を活用したRTSPサーバのアプリケーションです。Live555のサイトから別途ダウンロードできます。
wis-streamer.cppでは配信Portの設定やoverHTTPのEnable設定が行えます。

ストリーミングのソースとの紐付けをしているのが WISInputクラス です。
アプリケーション (wis-streamer.cpp) の先頭の方でコンストラクタを取得しています。

WISInput* inputDevice = WISInput::createNew(*env);
  if (inputDevice == NULL) {
    err(*env) << "Failed to create WIS input device\n";
    exit(1);
  }

UDPベースの配信はUnicast,overHTTPはMulticast対応みたいですね。

RTSPServer::createNew(*env, rtspServerPortNum, authDB)でRTSPサーバをインスタンス化しています。
4つ目の引数は指定しない場合は, reclamationTestSeconds がデフォルトで65秒に設定されます。
これはクライアントの生存確認を行う機能のようで,TEARDOWN以外でセッションが閉じられた時にサーバは知る由がないですが65秒経過したときにチェックが入りこの時点でセッションは破棄されます。
タイムアウトを指定しない場合は明示的にNULLを指定します。

先ほどのWISInputクラスのインスタンスをsetupUnicastStreamingもしくはsetupMulticastStreamingに渡しています。

  // Create the RTSP server:
  RTSPServer* rtspServer = NULL;
  if (streamingMode == STREAMING_UNICAST_THROUGH_DARWIN) {
    // Special case: Streaming through a Darwin Streaming Server:
    setupDarwinStreaming(*env, *inputDevice);
  } else {
    // Normal case: Streaming from a built-in RTSP server:
    rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, authDB);
    if (rtspServer == NULL) {
      *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
      exit(1);
    }

    *env << "...done initializing\n";

    // Create a record describing the media to be streamed:
    ServerMediaSession* sms
      = ServerMediaSession::createNew(*env, "", NULL, streamDescription,
				      streamingMode == STREAMING_MULTICAST_SSM);
    rtspServer->addServerMediaSession(sms);
    char *url = rtspServer->rtspURL(sms);
    *env << "Play this stream using the URL:\n\t" << url << "\n";
    delete[] url;

    // Configure it for unicast or multicast streaming:
    if (streamingMode == STREAMING_UNICAST) {
      setupUnicastStreaming(*inputDevice, sms);
    } else {
      setupMulticastStreaming(*inputDevice, sms);
    }
  }

ServerMediaSessionのインスタンスをRTSPserverクラスのaddServerMediaSessionメソッド内でstreamNameを取得しfServerMediaSessionsにAddしています。

void RTSPServer::addServerMediaSession(ServerMediaSession* serverMediaSession) {
  if (serverMediaSession == NULL) return;
  
  char const* sessionName = serverMediaSession->streamName();
  if (sessionName == NULL) sessionName = "";
  removeServerMediaSession(sessionName); // in case an existing "ServerMediaSession" with this name already exists
  
  fServerMediaSessions->Add(sessionName, (void*)serverMediaSession);
}

最後にRTSPサーバのイベントループに入りクライアントからの接続を待ちます。

 // Begin the LIVE555 event loop:
  env->taskScheduler().doEventLoop(); // does not return

AACAudioEncoderクラス は FramedFilterの継承, WISJPEGVideoServerMediaSubsessionクラスは WISServerMediaSubsessionクラスの継承, そのWISServerMediaSubsessionクラス は OnDemandServerMediaSubsessionの継承 とクラスの継承による拡張性に重点をおいた設計になっています。