Oath: 安全、高速、合成可能な並行処理

TL;DR

github.com

並行処理を簡潔かつ安全に記述できるライブラリを作った。ApplicativeDo拡張を使って、以下のようにoathの引数として与えたIOアクションを同時に実行し、結果を集める処理を書ける。いずれかが例外を投げた場合、残りをキャンセルするためリソースを漏らす心配がない。

evalOath $ do
   a <- oath $ ...
   b <- oath $ ...
   f a b

経緯

Haskellは並行処理が得意とされている。事実、軽量スレッド、MVar、STMといった処理系によるサポートが充実しており、HackageのConcurrencyカテゴリには235ものパッケージがあることからもユーザの関心の高さが窺える。 並行処理を実用する上では、単にスレッドをフォークするだけでなく、計算の結果を集める必要がある―—Scalaなどで言うFutureに近いものがあると嬉しい。案の定、並行計算の結果を取り出すためのHaskellライブラリは数多く作られてきた。

futuresというなかなかいい名前のパッケージがある。APIもかなりシンプルだ。 スレッドをフォークして、計算結果をMVarに代入し、MVarの中身を取り出すアクションを返すというものだ。

fork :: IO a -> IO (Future a)
fork io = do
  mv <- newEmptyMVar
  forkIO $ do
    a <- io
    putMVar mv a
  return $ Future $ readMVar mv

考え方はわかりやすいが、この実装には致命的な欠点がある―—例外処理である。forkに与えたアクションが例外を発生させても、誰も対応ができない。しかも、putMVarが呼ばれなくなるのでreadMVarが発散してしまう(実行時エラーになる)。これでは実用するのは難しい。また、forkIOの結果であるThreadIdが捨てられていることからわかるように、一度始めた計算を外部から止めるすべはない。

spawnは、Future型ではなく直接IOを返す点を除けばfuturesと似ている。

spawnTry :: IO a -> IO (IO (Result a))
spawnTry m = do
  v <- newEmptyMVar
  _ <- mask $ \restore -> forkIO (try (restore m) >>= putMVar v)
  return (readMVar v)

spawn :: IO a -> IO (IO a)
spawn m = do
  r <- spawnTry m
  return (r >>= either throwIO return)

こちらはきちんと例外処理をしており、putMVarが呼ばれることが保証されるためいくらか実用的だ。しかし、やはり計算は止められない。

promiseは、ApplicativeとMonadインスタンスであるPromise型が売りだ。

newtype Promise a = Promise { unPromise :: IO (Async a) }
  deriving (Functor)

instance Applicative Promise where
  pure = Promise . async . return
  Promise mf <*> Promise mx = Promise $ do  
      f <- mf
      x <- mx
      (f', x') <- waitBoth f x
      async $ return $ f' x'

instance Monad Promise where
   return = liftIO . return
   Promise m >>= f = Promise $ m >>= wait >>= unPromise . f 

liftIO :: IO a -> Promise a
liftIO = Promise . async

