単純で頑強なメッセージングシステム、franz

Haskell製の新しいメッセージングシステムfranz(フランツ)の紹介。

github.com

背景

取引所にあるマシンで取引プログラムを実行するのが我々の仕事だが、朝8時に起動したらあとは昼寝したり酒を飲んだりというわけにはいかない。モニタリングしたり、分析のためにデータを残しておく必要がある。そのため、プログラムによって解析しやすい形でログを出力する。 今までは複数の種類のレコードをシリアライズし、一つのファイルに連結させる独自のフォーマットを10年近く使っていたが、書いていて恥ずかしくなるような多数の問題を抱えていた。

  • 柔軟性が乏しい: 32bit整数や文字列などの単純な値しか格納できず、例えばレコードを含むレコードなどを表現できない。その結果、複雑なデータは一旦文字列に変換するような運用がされており、そのプリティプリンタやパーサは十分にテストされていない。
  • コードがまとまっていない: シリアライザとデシリアライザが非対称的に実装されており、メンテナンスが難しい。どちらも取引関連アプリケーションのための特別な処理がハードコードされている。
  • シークできない: n番目の要素に素早くアクセスする方法、ましてやタイムスタンプなどを元にシークする方法がない。
  • 読み込みが遅い: 書き込みはチューニングされているが、読み込み処理が非常に遅い。さらに実際に使うAPIFRPベースなのでさらにオーバーヘッドがある。
  • 配信に適さない: 帯域幅の理由でログの全てをオフィスに送ることはできないため、あらかじめダウンサンプリングする必要がある。しかし上述の問題からログからログへの変換としては実装されておらず、オフィスでログを再構築するための配信専用プロトコルが実装されている。結果としてコードが非常に冗長になっているほか、本来のログとの互換性も不完全であり、アプリケーションの振る舞いも一貫性がない。

Kafkaの誘惑

Apache Kafkaでシークの非効率性を解決できるのではという提案があり、本格的に取り組み始めた。Kafkaは、文字列のリストを永続化し、任意の要素を高速に取り出せるようなサーバーを提供する。例の独自フォーマットの断片をKafkaのペイロードにするという仕組みで、私は独自フォーマットを使い続けるのには反対だったが、プロジェクトは進行した。現在では以下のようになっている。

  • 配信専用プロトコルから再構築したログをスライスし、Kafkaに送るアプリケーションを実行する。
  • GUIのクライアントなど、シークを要求する一部のアプリケーションはKafkaからデータを読み出す。

しかし、Kafkaにも問題点があった。

  • 異常終了するとインデックスが壊れ、再起動に非常に長い(トピック数に比例した)時間がかかる。取引プログラムがログを送信する相手としては致命的だ。
  • 要素のインデックスや、Kafkaブローカー(サーバー)のタイムスタンプによるアクセスはできるが、自分で決めたタイムスタンプは使えない。そのため、検索には二分探索や割線法のために複数回の送受信が必要になり、クライアントのレイテンシが大きいと極めて効率が悪い。

醸造家と音楽家

データ表現の柔軟性、実装の対称性の課題は、wineryというライブラリによって解決した。Haskell上の表現からディスク上の表現を簡単に導出できるため、ソースコードの量を大幅に削減でき、バグも発生しにくい。

fumieval.hatenablog.com

残りの問題は、以下の要件を満たす新しいシステムによって実現すべきという結論に至った。

  • 書き込みにはサーバーが不要で、サーバーは読み出しのみを行う。万が一サーバーがダウンしても取引プログラムに影響しない。
  • サーバーが新たなアイテムを検出し次第、クライアントに送信するようなクエリを表現できる。
  • システムもそのAPIも、チームがメンテナンスできる言語(Haskell)で実装されている。
  • ブロックするようなクエリは、好きなタイミングで中断できる。
  • 連番の他、任意のタイムスタンプによって望んだ要素にアクセスできる。

私はLisztという新たなコンテナフォーマットを開発した。LisztはCouchDBのモデルをただのリストのマップに簡略化したような設計で、フッターにポインタを並べることで木構造を表現する。一つのファイルのみを扱うので理論上の効率と信頼性は高いが、フォーマットの複雑さを理由にチームの合意は得られず没となった。

github.com

代わりに、Kafkaと同様、ペイロードのみを連結したファイル(ペイロードファイル)と、64ビット整数で表されるペイロードのポインタのみを連結したファイル(インデックスファイル)を二つ作るのがFranzだ(ファイルシステムの実装を考えれば、本質的にはLisztとやっていることは変わらないとも言える)。サーバー側はinotifyなどを用いてインデックスファイルを監視し、sendfileペイロードをクライアントに送る。ペイロードが連結されているため、sendfileでまとめて効率よく送れるのは一つの利点だ。

各要件を満たすため、決して高度なものではないが様々な技法が用いられている。

ビルダーの本気

HaskellでIOというとByteStringが定番ではあるが、ここではfast-builderのビルダーを採用した。ビルダーはByteStringに効率よく変換できるモノイドとしてよく知られた構造だが、ByteStringを介さずにファイルなどに書き込むこともできる。

-- Data.ByteString.FastBuilder
byteString :: ByteString -> Builder
word8 :: Word8 -> Builder
word64LE :: Word64 -> Builder

toStrictByteString :: Builder -> ByteString
hPutBuilder :: Handle -> Builder -> IO ()

ByteStringにするまで出力される文字列の長さがわからないのが唯一の欠点だったが、書き込んだ文字列の長さを返すhPutBuilderLen :: Handle -> Builder -> IO Intを追加したため、この問題も解消された。

