Haskell製の新しいメッセージングシステムfranz(フランツ)の紹介。
背景
取引所にあるマシンで取引プログラムを実行するのが我々の仕事だが、朝8時に起動したらあとは昼寝したり酒を飲んだりというわけにはいかない。モニタリングしたり、分析のためにデータを残しておく必要がある。そのため、プログラムによって解析しやすい形でログを出力する。 今までは複数の種類のレコードをシリアライズし、一つのファイルに連結させる独自のフォーマットを10年近く使っていたが、書いていて恥ずかしくなるような多数の問題を抱えていた。
- 柔軟性が乏しい: 32bit整数や文字列などの単純な値しか格納できず、例えばレコードを含むレコードなどを表現できない。その結果、複雑なデータは一旦文字列に変換するような運用がされており、そのプリティプリンタやパーサは十分にテストされていない。
- コードがまとまっていない: シリアライザとデシリアライザが非対称的に実装されており、メンテナンスが難しい。どちらも取引関連アプリケーションのための特別な処理がハードコードされている。
- シークできない: n番目の要素に素早くアクセスする方法、ましてやタイムスタンプなどを元にシークする方法がない。
- 読み込みが遅い: 書き込みはチューニングされているが、読み込み処理が非常に遅い。さらに実際に使うAPIはFRPベースなのでさらにオーバーヘッドがある。
- 配信に適さない: 帯域幅の理由でログの全てをオフィスに送ることはできないため、あらかじめダウンサンプリングする必要がある。しかし上述の問題からログからログへの変換としては実装されておらず、オフィスでログを再構築するための配信専用プロトコルが実装されている。結果としてコードが非常に冗長になっているほか、本来のログとの互換性も不完全であり、アプリケーションの振る舞いも一貫性がない。
Kafkaの誘惑
Apache Kafkaでシークの非効率性を解決できるのではという提案があり、本格的に取り組み始めた。Kafkaは、文字列のリストを永続化し、任意の要素を高速に取り出せるようなサーバーを提供する。例の独自フォーマットの断片をKafkaのペイロードにするという仕組みで、私は独自フォーマットを使い続けるのには反対だったが、プロジェクトは進行した。現在では以下のようになっている。
しかし、Kafkaにも問題点があった。
- 異常終了するとインデックスが壊れ、再起動に非常に長い(トピック数に比例した)時間がかかる。取引プログラムがログを送信する相手としては致命的だ。
- 要素のインデックスや、Kafkaブローカー(サーバー)のタイムスタンプによるアクセスはできるが、自分で決めたタイムスタンプは使えない。そのため、検索には二分探索や割線法のために複数回の送受信が必要になり、クライアントのレイテンシが大きいと極めて効率が悪い。
醸造家と音楽家
データ表現の柔軟性、実装の対称性の課題は、wineryというライブラリによって解決した。Haskell上の表現からディスク上の表現を簡単に導出できるため、ソースコードの量を大幅に削減でき、バグも発生しにくい。
残りの問題は、以下の要件を満たす新しいシステムによって実現すべきという結論に至った。
- 書き込みにはサーバーが不要で、サーバーは読み出しのみを行う。万が一サーバーがダウンしても取引プログラムに影響しない。
- サーバーが新たなアイテムを検出し次第、クライアントに送信するようなクエリを表現できる。
- システムもそのAPIも、チームがメンテナンスできる言語(Haskell)で実装されている。
- ブロックするようなクエリは、好きなタイミングで中断できる。
- 連番の他、任意のタイムスタンプによって望んだ要素にアクセスできる。
私はLisztという新たなコンテナフォーマットを開発した。LisztはCouchDBのモデルをただのリストのマップに簡略化したような設計で、フッターにポインタを並べることで木構造を表現する。一つのファイルのみを扱うので理論上の効率と信頼性は高いが、フォーマットの複雑さを理由にチームの合意は得られず没となった。
代わりに、Kafkaと同様、ペイロードのみを連結したファイル(ペイロードファイル)と、64ビット整数で表されるペイロードのポインタのみを連結したファイル(インデックスファイル)を二つ作るのがFranzだ(ファイルシステムの実装を考えれば、本質的にはLisztとやっていることは変わらないとも言える)。サーバー側はinotifyなどを用いてインデックスファイルを監視し、sendfileでペイロードをクライアントに送る。ペイロードが連結されているため、sendfileでまとめて効率よく送れるのは一つの利点だ。
各要件を満たすため、決して高度なものではないが様々な技法が用いられている。
ビルダーの本気
HaskellでIOというとByteStringが定番ではあるが、ここではfast-builderのビルダーを採用した。ビルダーはByteStringに効率よく変換できるモノイドとしてよく知られた構造だが、ByteStringを介さずにファイルなどに書き込むこともできる。
-- Data.ByteString.FastBuilder byteString :: ByteString -> Builder word8 :: Word8 -> Builder word64LE :: Word64 -> Builder toStrictByteString :: Builder -> ByteString hPutBuilder :: Handle -> Builder -> IO ()
ByteStringにするまで出力される文字列の長さがわからないのが唯一の欠点だったが、書き込んだ文字列の長さを返すhPutBuilderLen :: Handle -> Builder -> IO Int
を追加したため、この問題も解消された。
つまり、要素の追加は「ペイロードファイルにhPutBuilderLen
でバイト列を書き込み、その結果を用いてインデックスファイルにペイロードのオフセットを書き込む」という極めて単純な処理である。要素を1つ追加するたびにwrite(2)をそれぞれ呼ぶのは非効率的なので、可能な限りHandle内部のバッファを活用したいが、ペイロードよりも先にインデックスファイルが書き込まれてしまうとおかしなことになる。もちろんインデックスファイルのバッファは明示的に実装しており、安心してナイスバルクインサートできる。
なお、openFile
を使うと、GHCはファイルデスクリプタをノンブロッキングモードとして作成する。これは内部でunsafeなforeign callを使うため、hFlush
などの実行中に他のスレッドがGCを要求するとプログラム全体がストップする危険性がある。それを避けるため、openFileBlockingを代わりに使っている。
色々なテクニックを盛り込んだが、基本のインターフェイスは3つの関数にうまくまとめることができた。謎のfパラメータについては後述する。
-- Database.Franz withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO r) -> IO r write :: WriterHandle f -> f Int64 -> Builder -> IO Int flush :: WriterHandle f -> IO ()
n種類のタイムスタンプ
稀なユースケースかもしれないが、本体のタイムスタンプと、取引所から送られてきたタイムスタンプの両方を記録したい。特に過去のデータでシミュレーションする際は両者は大きく異なる。そこで、シークなどのために任意の個数の値を付随させられるようにしてある——それが型パラメータf
だ。
data Timestamps a = Timestamps a a deriving Foldable
あらかじめ上のようなデータ型を定義しておき、withWriter (Timestamps "MarketTime" "LocalTime")
のように名前を指定する。write
には具体的な値をTimestamps Int64
型で与えればよい。これらが不要な場合、Data.Proxy
のProxy
を渡せばOKだ(忘れがちだが、MonadやTraversableなどのインスタンスがついた0要素のリストとして使える)。
継続は力なり
リアルタイムでメッセージを配信する以上、クライアントは最新の要素が来るまで待つ必要がある。プログラミングにおいて、ブロックする、つまり何かができるまで待つような振る舞いはコントロールが難しくなりがちだが、franzは高い柔軟性を持つ革新的なアプローチを採用した––継続とSTMのコンボである。
awaitResponse :: STM Response -> STM Contents data Contents data Item = Item { seqNo :: !Int indices :: !(Vector Int64) payload :: !ByteString } toList :: Contents -> [Item] fetch :: Connection -> Query -- ^ リクエスト -> (STM Response -> IO r) -- ^ 「レスポンスを受け取るトランザクション」を受け取る継続 -> IO r
以下のコードは、stm-delayを利用してタイムアウトを実現する。動きを細かく追ってみよう。
fetch conn q $ \t -> do d <- newDelay 1000000 atomically $ Just <$> awaitResponse t <|> Nothing <$ waitDelay d
- まずサーバーにクエリqを送信する。
- 返信を待たずにトランザクションを作成し、継続に渡す。
newDelay 1000000
でディレイd
を作成する。waitDelay d
を呼ぶと、作成してから1秒経つまでブロックする。- 以下のどちらかが可能になるまで待つ。
awaitResponse
でレスポンスを受信し終えたら、Justに包んで返す。- 1秒経過したら、Nothingを返す。
- サーバーに処理が完了した旨を通知する。
- クエリは破棄され、仮にレスポンスが既に送られたとしてもクライアントはそれを捨てる。
継続に与えたトランザクションt
は、atomically
によって実行して初めて結果を待つ。STM
であるがゆえに、タイムアウトなどの理由で待つのを諦めるような振る舞いを合成できるというわけだ。継続が終了すれば、その旨もサーバーに送信されるため、待機処理がサーバーに溜まることもない。ContTなどで全体を合成すれば一度にたくさんのリクエストを送ることもでき、当然Traversable APIのようなテクニックも使える。このコンボはブロッキングAPIの新定番としてのポテンシャルを秘めていると考えている。
そこまで柔軟性が要求されない場合のために、タイムアウトのみを指定するクラシックなAPIも用意した。クエリの具体的な構造も以下に示す。
fetchSimple :: Connection -> Int -- ^ timeout in microseconds -> Query -> IO Contents data ItemRef = BySeqNum !Int -- ^ sequential number | ByIndex !B.ByteString !Int -- ^ index name and value data Query = Query { reqStream :: !B.ByteString , reqFrom :: !ItemRef -- ^ name of the index to search , reqTo :: !ItemRef -- ^ name of the index to search , reqType :: !RequestType } data RequestType = AllItems | LastItem
Connection
はwithConnection
関数を用いて作成する。
withConnection :: String -- ^ ホスト -> PortNumber -- ^ ポート -> ByteString -- ^ ストリームプレフィクス -> (Connection -> IO r) -> IO r
サーバーを立ち上げるのは簡単で、データが格納されているパスを指定すればよい。
franzd .
圧縮
書き込み終わった1日分のログをまとめて圧縮する手段としてSquashFSを採用した。指定されたプレフィクスと同名のイメージがある場合、サーバーはそれをFUSEでマウントするという機能がある。サーバーを起動する際、アーカイブを格納するパスをオプションとして指定することで利用できる。
franzd --live ./live --archive ./archive
適切なオプションなら高い圧縮率を実現できる一方、パフォーマンスも意外に良好で、むしろ圧縮してある方が読み込みが速いのではないかと感じるほどだ。当初はあまり期待していなかったが、複数のファイルにまたがるフォーマットは取り回しが悪いという欠点をうまく克服できている。
今後の展開
ロギングの構成を完全に置き換える前段階として、分析用データの格納のために運用している。大きなバグもなく好感触だが、並行処理やIOをふんだんに使ったこの手のプログラムは細部の動きが怪しくなりがちだ。どんな状況でもきっちり動くように煮詰めていきたい。