ステートマシン猛レース

ストリーム処理ライブラリはHaskellにおいて競争の激しい分野の一つだ。ストリーム処理ライブラリとは大雑把に言うと、IOなどの作用を絡めながら値の列(ストリーム)を生成したり、処理したりする構造を提供するライブラリである。多くのライブラリは、以下の3種の構造を定義している。

  • 生産者(プロデューサー): IOなどのアクションを伴いつつ値を生成する。
  • 消費者(コンシューマー): 多くの場合モナド変換子になっており、await :: Consumer s m sのようなアクションを組み合わせ、値の列を処理するプログラムを書ける。
  • 変換者(トランスデューサー): 入力を受け取りながら、出力もできる。

生産者と消費者が変換者の特殊な場合であるものも多い。

今回は、基本中の基本とも言える操作であるスキャンの速さを調べる。scan (+) 0は入力ストリーム[0,1,2,3, ...]を出力[0,1,3,6, ...]のように変換する。

iteratee, tubes, streaming, machinecell, io-streams, pipes, machines, conduit, boomboxと試作品のfeeders、predatorsパッケージをベンチマークした。ソースコードhttps://github.com/fumieval/feeders/blob/all-bench/benchmarks/benchmark.hsにある。 ライブラリ数という点では、2017年現在最も網羅的なベンチマークだろう。

pipes

  • 使用実績: ghc-mod, purescript
  • 利点 速い、ドキュメントが豊富
  • 欠点 終端、残余を扱えない

まずは人気のpipes。yieldとawaitをモナドで組み合わせる素直なインターフェイスが魅力で、scanの実装もわかりやすい。 ただし、runEffect以外の方法での分解はあまり想定していないのか、自分でPipeを分解するにはPipes.Internalモジュールをインポートしないといけない。その際はpipesの設計の理解が必須となる。

scan :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Pipe a b m r
scan step begin done = go begin
  where
    go x = do
        yield (done x)
        a <- await
        let x' = step x a
        go $! x'

変換に相当する値と、それを走らせる関数に分けてCriterionベンチマークする。

sourceP :: Monad m => P.Producer Int m ()
sourceP = each [1..10000]

drainP :: Pipe Int a Identity () -> ()
drainP p = runIdentity $ runEffect $ for (sourceP >-> p) discard

main = defaultMain
  [ bench "pipes" $ whnf drainP (scan (+) 0 id) ]

10000要素を処理するのに179μsという結果が出た。一件あたり18ナノ秒はなかなか悪くないと言えるだろう。なおGHCは8.0.2で、CPUはIntel(R) Core(TM) i7-6700K CPU @ 4.00GHzである。