外側のIOでスレッドをフォークし、内側のAsyncで結果を待つという仕組みのようだ。だが、m <*> nはmとnをそれぞれ並行して実行したのち、両方の結果が返ってくるのをその場で待つ必要がある(「待つAsyncを返す」のではなく、「待ってからAsyncを返す」)。これでは(a *> b) *> ca *> (b *> c)の挙動が異なってしまうため、Applicativeの則を満たさない。また、Asyncは本来cancel`できるはずだが、合成中に結果を待ってしまうので事実上中断ができない(どちらにせよ、Promiseの実装が隠蔽されているのでどうしようもないが)。 さらに、Monadは左の計算の結果を待ってから右に進むという挙動で、Applicativeとの一貫性がない。いくらインスタンスがあっても、これでプログラムを組み立てるのは難しいだろう。

非同期処理の定番であるasyncパッケージには、Concurrentlyという型がある。

newtype Concurrently a = Concurrently { runConcurrently :: IO a }

instance Applicative Concurrently where
  pure = Concurrently . return
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

instance Alternative Concurrently where
  empty = Concurrently $ forever (threadDelay maxBound)
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs

(<*>)concurrently :: IO a -> IO b -> IO (a, b)を使って両方の結果を待つ計算を表現し、(<|>)race :: IO a -> IO b -> IO (Either a b)はどちらかが返ってくるまで待つ計算を表現する。promiseと違い、その場で待機する必要がないので、Applicative則を満たしそうだ。 concurrentlyおよびraceは、スレッドを必ず二つフォークする上、かなり複雑な実装なので、オーバーヘッドが大きそうだ。ここまで見た中では一番正しそうで使いやすそうな実装だが、もっといい方法はないだろうか。

継続とSTMのコンボ

非同期処理を安全に合成する方法には覚えがある。単純で頑強なメッセージングシステム、franz - モナドとわたしとコモナドで紹介した、継続渡しを用いて結果を待つトランザクションを渡すという仕組みだ。この方法なら、継続が終了したタイミングで処理を中断できるため、待つもやめるも自由自在、お漏らしの心配はない(franzはこのメカニズムによって非同期リクエストを管理している)。そのエッセンスを独立したライブラリに蒸留できそうだ。それっぽい名前はだいたい取られていたので、Promiseからの連想でOathと名付けた。

github.com

newtype Oath a = Oath { runOath :: forall r. (STM a -> IO r) -> IO r } deriving Functor

instance Applicative Oath where
  pure a = Oath $ \cont -> cont (pure a)
  Oath m <*> Oath n
    = Oath $ \cont -> m $ \f -> n $ \x -> cont (f <*> x)

Oathは、IO (STM a)CPS変換したもので、Compose (Codensity IO) STMと等価である*1 *2。外側のIOで計算を起動し、内側のSTMで結果を待つ。OathApplicativeインスタンスを持ち、STM (a -> b)STM aを用意してから、それらを合成したSTM bを返す――計算の起動と、結果の待機を別々に合成するというわけだ。

oath :: IO a -> Oath a
oath act = Oath $ \cont -> do
  v <- newEmptyTMVarIO
  tid <- forkFinally act (atomically . putTMVar v)
  let await = readTMVar v >>= either throwSTM pure
  cont await `finally` killThread tid

oathは、IOアクションを別のスレッドで実行し、その結果をTMVar経由で取り出す。forkFinallyが例外を受け止め、throwSTMがそれを伝える。

生殺与奪の権は継続が握っており、終了したときにThreadKilled例外が投げられる。このような挙動をwithAsyncなどで安全に表現しようとすると、withAsync foo $ \a -> withAsync bar $ \b -> ...のように ネストが深くなってしまいがちだが、Oathは継続渡しの抽象化によって、combine <$> oathSTM foo <*> oathSTM barのようにフラットに書ける。それだけでなく、traverseでコンテナに対しても簡単に適用できるという利点もある。

Oathを実行するには、単にevalOathを呼び出す。もちろん、runOathを直接呼ぶべき場面もある。

evalOath :: Oath a -> IO a
evalOath m = runOath m atomically

Alternative<|>は、Concurrentlyと同様両方の計算を起動するが、一方が完了した段階でもう片方はキャンセルする(STMのAlternativeインスタンスを継承している)。 Control.Concurrent.STM.Delayのラッパーも提供しており、<|>で合成するだけで、タイムアウト処理を非常に簡単に記述できる。 また、「リソースの確保」「解放」「使用」という三つの挙動を一つにまとめられるという継続渡しの強みがdelayの実装によく表れている。

instance Alternative Oath where
  empty = Oath $ \cont -> cont empty
  Oath m <|> Oath n = Oath $ \cont -> m $ \a -> n $ \b -> cont (a <|> b)

delay :: Int -> Oath ()
delay dur = Oath $ \cont ->
  bracket (newDelay dur) cancelDelay (cont . waitDelay)

Concurrentlyと違い、Oathはフォークは必須ではないのもポイントだ。例えばネットワーク経由でリクエストを送信する処理は書いた順序で実行し、結果を待つ部分だけ非同期にするといった実装もできる。forkOnなど、forkIO以外のフォークも使えるため自由度が高く、決定性が求められる単体テストの実装などにも役に立つだろう。

パフォーマンス

最後に、Concurrentlyと比較するためのベンチマークを行った。他のパッケージはまともに動作しないため、評価の対象から外した(単体テストは残してある)。

  , bench "oath STM 100" $ nfIO $ O.evalOath $ traverse (O.oath . pure) [0 :: Int ..99]
  , bench "async 100" $ nfIO $ A.runConcurrently $ traverse (A.Concurrently . pure) [0 :: Int ..99]
  oath IO 10:   OK (0.86s)
    3.18 μs ± 206 ns
  oath STM 10:  OK (0.34s)
    5.10 μs ± 169 ns
  async 10:     OK (0.66s)
    10.0 μs ± 190 ns
  oath IO 100:  OK (0.60s)
    35.8 μs ± 2.4 μs
  oath STM 100: OK (0.39s)
    48.0 μs ± 1.3 μs
  async 100:    OK (0.21s)
    100  μs ± 5.6 μs

オーバーヘッドはConcurrentlyの約半分に抑えられている。Concurrentlyは(<*>)の左右ごとにスレッドをフォークするため、項の二倍フォークする必要があるという点が表れているのだろう。

まとめ

Oathは、継続渡しスタイルとSTMを組み合わせることによって、柔軟、安全、高速、そして合成可能な非同期処理の表現を可能にした。単にIOアクションを組み立てる道具としても便利だが、今後はOathに基づいたAPIデザインにも一考の余地があると考えている。並行処理の新定番を狙っていきたい。