つまり、要素の追加は「ペイロードファイルにhPutBuilderLenでバイト列を書き込み、その結果を用いてインデックスファイルにペイロードのオフセットを書き込む」という極めて単純な処理である。要素を1つ追加するたびにwrite(2)をそれぞれ呼ぶのは非効率的なので、可能な限りHandle内部のバッファを活用したいが、ペイロードよりも先にインデックスファイルが書き込まれてしまうとおかしなことになる。もちろんインデックスファイルのバッファは明示的に実装しており、安心してナイスバルクインサートできる。

なお、openFileを使うと、GHCはファイルデスクリプタをノンブロッキングモードとして作成する。これは内部でunsafeなforeign callを使うため、hFlushなどの実行中に他のスレッドがGCを要求するとプログラム全体がストップする危険性がある。それを避けるため、openFileBlockingを代わりに使っている。

色々なテクニックを盛り込んだが、基本のインターフェイスは3つの関数にうまくまとめることができた。謎のfパラメータについては後述する。

-- Database.Franz
withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO r) -> IO r 
write :: WriterHandle f -> f Int64 -> Builder -> IO Int
flush :: WriterHandle f -> IO ()

n種類のタイムスタンプ

稀なユースケースかもしれないが、本体のタイムスタンプと、取引所から送られてきたタイムスタンプの両方を記録したい。特に過去のデータでシミュレーションする際は両者は大きく異なる。そこで、シークなどのために任意の個数の値を付随させられるようにしてある——それが型パラメータfだ。

data Timestamps a = Timestamps a a deriving Foldable

あらかじめ上のようなデータ型を定義しておき、withWriter (Timestamps "MarketTime" "LocalTime")のように名前を指定する。writeには具体的な値をTimestamps Int64型で与えればよい。これらが不要な場合、Data.ProxyProxyを渡せばOKだ(忘れがちだが、MonadやTraversableなどのインスタンスがついた0要素のリストとして使える)。

継続は力なり

リアルタイムでメッセージを配信する以上、クライアントは最新の要素が来るまで待つ必要がある。プログラミングにおいて、ブロックする、つまり何かができるまで待つような振る舞いはコントロールが難しくなりがちだが、franzは高い柔軟性を持つ革新的なアプローチを採用した––継続とSTMのコンボである。

awaitResponse :: STM Response -> STM Contents
type Contents = [(Int, SomeIndexMap, B.ByteString)]

fetch :: Connection
  -> Query -- ^ リクエスト
  -> (STM Response -> IO r) -- ^ 「レスポンスを受け取るトランザクション」を受け取る継続
  -> IO r

以下のコードは、stm-delayを利用してタイムアウトを実現する。動きを細かく追ってみよう。

fetch conn q $ \t -> do
  d <- newDelay 1000000
  atomically
     $ Just <$> awaitResponse t
     <|> Nothing <$ waitDelay d
  • まずサーバーにクエリqを送信する。
  • 返信を待たずにトランザクションを作成し、継続に渡す。
  • newDelay 1000000でディレイdを作成する。waitDelay dを呼ぶと、作成してから1秒経つまでブロックする。
  • 以下のどちらかが可能になるまで待つ。
    • awaitResponseでレスポンスを受信し終えたら、Justに包んで返す。
    • 1秒経過したら、Nothingを返す。
  • サーバーに処理が完了した旨を通知する。
  • クエリは破棄され、仮にレスポンスが既に送られたとしてもクライアントはそれを捨てる。

継続に与えたトランザクションtは、atomicallyによって実行して初めて結果を待つ。STMであるがゆえに、タイムアウトなどの理由で待つのを諦めるような振る舞いを合成できるというわけだ。継続が終了すれば、その旨もサーバーに送信されるため、待機処理がサーバーに溜まることもない。ContTなどで全体を合成すれば一度にたくさんのリクエストを送ることもでき、当然Traversable APIのようなテクニックも使える。このコンボはブロッキングAPIの新定番としてのポテンシャルを秘めていると考えている。

そこまで柔軟性が要求されない場合のために、タイムアウトのみを指定するクラシックなAPIも用意した。クエリの具体的な構造も以下に示す。

fetchSimple :: Connection
  -> Int -- ^ timeout in microseconds
  -> Query
  -> IO Contents

data ItemRef = BySeqNum !Int -- ^ sequential number
  | ByIndex !B.ByteString !Int -- ^ index name and value
data Query = Query
  { reqStream :: !B.ByteString
  , reqFrom :: !ItemRef -- ^ name of the index to search
  , reqTo :: !ItemRef -- ^ name of the index to search
  , reqType :: !RequestType
  }
data RequestType = AllItems | LastItem

ConnectionwithConnection関数を用いて作成する。

withConnection :: String -- ^ ホスト
  -> PortNumber -- ^ ポート
  -> ByteString -- ^ ストリームプレフィクス
  -> (Connection -> IO r) -> IO r

サーバーを立ち上げるのは簡単で、データが格納されているパスを指定すればよい。

franzd .

圧縮

書き込み終わった1日分のログをまとめて圧縮する手段としてSquashFSを採用した。指定されたプレフィクスと同名のイメージがある場合、サーバーはそれをFUSEでマウントするという機能がある。サーバーを起動する際、アーカイブを格納するパスをオプションとして指定することで利用できる。

franzd ./live ./archive

適切なオプションなら高い圧縮率を実現できる一方、パフォーマンスも意外に良好で、むしろ圧縮してある方が読み込みが速いのではないかと感じるほどだ。当初はあまり期待していなかったが、複数のファイルにまたがるフォーマットは取り回しが悪いという欠点をうまく克服できている。

今後の展開

ロギングの構成を完全に置き換える前段階として、分析用データの格納のために運用している。大きなバグもなく好感触だが、並行処理やIOをふんだんに使ったこの手のプログラムは細部の動きが怪しくなりがちだ。どんな状況でもきっちり動くように煮詰めていきたい。