【Async / Concurrent】『Haskellによる並列・並行プログラミング』を読了した (2)

Haskellによる並列・並行プログラミング』を読了した。前回の続きでII部は並行プログラミング/分散プログラミングについて。前半同様に難しかった印象しかないが雰囲気は掴めた感じ。内容は完全に備忘録である。

本書によるとHaskellの哲学では, 単純だが汎用的な機構を提供し, より高水準の機能を構築できるような仕組みを目指している。

Haskellは並行性に対して便利な抽象化であるという立場を取っており, 下記の並行プログラミングモデルについて, ひとつに依存せずどれもサポートしている。

  • アクターモデル [1]
  • 共有メモリモデル
  • トランザクションモデル

7章 並行制御の基本:スレッドとMVar


7.1 例:備忘通知
7.2 通信:MVar
7.3 簡単なチャネルとしてのMVar:ログサービス
7.4 共有状態としてのMVar
7.5 組み立て部品としてのMVar:無制限チャネル
7.6 公平性

MVarは基本的なスレッド間通信の機構である。takeMVarは中身が詰まった箱から中身を取り出して返す。箱が空なら詰められるまでブロックする。
逆の操作である putMVarはMVarに値を詰めるが, 既に値が詰まっている場合はブロックする。

下記では, forkIOで新しいスレッドを生成し, 子スレッドがx, yをメインスレッドに渡す。

import Control.Concurrent

main = do
  m <- newEmptyMVar
  forkIO $ do putMVar m 'x'; putMVar m 'y'
  r <- takeMVar m
  print r
  r <- takeMVar m
  print r

MVarは単純なchannelとして機能する。

$ ./mvar2 
'x'
'y'

永遠に空なのに, takeMVarしてしまった例。

import Control.Concurrent

main = do
  m <- newEmptyMVar
  takeMVar m

この場合, 実行時システムが永遠のブロック (デッドロック)を検出して例外を投げてくれる。

$ ./mvar3 
mvar3: thread blocked indefinitely in an MVar operation

Haskellでは MVarから値を取り出し, ロックを獲得して, 変数の更新とロックの解除はMVarへの値の設定によって行う。

ここでMVarの特徴をまとめると,

  • 単一項チャネルである。
  • 可変状態を共有するためのコンテナである。ロックの獲得/解放をtakeMVar/putMVarで行う。
  • 大きな並行データ構造を構成するための組立部品である。

MVarの強みは, 抽象化のための部品になることである。
また, 本書では例として無制限バッファを持つ channelを紹介しているがここでは省略する。

並行プログラミングでは, どのスレッドに対しても等しくCPU時間を割り当てるような公平性があってほしい。
Haskellの実行時システムであるGHCはラウンドロビン型のスケジューラを採用しており, CPU時間が割り当てられないスレッドがないことを保証するが, 完全にスレッドに等しく割り当てることは保証しない。

GHCでは MVarに関連づけた FIFO queueにブロックされているスレッドを入れることで公平性の保証を実現している。
takeMVar/putMVarのそれぞれ対となる操作がある限り, ブロックされているスレッドは操作完了できることが保証される。
この時, ブロックされているスレッドを起こすだけでなく, ブロックされている操作を単一ステップで不可分に実行するように実装されている。

GHCによる公平性を保証した例が本章の冒頭の例である。

import Control.Concurrent
import Control.Monad
import System.IO

main = do
  hSetBuffering stdout NoBuffering            -- <1>
  forkIO (replicateM_ 100000 (putChar 'A'))   -- <2>
  replicateM_ 100000 (putChar 'B')            -- <3>
-- >>

2つのスレッドが交互に実行された結果, A, Bが交互に出力される。

$ ./fork 
ABABABABABABABABABABABABABABABABABABABABABABABABABABABAB...

プログラムの振る舞いは実行時システムが競合をどう管理するかに影響される。
Haskellのスレッドはまだ走っているスレッドが残っていてもmainが終了するとスレッドも停止する。

8章 入出力の重ね合わせ


8.1 Haskellの例外
8.2 Asyncのエラー処理
8.3 マージ

8章冒頭では Haskellの例外処理 [3]について。
I/O操作に関する例外は IOException型の値である。この例外を catchで捕捉できるのはIOモナドのみである。
スレッドが何もできずに死ぬことはなくスレッドは例外を補足でき, 補足した例外は例外ハンドラで扱えるが, 型注釈で指定した例外とは異なる型は扱えられない。あらゆる例外を補足したい場合は, 例外の型の階層構造の頂点である SomeExceptionを補足するハンドラを書けばよいが行儀としては良くないとしている。

MVarとforkIOを用いた非同期I/Oの例。
ここでいう非同期とは, ある仕事をしている最中に, 裏でI/Oを実行することである。

$ sudo cabal install HTTP

