Elias-Fano encoding: 単調増加する数列をほぼ簡潔に表現する

Haskell Advent Calendar 2018 20日

単調増加する自然数の列を、最低限のビット数で表現するための興味深いテクニックと、Haskellによる実装を紹介する。

Elias-Fano encoding

この手法は、簡潔データ構造に分類されるもの一つであるが、厳密には条件を満たさないため疑似簡潔データ構造と呼ばれる。1970年代、Peter EliasとRobert Mario Fanoによって独立して発見された。

例題として1, 1, 4, 10, 17, 22, 23, 30という列をエンコードしてみよう。まず、それぞれの数を上位3ビットと下位2ビットに分割する。列の長さをNとしたとき、上位のビット数は{ \displaystyle
\lceil \lg N \rceil
}とする。

上位ビットの列は000 000 001 010 100 101 101 111となる。これをヒストグラムのようにして23個のビンに分ける。

  • 000: 2個
  • 001: 1個
  • 010: 1個
  • 011: 0個
  • 100: 1個
  • 101: 2個
  • 110: 0個
  • 111: 1個

これにアルファ符号(個数分だけ1を並べ、0で区切る)を適用すれば上位の表現は完成だ。要素数分の1と、バケット{ \displaystyle
2^{\lceil \lg N \rceil}
}分の0を合わせるのでおよそ{ \displaystyle
2N
}ビット使う。

1101010010110010

下位ビットの列は、そのまま結合する。こちらは、元のビット数をWとしたとき、{ \displaystyle
N(W-lg(N))
} ビット消費する。

0101001001101110

32ビットの整数を105個格納する場合、消費ビット数は17*105ビットとなかなか優秀な圧縮率を誇る。

n番目の値を読み出すときは、上位は{ \displaystyle select_1(n) - n}を求め、下位はそのまま取り出してくっつければよい。{ \displaystyle select_1(n)}は簡潔データ構造の文脈でよく使われる演算の一つで、ビット列のn番目に出現する1の位置を求める。nを引くのは、ちょうどn個だけ0が混ざっているからだ。

実装

早速実装してみよう。上位ビットのエンコーダは非自明そうに見えるが、要素の位置、上位ビット、カウンタの三つ組(i, u, n)を持つステートマシンで簡単に表現できる。

  • (i, u, n) = (0, 0, 0)から開始する
  • uが最大値を超えたとき、nを出力し停止
  • i番目の上位ビットとuを比較する
    • 等しい場合、(i + 1, u, n + 1)に更新する
    • 異なる場合、nを出力し、(i, u + 1, 0)に更新する

あまり知られていないが、vectorパッケージにはData.Vector.Fusion.Stream.Monadicという簡単なオートマトンを表現するためのモジュールがある。状態と、それを更新する関数の対という単純な作りで、最適化が効く限りこの構造は消滅してただのループとなる。表現力は限られているが、他の追従を許さないパフォーマンスを誇る。

data Stream m a = forall s. Stream (s -> m (Step s a)) s

data Step s a where
  Yield :: a -> s -> Step s a
  Skip  :: s -> Step s a
  Done  :: Step s a

ビット列をVector Boolのように愚直に扱っていては空間効率が悪いので、まずはビット列をWord64の配列に変換する仕組みを作りたい。ここでは受けとったWord64から任意のビット数だけ切り取り、それらを連結して出力する変換器を定義する。Bの最初のフィールドが切り取るビット数となる。溢れたビットの処理はやや煩雑だが、やるだけなので読み飛ばしても構わない。ただしINLINEは必須で、これがないと最適化が止まってしまいポテンシャルを発揮できない。

import Data.Bits
import Data.Word (Word64)
import qualified Data.Vector.Fusion.Stream.Monadic as S

data B = B !Int !Word64

data Chunker s = Chunker s !Word64 !Int
  | ChunkerDone

