TL;DR
並行処理を簡潔かつ安全に記述できるライブラリを作った。ApplicativeDo拡張を使って、以下のようにoath
の引数として与えたIOアクションを同時に実行し、結果を集める処理を書ける。いずれかが例外を投げた場合、残りをキャンセルするためリソースを漏らす心配がない。
evalOath $ do a <- oath $ ... b <- oath $ ... f a b
経緯
Haskellは並行処理が得意とされている。事実、軽量スレッド、MVar、STMといった処理系によるサポートが充実しており、HackageのConcurrencyカテゴリには235ものパッケージがあることからもユーザの関心の高さが窺える。 並行処理を実用する上では、単にスレッドをフォークするだけでなく、計算の結果を集める必要がある──Scalaなどで言うFutureに近いものがあると嬉しい。案の定、並行計算の結果を取り出すためのHaskellライブラリは数多く作られてきた。
futuresというなかなかいい名前のパッケージがある。APIもかなりシンプルだ。 スレッドをフォークして、計算結果をMVarに代入し、MVarの中身を取り出すアクションを返すというものだ。
fork :: IO a -> IO (Future a) fork io = do mv <- newEmptyMVar forkIO $ do a <- io putMVar mv a return $ Future $ readMVar mv
考え方はわかりやすいが、この実装には致命的な欠点がある──例外処理である。forkに与えたアクションが例外を発生させても、誰も対応ができない。しかも、putMVar
が呼ばれなくなるのでreadMVar
が発散してしまう(実行時エラーになる)。これでは実用するのは難しい。また、forkIOの結果であるThreadIdが捨てられていることからわかるように、一度始めた計算を外部から止めるすべはない。
spawnは、Future型ではなく直接IOを返す点を除けばfuturesと似ている。
spawnTry :: IO a -> IO (IO (Result a)) spawnTry m = do v <- newEmptyMVar _ <- mask $ \restore -> forkIO (try (restore m) >>= putMVar v) return (readMVar v) spawn :: IO a -> IO (IO a) spawn m = do r <- spawnTry m return (r >>= either throwIO return)
こちらはきちんと例外処理をしており、putMVarが呼ばれることが保証されるためいくらか実用的だ。しかし、やはり計算は止められない。
promiseは、ApplicativeとMonadのインスタンスであるPromise型が売りだ。
newtype Promise a = Promise { unPromise :: IO (Async a) } deriving (Functor) instance Applicative Promise where pure = Promise . async . return Promise mf <*> Promise mx = Promise $ do f <- mf x <- mx (f', x') <- waitBoth f x async $ return $ f' x' instance Monad Promise where return = liftIO . return Promise m >>= f = Promise $ m >>= wait >>= unPromise . f liftIO :: IO a -> Promise a liftIO = Promise . async
外側のIOでスレッドをフォークし、内側のAsyncで結果を待つという仕組みのようだ。だが、m <*> n
はmとnをそれぞれ並行して実行したのち、両方の結果が返ってくるのをその場で待つ必要がある(「待つAsyncを返す」のではなく、「待ってからAsyncを返す」)。これでは(a *> b) *> c
とa *> (b *> c)
の挙動が異なってしまうため、Applicativeの則を満たさない。また、Asyncは本来cancel`できるはずだが、合成中に結果を待ってしまうので事実上中断ができない(どちらにせよ、Promiseの実装が隠蔽されているのでどうしようもないが)。
さらに、Monadは左の計算の結果を待ってから右に進むという挙動で、Applicativeとの一貫性がない。いくらインスタンスがあっても、これでプログラムを組み立てるのは難しいだろう。
非同期処理の定番であるasyncパッケージには、Concurrently
という型がある。
newtype Concurrently a = Concurrently { runConcurrently :: IO a } instance Applicative Concurrently where pure = Concurrently . return Concurrently fs <*> Concurrently as = Concurrently $ (\(f, a) -> f a) <$> concurrently fs as instance Alternative Concurrently where empty = Concurrently $ forever (threadDelay maxBound) Concurrently as <|> Concurrently bs = Concurrently $ either id id <$> race as bs
(<*>)
はconcurrently :: IO a -> IO b -> IO (a, b)
を使って両方の結果を待つ計算を表現し、(<|>)
はrace :: IO a -> IO b -> IO (Either a b)
はどちらかが返ってくるまで待つ計算を表現する。promise
と違い、その場で待機する必要がないので、Applicative則を満たしそうだ。
concurrently
およびrace
は、スレッドを必ず二つフォークする上、かなり複雑な実装なので、オーバーヘッドが大きそうだ。ここまで見た中では一番正しそうで使いやすそうな実装だが、もっといい方法はないだろうか。
継続とSTMのコンボ
非同期処理を安全に合成する方法には覚えがある。単純で頑強なメッセージングシステム、franz - モナドとわたしとコモナドで紹介した、継続渡しを用いて結果を待つトランザクションを渡すという仕組みだ。この方法なら、継続が終了したタイミングで処理を中断できるため、待つもやめるも自由自在、リソースの解放漏れの心配はない(franzはこのメカニズムによって非同期リクエストを管理している)。そのエッセンスを独立したライブラリに蒸留できそうだ。それっぽい名前はだいたい取られていたので、Promiseからの連想でOathと名付けた。
newtype Oath a = Oath { runOath :: forall r. (STM a -> IO r) -> IO r } deriving Functor instance Applicative Oath where pure a = Oath $ \cont -> cont (pure a) Oath m <*> Oath n = Oath $ \cont -> m $ \f -> n $ \x -> cont (f <*> x)
Oath
は、IO (STM a)
をCPS変換したもので、Compose (Codensity IO) STM
と等価である*1 *2。外側のIO
で計算を起動し、内側のSTM
で結果を待つ。Oath
はApplicative
インスタンスを持ち、STM (a -> b)
とSTM a
を用意してから、それらを合成したSTM b
を返す――計算の起動と、結果の待機を別々に合成するというわけだ。
oath :: IO a -> Oath a oath act = Oath $ \cont -> do v <- newEmptyTMVarIO tid <- forkFinally act (atomically . putTMVar v) let await = readTMVar v >>= either throwSTM pure cont await `finally` killThread tid
oath
は、IOアクションを別のスレッドで実行し、その結果をTMVar経由で取り出す。forkFinally
が例外を受け止め、throwSTM
がそれを伝える。
生殺与奪の権は継続が握っており、終了したときにThreadKilled
例外が投げられる。このような挙動をwithAsyncなどで安全に表現しようとすると、withAsync foo $ \a -> withAsync bar $ \b -> ...
のように
ネストが深くなってしまいがちだが、Oathは継続渡しの抽象化によって、combine <$> oathSTM foo <*> oathSTM bar
のようにフラットに書ける。それだけでなく、traverse
でコンテナに対しても簡単に適用できるという利点もある。
Oath
を実行するには、単にevalOath
を呼び出す。もちろん、結果を待つタイミングを制御したい場合はrunOath
を直接呼ぶべき場面もあるだろう。
evalOath :: Oath a -> IO a evalOath m = runOath m atomically
Alternative
の<|>
は、Concurrently
と同様両方の計算を起動するが、一方が完了した段階でもう片方はキャンセルする(STMのAlternativeインスタンスを継承している)。
Control.Concurrent.STM.Delayのラッパーも提供しており、<|>
で合成するだけで、タイムアウト処理を非常に簡単に記述できる。
また、「リソースの確保」「解放」「使用」という三つの挙動を一つにまとめられるという継続渡しの強みがdelay
の実装によく表れている。
instance Alternative Oath where empty = Oath $ \cont -> cont empty Oath m <|> Oath n = Oath $ \cont -> m $ \a -> n $ \b -> cont (a <|> b) delay :: Int -> Oath () delay dur = Oath $ \cont -> bracket (newDelay dur) cancelDelay (cont . waitDelay)
Concurrently
と違い、Oath
はフォークは必須ではないのもポイントだ。例えばネットワーク経由でリクエストを送信する処理は書いた順序で実行し、結果を待つ部分だけ非同期にするといった実装もできる。forkOn
など、forkIO
以外のフォークも使えるため自由度が高く、そもそもフォークしないという選択肢もあるため、決定性が求められる単体テストの実装などにも役に立つだろう。
パフォーマンス
最後に、Concurrently
と比較するためのベンチマークを行った。他のパッケージはまともに動作しないため、評価の対象から外した(単体テストは残してある)。
, bench "oath STM 100" $ nfIO $ O.evalOath $ traverse (O.oath . pure) [0 :: Int ..99] , bench "async 100" $ nfIO $ A.runConcurrently $ traverse (A.Concurrently . pure) [0 :: Int ..99]
oath IO 10: OK (0.86s) 3.18 μs ± 206 ns oath STM 10: OK (0.34s) 5.10 μs ± 169 ns async 10: OK (0.66s) 10.0 μs ± 190 ns oath IO 100: OK (0.60s) 35.8 μs ± 2.4 μs oath STM 100: OK (0.39s) 48.0 μs ± 1.3 μs async 100: OK (0.21s) 100 μs ± 5.6 μs
オーバーヘッドはConcurrently
の約半分に抑えられている。Concurrentlyは(<*>)
の左右ごとにスレッドをフォークするため、項の二倍フォークする必要があるという点が表れているのだろう。
まとめ
Oath
は、継続渡しスタイルとSTMを組み合わせることによって、柔軟、安全、高速、そして合成可能な非同期処理の表現を可能にした。単にIOアクションを組み立てる道具としても便利だが、今後はOathに基づいたAPIデザインにも一考の余地があると考えている。並行処理の新定番を狙っていきたい。