まずは冗長な例をみる。

import Control.Concurrent
import Data.ByteString as B
import GetURL

main = do
  m1 <- newEmptyMVar                                    -- <1>
  m2 <- newEmptyMVar                                    -- <1>

  forkIO $ do                                           -- <2>
    r <- getURL "https://www.wikipedia.org/wiki/Shovel"
    putMVar m1 r

  forkIO $ do                                           -- <3>
    r <- getURL "https://www.wikipedia.org/wiki/Spade"
    putMVar m2 r

  r1 <- takeMVar m1                                     -- <4>
  r2 <- takeMVar m2                                     -- <5>
  print (B.length r1, B.length r2)                      -- <6>

wikipediaは, https://だと https://にリダイレクトされるが, HTTPはhttpsをサポートしていないので www.weblio.jp に変更した。

# Shovel
https://www.weblio.jp/content/%E3%82%B7%E3%83%A7%E3%83%99%E3%83%AB%E3%82%AB%E3%83%BC
# Spade
https://www.weblio.jp/content/%E3%82%B9%E3%83%9A%E3%83%BC%E3%83%89

実行すると, バイトサイズが出力される。

$ ./geturls1 
(140495,138976)

AsyncAPIを使って改善を行う。ダウンロード時間とサイズを出力する timeDownload関数をつくって, これをasyncと共にリストのそれぞれの要素に対して適用する。

sites = ["https://www.google.com",
         "https://www.bing.com",
         "https://www.yahoo.co.jp/",
         "https://www.weblio.jp/content/%E3%82%B7%E3%83%A7%E3%83%99%E3%83%AB%E3%82%AB%E3%83%BC",
         "https://www.weblio.jp/content/%E3%82%B9%E3%83%9A%E3%83%BC%E3%83%89"]

timeDownload :: String -> IO ()
timeDownload url = do
  (page, time) <- timeit $ getURL url   -- <1>
  printf "downloaded: %s (%d bytes, %.2fs)\n" url (B.length page) time

main = do
 as <- mapM (async . timeDownload) sites  -- <2>
 mapM_ wait as                            -- <3>

結果は下記。

$ ./geturls3 
downloaded: https://www.yahoo.co.jp/ (19149 bytes, 0.38s)
downloaded: https://www.google.com (18893 bytes, 0.52s)
downloaded: https://www.bing.com (59411 bytes, 0.57s)
downloaded: https://www.weblio.jp/content/%E3%82%B7%E3%83%A7%E3%83%99%E3%83%AB%E3%82%AB%E3%83%BC (139108 bytes, 0.86s)
downloaded: https://www.weblio.jp/content/%E3%82%B9%E3%83%9A%E3%83%BC%E3%83%89 (138653 bytes, 0.74s)

Asyncのエラー処理について。上記コードだと, ネットワークに接続していない状態だと当然失敗する。

$ ./geturls3 
geturls3: getAddrInfo: does not exist (nodename nor servname provided, or not known)
geturls3: getAddrInfo: does not exist (nodename nor servname provided, or not known)
geturls3: getAddrInfo: does not exist (nodename nor servname provided, or not known)
geturls3: getAddrInfo: does not exist (nodename nor servname provided, or not known)
geturls3: getAddrInfo: does not exist (nodename nor servname provided, or not known)
geturls3: thread blocked indefinitely in an MVar operation

Async関数を拡張して, data Async a = Async (MVar a) を data Async a = Async (MVar (Either SomeException a)) に変更する。
actionをtryで包むようにする。Asyncが例外を投げた時, waitはエラーを上位に伝播させる仕組みとなる。(geturls4.hs)

9章 キャンセルとタイムアウト


9.1 非同期例外
9.2 非同期例外のマスク
9.3 bracket
9.4 チャネルに対する非同期例外の安全性
9.5 タイムアウト
9.6 非同期例外の捕捉
9.7 maskとforkIO
9.8 非同期例外に関して

対話的なアプリケーションなどで, 他スレッドに割り込んでそのスレッドで走っている処理をキャンセルさせる方法について。
Haskellは I/Oを伴わない場合, 副作用がないので, 中断や一時停止で停止しても正しさを損なわずに復帰できる。Haskellでは非同期キャンセルがデフォルトである。

  • 同期例外 : 指定した場所にファイルが存在しない場合など, ファイルI/Oなどに起因。
  • 非同期例外 : ユーザがクリックしたら発生する例外の場合など, 外部のイベントに起因。

Asyncをさらに拡張してキャンセルできるようにする。cancel関数はそのスレッドに例外を投げる。


cancel :: Async a -> IO ()
cancel (Async t var) = throwTo t ThreadKilled
-- >>

forkIOの中で標準入力から qを検出した時にcancelする。

