ストリーム処理ライブラリはHaskellにおいて競争の激しい分野の一つだ。ストリーム処理ライブラリとは大雑把に言うと、IOなどの作用を絡めながら値の列(ストリーム)を生成したり、処理したりする構造を提供するライブラリである。多くのライブラリは、以下の3種の構造を定義している。
- 生産者(プロデューサー): IOなどのアクションを伴いつつ値を生成する。
- 消費者(コンシューマー): 多くの場合モナド変換子になっており、
await :: Consumer s m s
のようなアクションを組み合わせ、値の列を処理するプログラムを書ける。 - 変換者(トランスデューサー): 入力を受け取りながら、出力もできる。
生産者と消費者が変換者の特殊な場合であるものも多い。
今回は、基本中の基本とも言える操作であるスキャンの速さを調べる。scan (+) 0
は入力ストリーム[0,1,2,3, ...]
を出力[0,1,3,6, ...]
のように変換する。
iteratee, tubes, streaming, machinecell, io-streams, pipes, machines, conduit, boomboxと試作品のfeeders、predatorsパッケージをベンチマークした。ソースコードはhttps://github.com/fumieval/feeders/blob/all-bench/benchmarks/benchmark.hsにある。 ライブラリ数という点では、2017年現在最も網羅的なベンチマークだろう。
pipes
- 使用実績: ghc-mod, purescript
- 利点 速い、ドキュメントが豊富
- 欠点 終端、残余を扱えない
まずは人気のpipes。yieldとawaitをモナドで組み合わせる素直なインターフェイスが魅力で、scanの実装もわかりやすい。
ただし、runEffect
以外の方法での分解はあまり想定していないのか、自分でPipe
を分解するにはPipes.Internal
モジュールをインポートしないといけない。その際はpipesの設計の理解が必須となる。
scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Pipe a b m r scan step begin done = go begin where go x = do yield (done x) a <- await let x' = step x a go $! x'
変換に相当する値と、それを走らせる関数に分けてCriterionでベンチマークする。
sourceP :: Monad m => P.Producer Int m () sourceP = each [1..10000] drainP :: Pipe Int a Identity () -> () drainP p = runIdentity $ runEffect $ for (sourceP >-> p) discard main = defaultMain [ bench "pipes" $ whnf drainP (scan (+) 0 id) ]
10000要素を処理するのに179μsという結果が出た。一件あたり18ナノ秒はなかなか悪くないと言えるだろう。なおGHCは8.0.2で、CPUはIntel(R) Core(TM) i7-6700K CPU @ 4.00GHzである。
time 179.3 μs (177.9 μs .. 180.8 μs) 1.000 R² (0.999 R² .. 1.000 R²) mean 179.1 μs (178.4 μs .. 179.9 μs) std dev 2.457 μs (2.043 μs .. 3.124 μs)
tubes
- 利点 インターフェイスが親しみやすい
- 欠点 極端に遅い
time 22.99 s (22.21 s .. 24.75 s) 0.999 R² (0.999 R² .. 1.000 R²) mean 22.86 s (22.61 s .. 23.04 s) std dev 269.8 ms (0.0 s .. 308.6 ms)
23「秒」という圧倒的な時間が目を引いたのはtubesだ。Freeモナドをベースにした基本を押さえたAPI、リストモナド相当のSource
に加えContravariantなSink
と第一印象は良いが、さすがに10万倍も遅いと実用的とは言いがたい。
scanT :: Monad m => (b -> a -> b) -> b -> Tube a b m x scanT f = go where go b = await >>= \x -> let !b' = f b x in yield b' >> go b' sourceT :: Monad m => Tube () Int m () sourceT = each [1..value] drainT :: Tube Int a Identity () -> () drainT h = runIdentity $ runTube $ sourceT >< h >< stop
streaming
使用実績: 不明
- 利点 速い
- 欠点 消費者がない
benchmarking scan/streaming time 77.40 μs (77.07 μs .. 77.74 μs) 1.000 R² (1.000 R² .. 1.000 R²) mean 77.17 μs (77.01 μs .. 77.41 μs) std dev 668.9 ns (552.8 ns .. 820.2 ns)
streamingはpipesの倍以上の速度が印象的だ。streamingには変換者や消費者に相当する構造はないため、やや不公平な比較かもしれない。
Stream (Of a)
がaを生産するモナド変換子であり、これをリストのように扱う数々の関数が提供されている。
drainS :: (Stream (Of Int) Identity () -> Stream (Of a) Identity ()) -> () drainS h = runIdentity $ effects $ h sourceS sourceS :: Monad m => Stream (Of Int) m () sourceS = each [1..value] ... , bench "streaming" $ whnf drainS (S.scan (+) 0 id)
ストリーム処理ライブラリを使う動機はモナディックな消費であることが多いが、そうでない場合は選択肢となりうるだろう。
io-streams
使用実績: snap
- 利点 速い
- 欠点 何を書くにもIOを使わないといけない
time 87.93 μs (86.58 μs .. 89.68 μs) 0.996 R² (0.989 R² .. 1.000 R²) mean 88.13 μs (87.08 μs .. 91.47 μs) std dev 5.351 μs (1.913 μs .. 10.55 μs) variance introduced by outliers: 63% (severely inflated)
io-streamsは使うモナドをIOに限定するという開き直った設計のパッケージだ。たかをくくっていたが、streamingに迫る速度が出ており侮れない。
import qualified System.IO.Streams as Is drainIs :: (Is.InputStream Int -> IO (Is.InputStream b)) -> IO () drainIs h = do i <- Is.fromList [1..value] i' <- h i o <- Is.nullOutput Is.connect i' o scanIs :: (b -> a -> b) -> b -> Is.InputStream a -> IO (Is.InputStream b) scanIs f b0 i = do r <- newIORef b0 Is.makeInputStream $ Is.read i >>= \case Nothing -> return Nothing Just x -> do b <- readIORef r let !b' = f b x writeIORef r b' return $ Just b'
machines
使用実績: 不明
- 利点 各種構造が透明で、拡張性に富む
- 欠点 Tee、Stackなどの発展形はAPIが乏しく、うまく合成できない
ekmett発のmachinesはまずまずの性能だ。
time 190.8 μs (190.1 μs .. 191.5 μs) 1.000 R² (1.000 R² .. 1.000 R²) mean 190.7 μs (190.2 μs .. 191.1 μs) std dev 1.542 μs (1.366 μs .. 1.771 μs)
PlanT
というCPSのモナドから変換器のMachineT
を鋳造するというアプローチを用いている。複数の入力をサポートしているのも面白い。比較的習得は容易だが奥が深い。また、pipesと違い終端に対応できる。
sourceM = enumerateFromTo 1 value drainM :: ProcessT Identity Int o -> () drainM m = runIdentity $ runT_ (sourceM ~> m) , bench "machines" $ whnf drainM (scan (+) 0)
conduit
使用実績: Yesod
- 利点 ストリームの終端や残りをきちんと扱える
- 欠点 APIが複雑。オーバーヘッドがある
time 302.4 μs (301.5 μs .. 303.3 μs) 1.000 R² (1.000 R² .. 1.000 R²) mean 302.1 μs (301.6 μs .. 302.8 μs) std dev 2.029 μs (1.547 μs .. 2.515 μs)
かつて黄金時代を築いたconduitはmachinesよりも遅かった。しかし、ストリームの終端及び残余、リソースの解放などをサポートしていることを考えればかなり優秀だと言える。
import qualified Data.Conduit.List as C import qualified Data.Conduit.Combinators as CC drainC :: C.Conduit Int Identity a -> () drainC c = runIdentity $ (sourceC C.$= c) C.$$ C.sinkNull sourceC = C.enumFromTo 1 value , bench "conduit" $ whnf drainC (CC.scanl (+) 0)
この手のライブラリに依存しないような環境の変化が起こったものの、まだまだ現役だ。コンビネータの種類が非常に多く、ハードルが高いという難点もある。
iteratee
- 利点 ストリームの終端、残余はもちろんシークなども表現可能
- 欠点 遅い。設計が汚く、扱いが非常に難しい
使用実績: Tsuru Capital
最古参のiterateeはpipesの約20倍遅いという残念な結果となった。
time 3.392 ms (3.299 ms .. 3.502 ms) 0.995 R² (0.993 R² .. 0.998 R²) mean 3.361 ms (3.325 ms .. 3.399 ms) std dev 121.1 us (105.5 us .. 142.4 us) variance introduced by outliers: 20% (moderately inflated)
実装もわかりやすいとは言いがたい。今あえてこのライブラリを選択する必要はないだろう。iterateeを使ったコードを保守するのは苦行そのものだ。
import qualified Data.Iteratee.Iteratee as I import qualified Data.Iteratee.ListLike as I scanI :: Monad m => (b -> a -> b) -> b -> I.Enumeratee [a] [b] m x scanI f = I.unfoldConvStream (\a0 -> I.liftI $ \case I.Chunk xs -> return $ mapAccumL (\a x -> let !r = f a x in (r, r)) a0 xs I.EOF _ -> return (a0, [a0])) sourceI :: I.Enumerator [Int] Identity a sourceI = I.enumList $ map pure [1..value] drainI :: I.Enumeratee [Int] [a] Identity () -> () drainI h = runIdentity $ I.run $ runIdentity $ I.run $ runIdentity $ sourceI $ h $ I.mapM_ $ const $ return ()
feeders
- 利点 iterateeの問題点を克服し、親しみやすいインターフェイスを持つ
- 欠点 まだまだ遅い
time 414.3 μs (409.7 μs .. 421.8 μs) 0.998 R² (0.996 R² .. 1.000 R²) mean 413.9 μs (412.1 μs .. 421.0 μs) std dev 9.814 μs (3.347 μs .. 22.63 μs) variance introduced by outliers: 15% (moderately inflated)
Feederは、「消費者に供給する構造」として生産者を表現するiterateeの考え方を継承しつつ、よりまともなデザインを目指した試作品だ。
Eaterモナドが消費者、FeederがEaterを変換するモナドとして実装されている。
newtype Feeder s n m a = Feeder { unFeeder :: forall x r. Eater s n x -> (a -> Eater s n x -> m r) -> m r } killEater :: Monad m => Eater s m a -> m a sinkNull :: Monad m => Eater s m () feed :: Monad m => Feeder s n m a -> Eater s n x -> m (a, Eater s n x) type Rancher a b m = Feeder b m (Eater a m) (>-$) :: Monad m => Rancher a b m x -> Eater b m r -> Eater a m r scan :: Monad m => (b -> a -> b) -> b -> Rancher a b m () scan f b = lift await >>= \case Nothing -> return () Just a -> do let !b' = f b a yieldOn liftP b' scan f b' drainF :: Rancher Int a Identity () -> () drainF h = runIdentity $ killEater $ snd $ runIdentity $ feed sourceF $ h >-$ sinkNull sourceF :: Feeder Int Identity Identity () sourceF = yieldMany [1..value]
こちらもストリームの終端と残余を扱えるが、conduitにスピードに負けていては仕方がない。
predators
- 利点 iterateeやconduitと同等の実用的な表現力を、一風変わったシンプルな実装で実現
- 欠点 生産者を使いきって消費者を残すインクリメンタルな使い方はできない
PredatorはFeederとは逆に、生産者を捕食する構造として消費者を実装した。
prey :: Monad m => Predator s n m a -> Prey s n x -> m (Maybe (a, Prey s n x)) type Heterotroph a b m = Predator a m (Prey b m) (@->) :: Monad m => Prey a m x -> Heterotroph a b m r -> Prey b m (Maybe r) scan :: Monad m => (b -> a -> b) -> b -> Heterotroph a b m () scan f b = do a <- awaitOn lift let !b' = f b a lift $ yield b' scan f b' drainPd :: Heterotroph Int a Identity () -> () drainPd h = maybe () fst $ runIdentity $ prey Pd.sinkNull $ sourcePd @-> h sourcePd :: Prey Int Identity () sourcePd = yieldMany [1..value]
conduitと全く異なるアプローチでありながら、残余と終端を処理でき、同等の速度も出ているのでポテンシャルを秘めている。私のやる気が続けばさらなる進展があるかもしれない。ネーミングも気に入っている。
time 300.7 μs (299.2 μs .. 302.1 μs) 0.999 R² (0.998 R² .. 0.999 R²) mean 314.9 μs (310.0 μs .. 321.2 μs) std dev 19.03 μs (13.81 μs .. 23.15 μs) variance introduced by outliers: 56% (severely inflated)
boombox
- 利点 高い柔軟性と高パフォーマンスを両立している
- 欠点 APIが非常に乏しい
ストリーム処理の大統一を目指して作ったライブラリboomboxはpipesよりも速い。
time 160.7 μs (160.1 μs .. 161.5 μs) 1.000 R² (0.999 R² .. 1.000 R²) mean 163.1 μs (162.1 μs .. 164.1 μs) std dev 3.382 μs (2.810 μs .. 4.125 μs) variance introduced by outliers: 14% (moderately inflated)
生産と消費にTapeとPlayerTという専用の構造を用意し、変換器は両者を組み合わせて表現する。ストリームの残余処理、シークなどなんでも表現できるが、可能性を残しすぎたことが仇となりAPIが乏しい。Recorder Identity Identity
がPipe
に相当する変換器で、Identity
をStore
に差し替えればシーク可能になり、当然通常のストリームからシーク可能なストリームへの変換器も定義できる。NonEmpty
コモナドを使えば複数の世界線に分岐するようなストリームも表現できる。machinesと違い、どうカスタマイズしても必ず合成ができるのがポイントだ。
scanB :: (b -> a -> b) -> b -> Recorder Identity Identity m a b scanB f = go where go b = Tape $ await >>= \x -> let !b' = f b x in return (b', pure $ go b') sourceB :: Tape Identity Maybe Int sourceB = tap [1..value] drainB :: Recorder Identity Identity Maybe Int a -> () drainB h = maybe () (\(_,_,r) -> r) $ sourceB @.$ h >-$ forever await
パフォーマンスは良好なので、全ライブラリの統一を目指して研究を続けていきたい。
machinecell
- 利点 今のところ唯一のArrowベースのライブラリ
- 欠点 遅い
time 185.5 ms (184.1 ms .. 188.5 ms) 1.000 R² (1.000 R² .. 1.000 R²) mean 184.2 ms (183.4 ms .. 185.3 ms) std dev 1.176 ms (562.9 μs .. 1.702 ms) variance introduced by outliers: 14% (moderately inflated)
machinecellはアロー変換子としてストリーム変換器を実装した異色のパッケージだ。これもなかなか速い、と思いきや単位がマイクロではなくミリで、pipesの1000分の1の速さだった。今後の改良に期待したい。
drainMc :: Mc.ProcessA (->) (Mc.Event Int) a -> () drainMc h = Mc.run_ (h >>> arr (const Mc.noEvent)) [1..value] , bench "machinecell" $ whnf drainMc (Mc.evMap (+) >>> Mc.accum 0)
まとめ
あまり凝ったことをしないならば、今のところpipesが無難だと考えている。しかし、終端処理、シークなどが絡むと、どのライブラリも困難に直面する。決着は未だついていない。