chunk64 :: Applicative m => S.Stream m B -> S.Stream m Word64
chunk64 (S.Stream upd s0) = S.Stream go $ Chunker s0 zeroBits 0 where
  go ChunkerDone = pure S.Done
  go (Chunker s acc len) = flip fmap (upd s) $ \case
    S.Done -> S.Yield acc ChunkerDone
    S.Skip s' -> S.Skip $ Chunker s' acc len
    S.Yield (B width w) s' -> case mask width .&. w of
      w' | width + len >= 64 -> S.Yield (acc .|. unsafeShiftL w' len)
            $ Chunker s' (unsafeShiftR w' (64 - len)) (len + width - 64)
         | otherwise -> S.Skip $ Chunker s' (acc .|. unsafeShiftL w' len) (len + width)
{-# INLINE chunk64 #-}

次に、アルファ符号のエンコーダを定義する。chunk64は64より長いビット列を受け取れないので、複数に分けるという処理をする。

data Unary s = Unary s | UnaryCont !Int s

unary :: Applicative m => S.Stream m Int -> S.Stream m B
unary (S.Stream upd s0) = S.Stream go $ Unary s0 where
  go (Unary s) = flip fmap (upd s) $ \case
    S.Done -> S.Done
    S.Skip s' -> S.Skip (Unary s')
    S.Yield n s' -> step n s'
  go (UnaryCont n s') = pure $ step n s'
  step n s'
    | n < 64 = S.Yield ((n + 1) `B` mask n) (Unary s')
    | otherwise = S.Yield (B 64 (complement zeroBits)) (UnaryCont (n - 64) s')
{-# INLINE unary #-}

これらを組み合わせてエンコーダを定義する。エンコードの結果として出力するのは以下の5つ組だ。

data EliasFano = EliasFano
    { efLength :: !Int
    , efWidth :: !Int
    , efUpper :: !(UV.Vector Word64)
    , efRanks :: !(UV.Vector Int)
    , efLower :: !(UV.Vector Word64)
    }
    deriving Show
  • efLength: 要素数
  • efWidth: 下位ビットの幅
  • efUpper: 上位ビットの列
  • efRanks: 上位ビットの列をpopcountして累積加算させた列(あとで有利に働く)
  • efLower: 下位ビットの列

以下がエンコーダの実装だ。Streamを効率よくVectorに変換する関数が提供されており、fromStream'でそれらを利用している。upd関数は先に述べたアルゴリズムを実装している。unsafeが散らばっていて可読性はよくないが、ロジックは比較的わかりやすいだろう。

import Control.Monad.ST
import Data.Bits
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as GM
import qualified Data.Vector.Unboxed as UV
import qualified Data.Vector.Fusion.Bundle.Monadic as B
import qualified Data.Vector.Fusion.Bundle.Size as B
import qualified Data.Vector.Fusion.Stream.Monadic as S
import Data.Word

data EncoderState s = ESCont !Int !Word64 !Int | ESDone

unsafeFromVector :: V.Vector v Word64 => v Word64 -> EliasFano
unsafeFromVector vec = runST $ do
  efLower <- fromStream' ((efWidth * efLength + 63) `div` 64)
    $ chunk64 $ S.map (B efWidth . fromIntegral) $ B.elements $ B.fromVector vec
  efUpper <- fromStream' ((efLength + 3) `div` 4)
    $ chunk64 $ unary $ S.Stream upd $ ESCont 0 0 0
  return EliasFano
    { efRanks = UV.prescanl (+) 0 $ UV.map popCount efUpper
    , ..
    }
  where
    upd ESDone = pure S.Done
    upd (ESCont i current n)
      | current > maxValue `unsafeShiftR` efWidth = pure $ S.Yield n ESDone
      | otherwise = pure $ case fromIntegral $ V.unsafeIndex vec i `unsafeShiftR` efWidth of
        u | u == current -> S.Skip $ ESCont (i + 1) current (n + 1)
          | otherwise -> S.Yield n (ESCont i (current + 1) 0)

    efLength = V.length vec

    fromStream' len s = GM.munstream (B.fromStream s (B.Exact len))
      >>= UV.unsafeFreeze
    {-# INLINE fromStream' #-}

    maxValue
      | V.null vec = 1
      | otherwise = V.last vec + 1
    efWidth = max 1 $ ceiling $ logBase 2 (fromIntegral maxValue / fromIntegral efLength :: Double)
{-# SPECIALISE unsafeFromVector :: UV.Vector Word64 -> EliasFano #-}

鬼門となるのは要素のアクセスだ。まずはWord64に対するselect関数を実装しなければならない。これに関しては優れたアルゴリズムが研究されており*1、ekmett先生によってすでに実装されていた*2のでそれを拝借した。この手法を直感的に理解するのは、私の頭では不可能だった。

ここさえクリアすれば、Word64の配列に対してselectを実装するのはさほど難しくない。どの要素に対してselectWord64を呼べばいいかは、先に用意しておいたpopcountの配列に対する二分探索によって判断できる。

select :: (V.Vector v Int, V.Vector v Word64) => v Int -> v Word64 -> Int -> Int
select ranks vec q = go 0 (V.length ranks - 1) where
  go l r
    | l >= r = selectWord64 v (q - V.unsafeIndex ranks l) + 64 * l
    | q < V.unsafeIndex ranks (i + 1) = go l i
    | otherwise = go (i + 1) r
    where
      i = div (l + r) 2
      v = V.unsafeIndex vec i
{-# SPECIALISE select :: UV.Vector Int -> UV.Vector Word64 -> Int -> Int #-}

下位ビットを読み出す処理は、ビットを跨ぐ場合さえ気をつければ大丈夫だ。

readBits :: V.Vector v Word64 => v Word64 -> Int -> Int -> Word64
readBits vec width pos
  | b + width > 64 = unsafeShiftL extra (64 - b) .|. base
  | otherwise = base
  where
    i = unsafeShiftR pos 6
    b = pos .&. 63
    base = (V.unsafeIndex vec i `unsafeShiftR` b) .&. mask width
    extra = V.unsafeIndex vec (i + 1) .&. mask (width + b - 64)
{-# SPECIALISE readBits :: UV.Vector Word64 -> Int -> Int -> Word64 #-}

これらを合わせればインデックスの処理が出来上がる。上位ビットの配列は、元の列の長さNに対して{ \displaystyle O(\log N)}個、selectの計算量は配列の長さNについて{ \displaystyle O(\log N)}なので、この演算の計算量は{ \displaystyle O(\log \log N)}となる。

(!) :: EliasFano -> Int -> Word64
(!) (EliasFano _ width upper ranks lower) i
  = fromIntegral (unsafeShiftL (select ranks upper i - i) width)
  .|. readBits lower width (i * width)

ここで紹介したコードはGitHubに保存している。

github.com

テスト

ビット演算という性質上、同じ型の中でたくさんの値を扱うためバグが湧きやすい。QuickCheckは大いに役立った。最終チェックとして以下のテストを残した。

import qualified Test.QuickCheck as QC

prop_access :: [QC.NonNegative Int] -> QC.NonNegative Int -> QC.Property
prop_access xs i_ = QC.counterexample (show (base, ef, i))
  $ ef ! i == base !! i
  where
    i = QC.getNonNegative i_ `mod` length base
    base = scanl (+) 0 $ map (fromIntegral . QC.getNonNegative) xs
    ef = unsafeFromVector $ UV.fromList base

ベンチマーク

やはり気になるのはパフォーマンスだ。適当に単調増加する数列をこしらえ、反転やインデックスなどの基本的な演算と比較した。

td :: V.Vector Word64
td = V.scanl (+) 0 $ V.fromList $ map (toEnum . fromEnum) $ "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

main = do
  let ef = unsafeFromVector td
  defaultMain
    [ bench "encode/elias-fano" $ whnf unsafeFromVector td
    , bench "encode/vector" $ whnf V.reverse td
    , bench "access/elias-fano" $ nf (map (ef!)) [0..V.length td - 1]
    , bench "access/vector" $ nf (map (td V.!)) [0..V.length td - 1]
    ]

結果はこの通りだ。インデックスはUnboxed Vectorの2倍強遅いが、内部で3つの配列を使っていることを考えると期待以上であると言える。

encode/elias-fano                        mean 3.891 μs  ( +- 78.82 ns  )
reverse/vector                            mean 705.7 ns  ( +- 12.20 ns  )
access/elias-fano                        mean 16.69 μs  ( +- 423.2 ns  )
access/vector                            mean 7.499 μs  ( +- 2.076 μs  )

応用

Elias-Fano encodingはマイナーな手法ではあるが、よく知られている応用として検索のためのインデックス化が挙げられる。単語ごとに、マッチする文書の番号の一覧をメモリ上に保持するといった風に利用できる。ここでは紹介しなかったが、ある値より等しいかそれ以上の要素を探すという演算も効率よく実装でき、積集合を求める操作において並外れたパフォーマンスを発揮する。

また、個人的な研究として、LOUDSなどの木構造の簡潔な表現と組み合わせ、シリアライズに応用できないか模索している。直列化に簡潔データ構造を用いる例は少ないので開拓しがいがあると感じている。

まとめ

  • Elias-Fano encodingは、単調増加する自然数の列を圧縮しつつ、効率の良いアクセスも提供する
  • Data.Vector.Fusion.Stream.Monadicを使って、パフォーマンスを犠牲にすることなく抽象度を高められる
  • QuickCheckにより、複雑でバグを仕込みやすい処理も簡単にテストできる
  • 利用例がまだ少なく、研究しがいがある

参考文献