main = do
  as <- mapM (async . timeDownload) sites                     -- <1>

  forkIO $ do                                                 -- <2>
     hSetBuffering stdin NoBuffering
     forever $ do
        c <- getChar
        when (c == 'q') $ mapM_ cancel as

  rs <- mapM waitCatch as                                     -- <3>
  printf "%d/%d succeeded\n" (length (rights rs)) (length rs) -- <4>

実行して, qを入力する。

$ ./geturlscancel 
downloaded: https://www.bing.com (59210 bytes, 0.53s)
downloaded: https://www.yahoo.co.jp/ (19006 bytes, 0.63s)
downloaded: https://www.google.com (18812 bytes, 0.74s)
q
3/5 succeeded

Haskellにはmaskコンビネータがある。共有している状態の更新の時に例外が発生した場合は、マスクする。
割り込み可能な操作は実際にブロックされたときにだけ非同期例外を受け取る。
非同期例外は catchなど例外ハンドラで捕捉可能。非同期例外は例外ハンドラ内でマスクされてほしい。

非同期例外の補足に関する例題。
コマンドの引数に与えたファイルのリストの, それぞれのファイルの行数をカウントするプログラム。ここで存在しないファイルを指定してみる。

$ ./catch-mask xxx yyy
Unmasked
MaskedInterruptible
0

loopがmask内部で再帰的に呼び出しされてしまっている。例外ハンドラ自体をtryで隠蔽した場合。(catch-mask2.hs)

$ ./catch-mask2 xxx yyy
Unmasked
Unmasked
0

I/Oを含まないHaskellのコードは自動的に安全である。また, ほとんどのI/Oを含むコードも安全にする方法があると紹介されている。
Haskellは非同期例外安全ではないが、非同期例外をうまく扱うことができる利点がある。

10章 ソフトウェアトランザクショナルメモリ


10.1 ウィンドウマネージャの例
10.2 ブロッキング
10.3 変更されるまでのブロッキング
10.4 STMを使ったマージ
10.5 Async再考
10.6 STMを使ったチャネルの実装
10.6.1 より多くの操作が可能
10.6.2 ブロックされる操作の合成
10.6.3 非同期例外安全
10.7 もう1つのチャネル実装
10.8 有界チャネル
10.9 STMでできないこと
10.10 性能
10.11 まとめ

STM (software transactional memory)は, 並行性制御機構である。
複数の状態変更操作をグループ化して単一の不可分操作 [3]として実行するため, デッドロックを回避できる。

並行プログラミングでSTMを使うメリットは下記となる。

  • 合成可能な不可分性 : 計算の部品化
  • 合成可能なブロッキング : 選択性を与える
  • 失敗やキャンセルがあるときの頑健性 : 状態に対する不変条件の維持の容易にする

例えば, OSはロック機構のある言語で書かれた大規模なシステムで, OSの開発においてデッドロックの問題がつきまとう。
本章では例として複数のデスクトップを管理する WindowManagerを取り上げている。

MVarは適切に扱わないと, スレッド間で互いにブロックされ進めなくなる, いわゆる食事する哲学者の問題(Dining Philosophers Problem)が起きる可能性がある。そこで, TVarを使いSTM操作に置き換える。STMは並行プログラムの表現力と頑健性を改善できる。

STM a 型の操作はどれも他の操作と合成して大きな不可分トランザクションを形成できる。
IOモナドと異なりSTMの実装は、別のトランザクションと衝突したときにトランザクションのエフェクトをロールバックできるようになっている。

続いて, 特定のリソースを獲得しないとならない時のブロッキングの話題。

retry :: STM a

STMの上記 retry は現在のトランザクションを破棄してもう一度やり直すことを意味する。(計算を部品化)
何も変化がないのに, 無闇にトランザクションをやり直しても意味がないので, TVarは書き込まれた時点でブロックが解除、トランザクションが走るような仕組みになっている。
retryのエフェクトがあるおかげで, 並行プログラムに共通する 起こし忘れバグ (状態変数への通知忘れ)を起こさずに済む。

続いて, orElse操作について。orElseは選択を可能にする。

orElse :: STM a -> STM a -> STM a

orElse a b は a を実行して, 結果を返したら終了, aがretryを呼んだら a のエフェクトを捨てて b を実行する。

7章で扱ったChan型をSTMで実現する。TChanはMvarでは難しい操作を追加できる。

data TChan a

newTChan :: STM (TChan a)
writeChan :: TChan a -> a -> STM()
readChan :: TChan a -> STM a

操作がIOモナド内から, STMモナド内に変更される。この利点のひとつとして, 非同期例外安全となることが挙げられる。
STM内での非同期例外安全とは, 何もしないので置き変えるべきロックもなく例外ハンドラは必要なく, maskで保護するべきクリティカルセッションを心配する必要もない。
ただし, すべての状態に対してSTMを一貫して使わなければならない。