time                 179.3 μs   (177.9 μs .. 180.8 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 179.1 μs   (178.4 μs .. 179.9 μs)
std dev              2.457 μs   (2.043 μs .. 3.124 μs)

tubes

time                 22.99 s    (22.21 s .. 24.75 s)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 22.86 s    (22.61 s .. 23.04 s)
std dev              269.8 ms   (0.0 s .. 308.6 ms)

23「秒」という圧倒的な時間が目を引いたのはtubesだ。Freeモナドをベースにした基本を押さえたAPIリストモナド相当のSourceに加えContravariantなSinkと第一印象は良いが、さすがに10万倍も遅いと実用的とは言いがたい。

scanT :: Monad m => (b -> a -> b) -> b -> Tube a b m x
scanT f = go where
  go b = await >>= \x -> let !b' = f b x in yield b' >> go b'

sourceT :: Monad m => Tube () Int m ()
sourceT = each [1..value]

drainT :: Tube Int a Identity () -> ()
drainT h = runIdentity $ runTube $ sourceT >< h >< stop

streaming

使用実績: 不明

  • 利点 速い
  • 欠点 消費者がない
benchmarking scan/streaming
time                 77.40 μs   (77.07 μs .. 77.74 μs)
                     1.000 R²   (1.000.. 1.000 R²)
mean                 77.17 μs   (77.01 μs .. 77.41 μs)
std dev              668.9 ns   (552.8 ns .. 820.2 ns)

streamingpipesの倍以上の速度が印象的だ。streamingには変換者や消費者に相当する構造はないため、やや不公平な比較かもしれない。

Stream (Of a)がaを生産するモナド変換子であり、これをリストのように扱う数々の関数が提供されている。

drainS :: (Stream (Of Int) Identity () -> Stream (Of a) Identity ()) -> ()
drainS h = runIdentity $ effects $ h sourceS

sourceS :: Monad m => Stream (Of Int) m ()
sourceS = each [1..value]

...
, bench "streaming" $ whnf drainS (S.scan (+) 0 id)

ストリーム処理ライブラリを使う動機はモナディックな消費であることが多いが、そうでない場合は選択肢となりうるだろう。

io-streams

使用実績: snap

  • 利点 速い
  • 欠点 何を書くにもIOを使わないといけない
time                 87.93 μs   (86.58 μs .. 89.68 μs)
                     0.996 R²   (0.989 R² .. 1.000 R²)
mean                 88.13 μs   (87.08 μs .. 91.47 μs)
std dev              5.351 μs   (1.913 μs .. 10.55 μs)
variance introduced by outliers: 63% (severely inflated)

io-streamsは使うモナドをIOに限定するという開き直った設計のパッケージだ。たかをくくっていたが、streamingに迫る速度が出ており侮れない。

import qualified System.IO.Streams as Is

drainIs :: (Is.InputStream Int -> IO (Is.InputStream b)) -> IO ()
drainIs h = do
  i <- Is.fromList [1..value]
  i' <- h i
  o <- Is.nullOutput
  Is.connect i' o

scanIs :: (b -> a -> b) -> b -> Is.InputStream a -> IO (Is.InputStream b)
scanIs f b0 i = do
  r <- newIORef b0
  Is.makeInputStream $ Is.read i >>= \case
    Nothing -> return Nothing
    Just x -> do
      b <- readIORef r
      let !b' = f b x
      writeIORef r b'
      return $ Just b'

machines

使用実績: 不明

  • 利点 各種構造が透明で、拡張性に富む
  • 欠点 Tee、Stackなどの発展形はAPIが乏しく、うまく合成できない

ekmett発のmachinesはまずまずの性能だ。

time                 190.8 μs   (190.1 μs .. 191.5 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 190.7 μs   (190.2 μs .. 191.1 μs)
std dev              1.542 μs   (1.366 μs .. 1.771 μs)

PlanTというCPSモナドから変換器のMachineTを鋳造するというアプローチを用いている。複数の入力をサポートしているのも面白い。比較的習得は容易だが奥が深い。また、pipesと違い終端に対応できる。

sourceM = enumerateFromTo 1 value

drainM :: ProcessT Identity Int o -> ()
drainM m = runIdentity $ runT_ (sourceM ~> m)

, bench "machines" $ whnf drainM (scan (+) 0)

conduit

使用実績: Yesod

  • 利点 ストリームの終端や残りをきちんと扱える
  • 欠点 APIが複雑。オーバーヘッドがある
time                 302.4 μs   (301.5 μs .. 303.3 μs)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 302.1 μs   (301.6 μs .. 302.8 μs)
std dev              2.029 μs   (1.547 μs .. 2.515 μs)

かつて黄金時代を築いたconduitはmachinesよりも遅かった。しかし、ストリームの終端及び残余、リソースの解放などをサポートしていることを考えればかなり優秀だと言える。

import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as CC

drainC :: C.Conduit Int Identity a -> ()
drainC c = runIdentity $ (sourceC C.$= c) C.$$ C.sinkNull

sourceC = C.enumFromTo 1 value

, bench "conduit" $ whnf drainC (CC.scanl (+) 0)

この手のライブラリに依存しないような環境の変化が起こったものの、まだまだ現役だ。コンビネータの種類が非常に多く、ハードルが高いという難点もある。

iteratee

  • 利点 ストリームの終端、残余はもちろんシークなども表現可能
  • 欠点 遅い。設計が汚く、扱いが非常に難しい

使用実績: Tsuru Capital

最古参のiterateeはpipesの約20倍遅いという残念な結果となった。

time                 3.392 ms   (3.299 ms .. 3.502 ms)
                     0.995 R²   (0.993 R² .. 0.998 R²)
mean                 3.361 ms   (3.325 ms .. 3.399 ms)
std dev              121.1 us   (105.5 us .. 142.4 us)
variance introduced by outliers: 20% (moderately inflated)

実装もわかりやすいとは言いがたい。今あえてこのライブラリを選択する必要はないだろう。iterateeを使ったコードを保守するのは苦行そのものだ。

import qualified Data.Iteratee.Iteratee as I
import qualified Data.Iteratee.ListLike as I

scanI :: Monad m => (b -> a -> b) -> b -> I.Enumeratee [a] [b] m x
scanI f = I.unfoldConvStream (\a0 -> I.liftI $ \case
  I.Chunk xs -> return $ mapAccumL (\a x -> let !r = f a x in (r, r)) a0 xs
  I.EOF _ -> return (a0, [a0]))

sourceI :: I.Enumerator [Int] Identity a
sourceI = I.enumList $ map pure [1..value]

drainI :: I.Enumeratee [Int] [a] Identity () -> ()
drainI h = runIdentity $ I.run $ runIdentity $ I.run $ runIdentity $ sourceI $ h $ I.mapM_ $ const $ return ()

feeders

  • 利点 iterateeの問題点を克服し、親しみやすいインターフェイスを持つ
  • 欠点 まだまだ遅い
time                 414.3 μs   (409.7 μs .. 421.8 μs)
                     0.998 R²   (0.996 R² .. 1.000 R²)
mean                 413.9 μs   (412.1 μs .. 421.0 μs)
std dev              9.814 μs   (3.347 μs .. 22.63 μs)
variance introduced by outliers: 15% (moderately inflated)

Feederは、「消費者に供給する構造」として生産者を表現するiterateeの考え方を継承しつつ、よりまともなデザインを目指した試作品だ。

Eaterモナドが消費者、FeederがEaterを変換するモナドとして実装されている。

newtype Feeder s n m a = Feeder { unFeeder :: forall x r. Eater s n x -> (a -> Eater s n x -> m r) -> m r }

killEater :: Monad m => Eater s m a -> m a
sinkNull :: Monad m => Eater s m ()
feed :: Monad m => Feeder s n m a -> Eater s n x -> m (a, Eater s n x)

type Rancher a b m = Feeder b m (Eater a m)
(>-$) :: Monad m => Rancher a b m x -> Eater b m r -> Eater a m r

scan :: Monad m => (b -> a -> b) -> b -> Rancher a b m ()
scan f b = lift await >>= \case
  Nothing -> return ()
  Just a -> do
    let !b' = f b a
    yieldOn liftP b'
    scan f b'

drainF :: Rancher Int a Identity () -> ()
drainF h = runIdentity $ killEater $ snd $ runIdentity $ feed sourceF $ h >-$ sinkNull

sourceF :: Feeder Int Identity Identity ()
sourceF = yieldMany [1..value]

こちらもストリームの終端と残余を扱えるが、conduitにスピードに負けていては仕方がない。

predators

  • 利点 iterateeやconduitと同等の実用的な表現力を、一風変わったシンプルな実装で実現
  • 欠点 生産者を使いきって消費者を残すインクリメンタルな使い方はできない

PredatorFeederとは逆に、生産者を捕食する構造として消費者を実装した。

prey :: Monad m => Predator s n m a -> Prey s n x -> m (Maybe (a, Prey s n x))

type Heterotroph a b m = Predator a m (Prey b m)
(@->) :: Monad m => Prey a m x -> Heterotroph a b m r -> Prey b m (Maybe r)

scan :: Monad m => (b -> a -> b) -> b -> Heterotroph a b m ()
scan f b = do
  a <- awaitOn lift
  let !b' = f b a
  lift $ yield b'
  scan f b'

drainPd :: Heterotroph Int a Identity () -> ()
drainPd h = maybe () fst $ runIdentity $ prey Pd.sinkNull $ sourcePd @-> h

sourcePd :: Prey Int Identity ()
sourcePd = yieldMany [1..value]

conduitと全く異なるアプローチでありながら、残余と終端を処理でき、同等の速度も出ているのでポテンシャルを秘めている。私のやる気が続けばさらなる進展があるかもしれない。ネーミングも気に入っている。

time                 300.7 μs   (299.2 μs .. 302.1 μs)
                     0.999 R²   (0.998 R² .. 0.999 R²)
mean                 314.9 μs   (310.0 μs .. 321.2 μs)
std dev              19.03 μs   (13.81 μs .. 23.15 μs)
variance introduced by outliers: 56% (severely inflated)

boombox

  • 利点 高い柔軟性と高パフォーマンスを両立している
  • 欠点 APIが非常に乏しい

ストリーム処理の大統一を目指して作ったライブラリboomboxpipesよりも速い。

time                 160.7 μs   (160.1 μs .. 161.5 μs)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 163.1 μs   (162.1 μs .. 164.1 μs)
std dev              3.382 μs   (2.810 μs .. 4.125 μs)
variance introduced by outliers: 14% (moderately inflated)

生産と消費にTapeとPlayerTという専用の構造を用意し、変換器は両者を組み合わせて表現する。ストリームの残余処理、シークなどなんでも表現できるが、可能性を残しすぎたことが仇となりAPIが乏しい。Recorder Identity IdentityPipeに相当する変換器で、IdentityStoreに差し替えればシーク可能になり、当然通常のストリームからシーク可能なストリームへの変換器も定義できる。NonEmptyモナドを使えば複数の世界線に分岐するようなストリームも表現できる。machinesと違い、どうカスタマイズしても必ず合成ができるのがポイントだ。

scanB :: (b -> a -> b) -> b -> Recorder Identity Identity m a b
scanB f = go where
  go b = Tape $ await >>= \x -> let !b' = f b x in return (b', pure $ go b')

sourceB :: Tape Identity Maybe Int
sourceB = tap [1..value]

drainB :: Recorder Identity Identity Maybe Int a -> ()
drainB h = maybe () (\(_,_,r) -> r) $ sourceB @.$ h >-$ forever await

パフォーマンスは良好なので、全ライブラリの統一を目指して研究を続けていきたい。

machinecell

  • 利点 今のところ唯一のArrowベースのライブラリ
  • 欠点 遅い
time                 185.5 ms   (184.1 ms .. 188.5 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 184.2 ms   (183.4 ms .. 185.3 ms)
std dev              1.176 ms   (562.9 μs .. 1.702 ms)
variance introduced by outliers: 14% (moderately inflated)

machinecellはアロー変換子としてストリーム変換器を実装した異色のパッケージだ。これもなかなか速い、と思いきや単位がマイクロではなくミリで、pipesの1000分の1の速さだった。今後の改良に期待したい。

drainMc :: Mc.ProcessA (->) (Mc.Event Int) a -> ()
drainMc h = Mc.run_ (h >>> arr (const Mc.noEvent)) [1..value]

, bench "machinecell" $ whnf drainMc (Mc.evMap (+) >>> Mc.accum 0)

まとめ

あまり凝ったことをしないならば、今のところpipesが無難だと考えている。しかし、終端処理、シークなどが絡むと、どのライブラリも困難に直面する。決着は未だついていない。