続いて, 有界チャネルについて。単一項チャネルと無制限チャネルの中間的な有界チャネルが必要な理由として下記が挙げられている。

  • 単一項チャネル (Mvar, TMVar) : 書き込みが集中すると, 読み出しスレッドが追いつくのを待ってブロック。逆だとコンテキストスイッチが大量に発生して性能が落ちる。
  • 無限項チャネル (Chan), TChan) : 読み出しスレッドが書き込みスレッドに追いつけないと, チャネルの大きさが無制限に大きくなりメモリを食い尽くす可能性。

STMは万能ではなく, MvarはSTMより速い可能性あるいは公平性 [4]がある。公平性に関してはSTM上で実装するには, 合成可能性をあきらめないとならない。

11章 並行性の高水準な抽象化


11.1 スレッド漏れの回避
11.2 対称型並行性コンビネータ
11.2.1 raceを使ったタイムアウト
11.3 Functorインスタンスの追加
11.4 まとめ:Async API

大規模でより複雑なプログラムにして, 抽象化された並行処理機構を考える。スレッドの木を組み立てる。
AsyncAPIはIOアクションを非同期に実行して, 結果を待つAPIである。
親スレッドが終了した時に, 子スレッドが自動的に終了させる。また, 子スレッドが終了した時はそれが親スレッドに通知される。

IOアクションは非同期 (Async)に実行し, 結果を待つ(wait)単純なAPIである。

main =
  withAsync (getURL "https://www.weblio.jp/content/%E3%82%B7%E3%83%A7%E3%83%99%E3%83%AB%E3%82%AB%E3%83%BC") $ \a1 ->
  withAsync (getURL "https://www.weblio.jp/content/%E3%82%B9%E3%83%9A%E3%83%BC%E3%83%89")  $ \a2 -> do
  r1 <- wait a1
  r2 <- wait a2
  print (B.length r1, B.length r2)

withAsyncによってスレッド漏れ, つまり親スレッドが例外で終了した時に, 子スレッドが取り残されてしまうことを防ぐ。
1つ目の withAsyncが失敗すると, 2回目の withAsyncはキャンセルされる。

しかし, ここで失敗した時の振る舞いが a1 と a2 で対称となっていない。
両方の結果を待ち, どちらかが withAsyncが失敗したら停止させたい。
そこで, withAsyncと withBothを合成する。また, race関数により2つのIOアクションを並行させる。

どちらかのIOアクションが結果を返す or 例外を投げたら, 他方をキャンセルさせる。これは再帰的に行われる。

concurrently :: IO a -> IO b -> IO (a,b)
concurrently ioa iob =
  withAsync ioa $ \a ->
  withAsync iob $ \b ->
    waitBoth a b

waitBothを使った concurrentlyは対称的である。

main = do
  (r1,r2) <- concurrently
               (getURL "https://www.weblio.jp/content/%E3%82%B7%E3%83%A7%E3%83%99%E3%83%AB%E3%82%AB%E3%83%BC")
               (getURL "https://www.weblio.jp/content/%E3%82%B9%E3%83%9A%E3%83%BC%E3%83%89")
  print (B.length r1, B.length r2)

raceはIOアクションを競争させて, 1つの結果を生成する。

race :: IO a -> IO b -> IO (Either a b)
race ioa iob =
  withAsync ioa $ \a ->
  withAsync iob $ \b ->
    waitEither a b

raceとconcurrentlyを使って, 大きなスレッドの木を構築することができ, 必ずボトムアップに壊れ, 失敗は伝播するようになった。

asyncはHackageにも登録されている。

$ sudo cabal install Async

12章 並行ネットワークサーバ


12.1 簡単なサーバ
12.2 単純なサーバの状態による拡張
12.2.1 設計1:1つのジャイアントロック
12.2.2 設計2:サーバスレッドごとに1つのチャネル
12.2.3 設計3:放送チャネルの利用
12.2.4 設計4:STMの利用
12.2.5 実装
12.3 チャットサーバ
12.3.1 アーキテクチャ
12.3.2 クライアントのデータ
12.3.3 サーバのデータ
12.3.4 サーバ
12.3.5 新しいクライアントの準備
12.3.6 クライアントの起動
12.3.7 まとめ

多数のクライアントと通信するサーバ型のアプリケーションは, 高度な並行性と高い性能が要求される。スレッドは適切な抽象化であり, アプリ開発者は単一のクライアントへのやりとりを実現することに集中できる。

本省では最終的に ChatServerの実現を目指す。まずは単一のクライアントとの対話処理を扱って, それを並行性を使ってマルチクライアントサーバに拡張させる流れ。

$ ./server 
Listening on port 44444

Accepted connection from localhost: 54541

動作としては, acceptによってクライアントからの接続リクエストを待つ。この間はブロックされる。
Handleを獲得して, hostとportを束縛する。次に新しいスレッド (サーバスレッド)を生成してmainスレッドに戻る。
この時, アクティブなサーバスレッドと mainスレッドは並行して走っている状態となる。
forkFinallyによって例外が発生したときには Handleが確実に閉じられるようにする。

talk :: Handle -> IO ()
talk h = do
  hSetBuffering h LineBuffering                                -- <1>
  loop                                                         -- <2>
 where
  loop = do
    line <- hGetLine h                                         -- <3>
    if line == "end"                                           -- <4>
       then hPutStrLn h ("Thank you for using the " ++         -- <5>
                         "Haskell doubling service.")
       else do hPutStrLn h (show (2 * (read line :: Integer))) -- <6>
               loop                                            -- <7>
-- >>

serverが起動している状態で, ncコマンドでアクセスする。標準入力に入力した数値が2倍で返ってくる。endでクライアントを閉じる。

$ nc localhost 44444
50
100
1
2
end
Thank you for using the Haskell doubling service.

STMを使うアーキテクチャに変更し, マルチクライアントに対応させる。

server :: Handle -> TVar Integer -> TChan String -> IO ()
server h factor c = do
  f <- atomically $ readTVar factor     -- <1>
  hPrintf h "Current factor: %d\n" f    -- <2>
  loop f                                -- <3>
 where
  loop f = do
    action <- atomically $ do           -- <4>
      f' <- readTVar factor             -- <5>
      if (f /= f')                      -- <6>
         then return (newfactor f')     -- <7>
         else do
           l <- readTChan c             -- <8>
           return (command f l)         -- <9>
    action

  newfactor f = do                      -- <10>
    hPrintf h "new factor: %d\n" f
    loop f

  command f s                           -- <11>
   = case s of
      "end" ->
        hPutStrLn h ("Thank you for using the " ++
                     "Haskell doubling service.")         -- <12>
      '*':s -> do
        atomically $ writeTVar factor (read s :: Integer) -- <13>
        loop f
      line  -> do
        hPutStrLn h (show (f * (read line :: Integer)))
        loop f
-- >>

クライアントからserverに接続する。*の後に数値を入力するとserver内でグローバルに共有される。

$ nc localhost 44444
Current factor: 2
*4
new factor: 4

別のクライアントから接続すると, Current factorが変化していることがわかる。

$ nc localhost 44444
Current factor: 4
end
Thank you for using the Haskell doubling service.

*の後に数値以外の文字を入力すると, クライアントの接続は切られる。

続いて, ChatServerを考える。

グローバルなServer状態と, クライアントごとの状態を分ける。Client{..}はクライアントワイルドカードらしい。

  • クライアントの初期化
  • Clientデータ構造の生成
  • Server状態への追加
  • クライアントの起動

receiveスレッドと serverスレッドの2つを管理するのに race関数を使う。また, STMによって2つのクライアントが同時に互いを追い出してはならないといった一貫性を要求される性質を満たす。

注意点として, broadcastで無制限の長さのSTMトランザクションを使ってしまうと, O(n^2)のコストがかかってしまう。

13章 スレッドを用いた並列プログラミング


13.1 並行性を用いて並列性を達成する方法
13.2 例題:ファイル探索
13.2.1 直列版
13.2.2 並列版
13.2.3 性能とスケール
13.2.4 セマフォを使ったスレッド数の制限
13.2.5 ParIOモナド

並行性は並列性を実現するためにも使われる。しかし, 下記のような純粋に並列プログラミングモデルが使えない問題がある。

  • I/Oを伴う問題
  • 内部的に非決定性に依存するアルゴリズム

Haskellには副作用のある問題をSTモナドに隠蔽することができる。しかし, STモナドの中で並列性を持たせることは困難である。
Repaモデルに適合しない場合, 平衡性の問題に置き換えることができる。

ファイル探索プログラムを例に考える。

$ ./findseq findseq.hs .
Just "./findseq.hs"

このプログラムは, 並列化による高速化が難しいので並行化による高速化を目指す。

  • サブディレクトリを並行に探索して, どれかが目的のファイルを見つけたら他の探索は停止する (cancel)
  • 例外を正しく伝播する

これを AsyncAPIの withAsyncで実現する。withAsync型クラスを確認する。

Prelude> :l Async
[1 of 1] Compiling Async            ( Async.hs, interpreted )
Ok, modules loaded: Async.
*Async> :i withAsync
withAsync :: IO a -> (Async a -> IO b) -> IO b
  	-- Defined at Async.hs:92:1

複数の探索を並列に開始するには, withAsyncの複数の呼び出しをネストする必要がある。まずは直列版。

$ ./findseq nonexist ~/Desktop +RTS -s
Nothing
   1,925,333,168 bytes allocated in the heap
      69,305,184 bytes copied during GC
         807,000 bytes maximum residency (22 sample(s))
          36,624 bytes maximum slop
               3 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      3695 colls,     0 par    0.105s   0.110s     0.0000s    0.0010s
  Gen  1        22 colls,     0 par    0.008s   0.010s     0.0005s    0.0020s

  INIT    time    0.000s  (  0.003s elapsed)
  MUT     time    1.290s  (  5.829s elapsed)
  GC      time    0.113s  (  0.120s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    1.406s  (  5.952s elapsed)

  %GC     time       8.1%  (2.0% elapsed)

  Alloc rate    1,491,940,782 bytes per MUT second

  Productivity  91.9% of total user, 21.7% of total elapsed

続いて, 2コアを使った並列版。

$ ./findpar nonexist ~/Desktop +RTS -s -N2
Nothing
   2,026,183,936 bytes allocated in the heap
     405,085,248 bytes copied during GC
      22,872,296 bytes maximum residency (27 sample(s))
       1,019,048 bytes maximum slop
              54 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      2133 colls,  2133 par    1.116s   0.739s     0.0003s    0.0097s
  Gen  1        27 colls,    26 par    0.211s   0.124s     0.0046s    0.0117s

  Parallel GC work balance: 59.52% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.008s elapsed)
  MUT     time    2.048s  (  3.964s elapsed)
  GC      time    1.327s  (  0.862s elapsed)
  EXIT    time    0.001s  (  0.002s elapsed)
  Total   time    3.378s  (  4.836s elapsed)

  Alloc rate    989,113,869 bytes per MUT second

  Productivity  60.7% of total user, 42.4% of total elapsed

gc_alloc_block_sync: 77749
whitehole_spin: 0
gen[0].sync: 319
gen[1].sync: 11180

2コアだと並列化したプログラムの方が, 僅かではあるが速くなっているが直列版に対して, 並列版がそれほど高速化できていない。
今回のケースでは並列化したコードにオーバーヘッドが存在する可能性があるのでそれの低減を目指す。
ファイルシステムの構造から, ひとつの深いサブディレクトリに処理が依存してしまう可能性が考えられる。
従って, 並行版では全てのコアが busyになるような調度良い数のスレッドを生成することが目的になる。

スレッド生成数を明示的に与えて, これを管理するカウンタ (semaphore, セマフォ)を用意する。
ノンブロッキングセマフォを表す型を NBSem型とする。リソースの獲得と解放する関数をそれぞれ, tryAcquireNBSem, releaseNBSemとする。


newtype NBSem = NBSem (MVar Int)

newNBSem :: Int -> IO NBSem
newNBSem i = do
  m <- newMVar i
  return (NBSem m)

tryAcquireNBSem :: NBSem -> IO Bool
tryAcquireNBSem (NBSem m) =
  modifyMVar m $ \i ->
    if i == 0
       then return (i, False)
       else let !z = i-1 in return (z, True)

releaseNBSem :: NBSem -> IO ()
releaseNBSem (NBSem m) =
  modifyMVar m $ \i ->
    let !z = i+1 in return (z, ())
-- >>

8スレッド + メインスレッドで動かした場合のコード。findpar2はfindparと同様の条件で高速化された。

$ ./findpar2 8 nonexist ~/Desktop +RTS -s -N2
Nothing
   1,881,160,512 bytes allocated in the heap
     120,678,304 bytes copied during GC
       1,612,792 bytes maximum residency (74 sample(s))
          66,440 bytes maximum slop
               6 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      1995 colls,  1995 par    0.259s   0.176s     0.0001s    0.0016s
  Gen  1        74 colls,    73 par    0.057s   0.031s     0.0004s    0.0017s

  Parallel GC work balance: 35.53% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N2)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.001s elapsed)
  MUT     time    1.564s  (  3.737s elapsed)
  GC      time    0.316s  (  0.207s elapsed)
  EXIT    time    0.000s  (  0.000s elapsed)
  Total   time    1.883s  (  3.946s elapsed)

  Alloc rate    1,202,525,863 bytes per MUT second

  Productivity  83.2% of total user, 39.7% of total elapsed

gc_alloc_block_sync: 3974
whitehole_spin: 0
gen[0].sync: 212
gen[1].sync: 15

ParIOモナドは Parモナドの亜種で, 内部でIO操作が行える。

$ ./findpar4 nonexist ~/Desktop +RTS -s -N4
Nothing
   1,934,248,640 bytes allocated in the heap
     118,766,496 bytes copied during GC
       1,162,720 bytes maximum residency (46 sample(s))
         101,048 bytes maximum slop
               5 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      1183 colls,  1183 par    0.517s   0.221s     0.0002s    0.0026s
  Gen  1        46 colls,    45 par    0.099s   0.036s     0.0008s    0.0021s

  Parallel GC work balance: 20.19% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.006s elapsed)
  MUT     time    2.536s  (  2.947s elapsed)
  GC      time    0.616s  (  0.257s elapsed)
  EXIT    time    0.000s  (  0.001s elapsed)
  Total   time    3.155s  (  3.211s elapsed)

  Alloc rate    762,811,692 bytes per MUT second

  Productivity  80.4% of total user, 79.0% of total elapsed

gc_alloc_block_sync: 6643
whitehole_spin: 0
gen[0].sync: 137
gen[1].sync: 67

NBSem型によって何らかの共有状態に頼ることがなくなっているのがポイント。

本章ではIOモナドの中で書き換え可能な値を持てる IORefも紹介している。

14章 分散プログラミング


14.1 distributed-processパッケージファミリー
14.2 分散並行性か分散並列性か
14.3 最初の例:ピン(ping)
14.3.1 プロセスとProcessモナド
14.3.2 メッセージ型の定義
14.3.3 ピンサーバのプロセス
14.3.4 マスタープロセス
14.3.5 main関数
14.3.6 例題のまとめ
14.4 複数ノードでのピン
14.4.1 1つのマシン上で複数のノード
14.4.2 複数のマシン
14.5 型付きチャネル
14.5.1 チャネルのマージ
14.6 失敗処理
14.6.1 分散プログラムにおける失敗の哲学
14.7 分散チャットサーバ
14.7.1 データ型
14.7.2 メッセージ送信
14.7.3 放送
14.7.4 配布
14.7.5 サーバのテスト
14.7.6 失敗とノードの追加/削除
14.8 練習問題:分散KVS

ここまで, 単一のマシンで複数のコアを使って並列的に活用することを考えてきたが, プログラムを複数のマシンで同時に走らせる 分散プログラミング [5]について。
Haskellには, 組み込みで分散プログラミングのサポートはないが, コアAPIを提供する distributed-processというFWがある。

分散プログラミングの利点は下記が挙げられている。

  • 効率的なネットワークリソースの利用
  • 大規模データ構造がディスクを跨って存在

一方で, 分散環境におけるノード間の通信にコストがかかってしまうことがあり, STMなどによるグローバルな一貫性を保証することが難しくなることもある。本章では, distributed-processが Erlang-style concurrency in Haskell とあるように, Erlangの分散システムアーキテクチャのスタイルをとっており, メッセージパッシングモデルを推奨している。

distributed-processの提供する主な機能は下記が挙げられている。

  • プロセスの遠隔生成
  • メッセージパッシングのためのバイトストリームへのシリアライズ
  • プロセスリンク (プロセス終了の通知)
  • 複数チャネル上のメッセージ送信
  • 自動peer探索

分散プログラムは, メッセージを送受信して互いに通信しあうプロセスの集合である。プロセスはPIDを持っている。
プロセスとスレッドは似ているが, スレッドは現在のノードで生成できるが, プロセスは遠隔ノードで生成することができる。
また, メッセージパッシング操作は Processモナドのみが可能である。

単純な分散プログラミングの例として, localノード通信を行う ping/pongプログラム。
spawnで新しい子プロセスを生成し, pingを送信し, expectを呼んで pongを受信する。
最後に terminateを呼んでプロセスを終了する。

$ ./ping 
Sat Aug 22 02:55:18 UTC 2015 pid://localhost:44444:0:10: spawning on nid://localhost:44444:0
Sat Aug 22 02:55:18 UTC 2015 pid://localhost:44444:0:10: sending ping to pid://localhost:44444:0:11
Sat Aug 22 02:55:18 UTC 2015 pid://localhost:44444:0:11: ping received from pid://localhost:44444:0:10
Sat Aug 22 02:55:18 UTC 2015 pid://localhost:44444:0:10: pong.
ping: ProcessTerminationException

続いて, 1つのマシン上に複数のノードで通信を行う。
distributed-processは, トランスポート層の実装は含まれないので TCP/IPの場合, network-transport-tcpパッケージを使う。また, どのノードが利用可能かを見つけるpeer探索機能を実装したdistributed-process-simplelocalnetを使う。

slaveノードを開始してから, masterノードを開始する。

$ ls | grep DistribUtils
DistribUtils.dyn_hi
DistribUtils.dyn_o
DistribUtils.hi
DistribUtils.hs
DistribUtils.o
$ ghc distrib-ping/ping-multi.hs
$ ./distrib-ping/ping-multi slave 44445 &
[1] 3038
$ ./distrib-ping/ping-multi slave 44446 &
[2] 3047
$ ./distrib-ping/ping-multi
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44444:0:10: spawning on nid://localhost:44445:0
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44444:0:10: spawning on nid://localhost:44446:0
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44444:0:10: pinging pid://localhost:44445:0:11
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44444:0:10: pinging pid://localhost:44446:0:11
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44445:0:11: ping received from pid://localhost:44444:0:10
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44446:0:11: ping received from pid://localhost:44444:0:10
Sat Aug 22 03:14:45 UTC 2015 pid://localhost:44444:0:10: All pongs successfully received
ping-multi: ProcessTerminationException

distributed-process-simplelocalnetが提供する peer探索機能によって自動的にslaveノードを見つけられる。

分散コンピューティングに失敗はつきものである。したがって, プロセスの失敗を他のプロセスで補足/処理できる必要がある。
失敗の哲学は, Erlnagの失敗を扱うための哲学 “Let it Crash!” に沿う形をとっており, あらゆる種類の失敗に対して同じように対応することが望ましい。局所的な失敗への対応は,単純に上の層に伝播させ, 失敗したプロセスはただ死ねば良い。
失敗を考えるべき粒度はプロセスで, 個々のプロセスがシステム全体を停止させないようにする。

複数マシンの場合における例とChatServerの分散版, 応用として分散KVSも本省で扱っているが, 長くなるので今回は省略した。

15章 デバッグ、チューニング、外部コードとのインタフェース


15.1 並行プログラムのデバッグ
15.1.1 スレッド状態の検査
15.1.2 イベントログとThreadScope
15.1.3 デッドロックの検出
15.2 並行(および並列)プログラムのチューニング
15.2.1 スレッドの生成とMVar操作
15.2.2 並行データ構造の共有
15.2.3 微調整のためのRTSオプション
15.3 並行性と外部関数インタフェース
15.3.1 スレッドと外部呼び出し
15.3.2 非同期例外と外部呼び出し
15.3.3 スレッドと外部からの呼び出し

threadStatus関数はスレッドの状態を返す。

Prelude> :m GHC.Conc
Prelude GHC.Conc> :i ThreadStatus
data ThreadStatus
  = ThreadRunning
  | ThreadFinished
  | ThreadBlocked BlockReason
  | ThreadDied
  	-- Defined in ‘GHC.Conc.Sync’
instance Eq ThreadStatus -- Defined in ‘GHC.Conc.Sync’
instance Ord ThreadStatus -- Defined in ‘GHC.Conc.Sync’
instance Show ThreadStatus -- Defined in ‘GHC.Conc.Sync’

下記のようにブロックされた状態から終了状態に遷移していることが確認できるが, 本書では threadStatus関数はデバッグ用途以外では使用を勧めていない。

Prelude GHC.Conc> t <- forkIO (threadDelay 30000000)
Prelude GHC.Conc> GHC.Conc.threadStatus t
ThreadBlocked BlockedOnMVar
Prelude GHC.Conc> GHC.Conc.threadStatus t
ThreadFinished

デッドロックの検出について。GHCはスレッドがデッドロックしたことを検出し例外を投げてくれる。
ヒープオブジェクトを管理する GCによってオブジェクトルートから辿って, 全ての生きているオブジェクトを見つけてくる仕組みとなっている。
オブジェクトルートから到達できないスレッドは確実にデッドロックする。例外によって, スレッドが保持しているリソースを解放できる機会が生まれる。
しかし, GHCはスレッドがデッドロックすると思える状況でも, そうだと必ず証明できるわけではないので, デッドロック検出にあまり頼ってはならないとしている。

また本章では, 並行プログラムの性能改善する手法についての基本原則として下記が挙げられている。

  • 早すぎる最適化は辞めた方がいい。ただし, 初めから効率を念頭に置いてコードを書いた方がいい。
  • ボトルネックではないコードの最適化に時間を浪費しない。空間的または時間的なプロファイリングを取る。(推測するな、計測せよ)

Codeは主に parconc-examplesを参照した。


[1] 副作用から考える並行処理とアクターモデル
[2] Haskellの例外処理。Haskellは例外処理のための特別な構文や組み込みの意味づけがないが, 簡単に例外処理コンビネータを構築できる抽象化がある。
[3] wikipediaによると, 不可分操作は以下の2つの条件を満たさなければならない。(1)全操作が完了するまで他のプロセスはその途中の状態を観測できない。(2)一部操作が失敗したら組合せ全体が失敗し, システムの状態は不可分操作を行う前の状態に戻る。システムの他の部分から見て操作の組合せが一度に成功したか失敗したように見える。途中の状態にアクセスすることはできない。このため不可分操作あるいはアトミック操作(= 原子操作)と呼ぶのである。
[4] ブロックされたスレッドを起こす順番におけるFIFOの保証
[5] 言い換えると, 別機種を含むことができる複数のマシンで単一のプログラムが動くような構成