【技術本】データ指向アプリケーションデザイン
https://amzn.to/4eUWe6e
【技術本】データ指向アプリケーションデザイン
へポスト
今日の多くのアプリケーションは、演算指向でなく、データ指向になっている。
つまり、CPU能力がボトルネックになることはなく、データ量、データの複雑さ、システムがデータの変化に対応する柔軟性や速度が問題となっている。
それを踏まえ、信頼性、スケーラビリティ、メンテナンス性に優れたアプリケーションの設計構築手法を、データを起因とした関連性、相互作用から全体を組み立てていく方法論で記したのが本書である。
上述した信頼性、スケーラビリティ、メンテナンス性においては、データ量や処理量に対応するために分散処理を導入すると、急激に難易度が上昇する。その対策方法をデータシステムの基礎(クエリ:RDB,NoSQL)、データベースストレージの実装、他システムと連携する場合のエンコーディングから技術を解説した上で、分散データ処理の技術(レプリケーション、シャーディング、2相コミット)などの技術と現状の問題点と解決策を示し、最終的には導出データ(バッチ処理データ、ストリーム処理データ)の技術に関する問題と現状の解決策まで言及してくれている。
翻訳に関しては、元々の英文が技術文章というよりは、若干蛇足が多い論述調のものだったせいか、文章がやや冗長に感じられた。
しかし、書かれている内容は近年のデファクトで高度な技術を抑えているので、スケールを考慮した分散処理環境の設計開発に関わっている技術者に価値が出てくる著作であると感じた。
以下、内容メモ(抜粋)
-----------------------
第一部:データシステムの基礎
1章:信頼性、スケーラビリティ、メンテナンス性に優れたアプリケーション
今日の多くのアプリケーションは、演算指向でなく、データ指向。
CPUの処理能力が制約になることはほぼない。通常は、データ量、複雑さ、データの変化する速度が問題。
1.3 スケーラビリティ
1.3.1 負荷の表現
負荷パラメータ:毎秒リクエスト数、読み書き比率、同時アクティブユーザ、ヒット率
Twitter事例
フォロワーのタイムラインへのツイートの反映
旧来:テーブルをjoinして、該当フォローしているユーザのツイートを検索して表示
改善:フォロワーの各ユーザのタイムラインのキャッシュにツイートをポストする
ユーザフォロワーは最大3000万人のため、一回のポストで最大3000万回の書き込みが起こる仕様であるが、平均ツイートポスト数は、ホームタイムライン読み込みと比較して2桁低いから成立する
最終的に、この2つのアプローチのハイブリッドとなる実装に移行。
一般的なユーザのツイートは、ポストされるとタイムラインのキャッシュにファンアウト
大量フォロワーを持つユーザのツイートは、旧来の処理で個別に取得され、ホームタイムラインでマージ
第2部:分散データ
8章;分散システムの問題
8.1.1 クラウドコンピューティングとスーパーコンピューティング
スーパーコンピュータ:間違いがあればクラスタのワークロード全体を止める>どちらかというと単一ノードのコンピュータに近い
インターネット上でオンラインで動作する分散システム
低レイテンシが求められる>地理的分散の活用
部分障害の可能性を受け入れソフトウェアにフォールトトレランスの仕組みを組みこんでおく
※信頼の低い基盤の上に信頼の高いシステムを構築する概念:例:IPでTCPを利用する
8.2 信頼性の低いネットワーク
インターネット、データセンター内部ネットワークの多くは、非同期パケットネットワーク
ネットワークケーブルの不備、過負荷状態、クラッシュ、電源落ち、リクエストやレスポンスの消失、遅延 >通常これらの問題にはタイムアウト対処。
8.2.1 ネットワークフォールトの実際
データセンタ:障害は一般的なもの
EC2などのパブリッククラウド:一時的なネットワーク不調は頻繁に生じる
スイッチのアップグレードによる遅延
8.2.2 フォールトの検出
対処
・ロードバランサ障害:ローテーションから外す
・シングルリーダレプリケーション:スレーブをマスタに昇格
フォールト検知
・TCP:接続ポートのサービスが落ちている場合はRST, FINで返信してくれる
・ICMP:Destination Unreachable
8.2.3 タイムアウトと限度のない遅延
タイムアウト長い:障害判定が遅れてしまう
タイムアウト短い:回復するネットワーク負荷などを障害と誤判定
障害判定し、データ処理を移行する際、すでに全体が高負荷の場合、さらに悪化する懸念
タイムアウト:すべてのパケットは一定時間d以内に配送されるか、ロストされる。ノードはr時間以内にリクエスト処理する>2d+rが妥当なタイムアウト。しかし、非同期のネットワークには遅延には上限がない
8.2.3.1 ネットワークの輻輳とキューイング
ネットワークのパケット遅延の変動はキューイングによって生じる場合がほとんど
スイッチのキューイング:混雑している場合はパケットスロットを得るまで待つ必要あり
>パケット再送が必要になる
ノードに到達してもノードのcpuがビジーだったら、OSによってパケットはキューイングされる
仮想環境でも、CPU利用時間がわりあてられるまでキューイングで待たされる
TCPフロー制御(輻輳回避:バックプレッシャー):ネットワークに入る前に送信側でキューイングされる
TCPのタイムアウト判定による再送による遅延
UDPは信頼性と遅延の変動制のトレードオフ:ただしスイッチのキューやスケジューリング遅延の影響は受ける
バッチ型のワークロードは容易にネットワークリンクを溢れさせてしまう>ネットワーク遅延の引き起こし
8.2.4 同期ネットワークと非同期ネットワーク
電話回線は同期ネットワーク
回路:経路が電話をかけると確立し、利用帯域が確保される
キューイングの影響を受けないのでレイテンシも一定
8.2.4.1 ネットワーク遅延の予測はできないのか
データ属性の違い
・電話:一定ビット/sの転送が必要になる
・メール、Web:送信が早く終わりさえすれば良い:バースト性トラフィックに対
ネットワーク遅延は、単純にコストとメリットをトレードオフした結果。
8.3 信頼性の低いクロック
コンピューナ内の推奨振動機は正確でなく、変動する:進みすぎたり遅くなったり
タイムスタンプによって、複数レプリへの書き込み>各ノードの時刻差によって弊害が起きる場合もある
論理クロック:インクリメントによるカウンタに基づくもの>イベントの順序づけに関しては安全性が高い選択
8.3.4 プロセスの一時停止
ガベージコレクション、サスペンド、レジューム、ライブマイグレーション、他のスレッドへのコンテキストスイッチ、スレッドが低速なI/O操作完了まで一時停止、ページフォールト処理
実行中のスレッドを任意の時点でプリエンプション(一時的に中断)し、再開される。しかし、そのスレッドはそうなったことに気づかない
単一マシンではスレッドセーフの技術として、ミューテックス、セマフォ、アトミックなカウンタ・・・などがあるが、分散システムでは共有メモリがなく、信頼できないネットワークを通じてメッセージを送信するため利用できない。
8.3.4.1 レスポンスタイムの保証
クリティカルな問題事例
・エアバッグ解放システムがGCによって運悪く停止したため、処理が無視or遅れる
>リアルタイムペレーティングシステムでは、指定された間隔でCPU時間の割り当て保証を伴うプロセスのスケジューリングが必要
GCなどを一時的に事前に計画された短時間のノードの停止のように扱い、GC停止中は、他のノードがリクエスト処理する方法もある。
8.4.1 真実は多数決で決定される
多くの分散アルゴリズムはクォラム(quorum)で、ノード障害を判定する。ノード群によって行われる投票。想定外の停止時間から復帰したノードが、正常クラスタに悪影響を与えるのを防ぐ。
8.4.1.1 リーダとロック
分散システムで実装するには注意が必要
一つのデータパーティションで、リーダをひとつにする
オブジェクトのロックできるトランザクション、あるいはクライアントを一つにする
>フェンシングトークン:ロックやリースを渡すたびにインクリメントされるもので、古いトークンを判別し拒否するようにする
>万が一悪意あるトークンを返すシステムの場合はなりたたない=ビザンチン障害:ノードが嘘のレスポンスを返すケース
ユーザIDをユニークで登録できるようにする
※HBaseにかつてあったバグ
ロックを取得したクライアントがGCで一時停止して、ロックのリース期限切れになる。他のクライアントがロックを取得して、ストレージを更新。GC復帰したクライアントがロックリース切れを知らずに書き込みしてデータ破壊。
9章:一貫性と合意
9.1 一貫性の保証
レプリケーションラグは、どのようなトポロジでも発生する。
結果整合性:ほとんどのレプリケーションで保証される
>データベースの書き込みがやめば、ある程度待てば最終的に全ての読み込みリクエストに対して同じ値が返されるようになること
9.2 線形化可能性
レプリカが複数あっても、一つしかないようにみせてくれる
原始的一貫性、即時一貫性、外部一貫性ともいう
読みとられる値は常に最新の値:最新性の保証
直列化可能性との違い
・直列化可能性:複数のオブジェクト(行、ドキュメント、レコード)を読み書きできること。複数のトランザクションが何らかの順序で実行された場合、同じ結果になる。この順序は、トランザクションが実際に実行された順序と異なっても良い。
・線形化可能性:レジスタの書き込みにおける最新性の保証(個々のオブジェクトに関すること)複数操作をトランザクションにまとめるようなものではない。
データベースは、直列化可能性と線形化可能性をともに提供できる。>双方の組み合わせ:厳密な直列化可能性(strict serializability)という
>厳密な直列化可能性>例:2相ロックや完全な順次実行
スナップショット分離は線形化可能ではない
9.2.2 線形化可能性の依存
9.2.2.1 ロックとリーダ選出
単一リーダレプリケーションはマスタが一台であることを保証する必要あり
ロックで制御:ロックの実装には線形化可能性が保証される必要あり
>分散ロック>Zookeeperなどの協調サービスが利用される
9.2.2.2 制約およびユニーク性の保証
ユーザ名、アドレスなどの登録のユニーク性:同時登録時に片方をエラーにする:線形化可能性が求められる
他にも、銀行残高、倉庫の在庫数、並行した席の予約など
9.2.2.3 クロスチャネルタイミング依存
2つの通信チャネルの処理の片方遅延による同期ズレ防止:線形可能性必要
9.2.3 線形可能なシステムの実装
シングルリーダレプリケーション:潜在的に線形化可能
合意アルゴリズム:線形化可能
マルチリーダレプリケーション:線形化可能でない
リーダレスレプリケーション:おそらく線形化可能でない
9.2.3.1 線形化可能とクオラム
Dynamoスタイルのモデル
ネットワーク遅延でレース問題は起きる
Dynamoスタイルのレプリケーションを行うリーダレスなシステムは線形化可能性を提供しない
9.2.4 線形化可能にすることによるコスト
線形化可能なレプリケーションは、他のレプケーションとネットワーク切り離されたら、切り離されたレプリカは利用できなくなる。
線形化が必須でない場合は、ネットワークから切り離されたレプリカは独立して処理できるが、線形化は可能でない。
線形化可能を持っているシステムは少数
現代的なマルチコアCPUのRAMも線形化可能ではない
CPUコアが個別にメモリキャッシュとストアバッファを持っているため
最終的にはパフォーマンスとトレードオフとなる
9.3 順序の保証
全順序:必ず2つの要素の大小を判断し順序づける
線形可能システムでは、操作には全順序がある。並行操作は存在しない。
線形化可能なシステムはパフォーマンスや可用性が損なわれる:ネットワーク遅延が大きいシステムでは問題
9.3.2 シーケンス番号の順序
シーケンス番号、タイムスタンプ:全順序を提供する
複数ノードでシーケンス番号発行:偶数と奇数分ける、番号範囲を分ける
9.3.2.2 ランポートタイムスタンプ
(Counter, Node ID)によるシーケンス。Counterで値比較で大きい方を選択。値が同じなら、Node IDが大きい方を選択する。
9.3.2.3 タイムスタンプの順序づけだけでは不十分
タイムスタンプが一番先に取得されたものだと確認するには、全ての操作(全てのノード)を収集して確認する必要がある。
9.3.3.1 全順序のブロードキャストの利用
Zookeeper, etcdcなどの合意サービスでは実装されている
全順序で全てのレプリカが書き込まれる:ステートマシンレプリケーションの原則
9.3.3.2 全順序ブロードキャストを利用した線形可能なストレージの実装
1.メッセージをログに追記し、取得したいユーザ名を記す
2.ログを読み取り、自分が追記したメッセージが配信されてくるのを待つ
3.取得したいユーザ名を要求しているメッセージがないか調べる。取得したユーザ名に関する最初のメッセージが自分のメッセージであれば取得成功。要求したユーザ名をコミットして、承認。他のユーザからのメッセージであれば操作中断
書き込みのみ線形化可能、読みよりは保証できない。
この手続きが提供するのは逐次一貫性(タイムライン一貫性)
9.3.3.3 線形化可能なストレージを使った全順序ブロードキャストの実装
線形化可能なcompare-and-setレジスタと全順序ブロードキャストはどちらも合意と等価
9.4 分散トランザクションと合意
リーダ選出:障害時でもスプリットブレインなどが起きない
アトミックなコミット:全てのノードでトランザクションが成功すること
9.4.1 アトミックなコミットと2相コミット(2PC)
2相コミット:複数ノードにまたがるアトミックなトランザクションを実現するアルゴリズム
新たなコンポーネントとして、コーディネータ(調停者)が登場する。
>参加ノードが更新可否にyesとコーディネータに返答したのち、コーディネータがダウンすると、参加ノードは更新待ちの状態で行き詰まる
9.4.1.5 スリーフェーズコミット
デメリット
ネットワーク遅延に上限がある
ノードのレスポンスタイムに上限がある
9.4.2 分散トランザクションの実際
・データベース内部の分散トランザクション
参加しているトランザクションは、同じデータベースソフトウェア
・ヘテロジニアスな分散トランザクション
2つ以上の異なるデータベース技術からなるトランザクション。
二相コミット:信用性高い、しかしパフォーマンスそこなうのがデメリット
XAトランザクション(eXtended Architecture):2相コミット標準.(JTA:Java Transaction APIで実装)>JEBC, JMSでサポート
9.4.2.3 未確定様態のロック保持
ノード障害:タイムアウト期限や、管理者の手作業で状況を解決するまでロック保持される懸念
9.4.2.4 コーディネータ障害からのリカバリ
孤立してしまった未確定トランザクションが生じる懸念
>XAではヒューリスティックな判断と呼ばれる緊急処理がある。
9.4.3 耐障害性を持つ合意
一様同意:異なる決定をしていない
整合性:複数決定しているノードがないこと
妥当性:決定内容と提案の一致
終了性:クラッシュしていないノードは最終的に何らかの値を決定する
9.4.4 メンバーシップと協調サービス
ZooKeeper:協調サービス
線形化可能でアトミックな操作
操作の全順序:フェインシングトークン
障害検出:エフェメラルドノード
変更通知:ロックおよび値の読み取りと、変化の監視もできる
9.4.4.1 ノードの処理の割り当て
パーティショニングされたリソースのノード構成変更による割り当て要請:ZooKeeperのアトミック操作、エフェメラエルノード、通知を使うことで達成
9.4.4.2 サービスディスカバリ
ZooKeeper, etcd, Consulはサービスディスカばりにもしばしば用いられる
まとめ:
システム中のイベント順序(原因と結果に基づく出来事の順序関係=因果)を求める場合、因果は線形化可能性ほど調整位オーバヘッドを求めないが、タイムスタンプなどでの実装はできない。>合意の仕組みが必要。
合意には以下の問題がある
・線形化可能なcompare-and-setレジスタ:与えられたパラメータが等しいかによってアトミックに値を設定するかどうか決定しなければならない
・アトミックなトランザクションのコミット:DBは分散トランザクションをコミットか中断するかを決定しなければならない
・全順序ブロードキャスト:メッセージシステムは、メッセージ配信順序を決定しなければならない。
・ロックとリース:複数クライアントから一斉にロック、リースを取得要求があった場合、ロックはどのクライアントに成功させるか決定する必要がある
・メンバーシップ・協調サービス:どのノードが生きていて、どのノードがタイムアウトで落ちているか決定する必要がある。
・ユニーク制約:複数トランザクションで同じキーで衝突するレコードを作成しようとする場合、どのコレードを許可し、どれを制約違反するか決定する必要がある
これらの仕組みでリーダーに障害、ネットワーク障害でリーダに調達不可能の対処
・リーダの回復を待ち、それまでシステムがブロックされてしまうのを許容
・手動フェイルオーバでリーダの選定と設定を手動実施
・アルゴリズムで自動的にリーダを決定する
合意へ落とし込めるような処理が必要で耐障害性をもとめるなら、zookeeperのようなものを使うことを推奨
第3部:導出データ
III.1 記録のシステムと導出データ
導出データ:システムで既存のデータを変換したり処理した結果のもの
導出データは失っても、オリジナルのソースから再生成できる
キャッシュに利用される
非正規化されて活用される
10章:バッチ処理
システムのタイプ
・サービス:オンラインシステム:
・バッチ処理システム:オフラインシステム:10章
・ストリーム処理システム:準リアルタイムシステム:11章
GNU Coreutils(Linux) sort:メモリより大きいデータセットを自動的にディスクに書き出して複数CPUコアで自動的にソートを並列化>メモリを使い果たすことなくスケール化>ボトルネックは入力ファイルをディスクから読み込む速度
※ awk, sed, grep, sort, uniq, xargsをいくつか組み合わせるだけで多くのデータ分析が数分でできていまいパフォーマンスも優れている
10.1.2 Unixの哲学
・各プログラムは一つのことだけうまくこなすようにする
・各プログラムの出力は未知のプログラムの入力になることを想定
・ソフトウェアは数週間で設計し構築する。OSも例外でない
・プログラミングの手間を減らすのに、単純作業でなくツールを使う、たとえ使い捨てでも
10.2 MapReduceと分散ファイルシステム
10.2.1 MapReduceジョブの実行:Hadoop
Mapper:入力レコードに対して一度ずつ呼び出され、入力レコードからキーと値を取り出す。mapperは一つの入力レコードから次の入力レコードへの過程で状態を一切保存しないので、各レコードは独立して処理
Reducer:mapperによって生成されたキーと値ペアから同じキーに属する全ての値を収集し、その値のコレクションに対するイテレータとともにreducerを呼び出します。reducerは、出力レコードを生成
ユーザが並列化のためのコードを書くことなく、分散並列化できる:図10-1 P440
10.2.1.2 MapReduceのワークフロー
単一のMapReduceジョブで解決できる問題の範囲は限られている
例:URLごとのページビューを求めても、もっとも人気のあるurLは判断できない
>そのため2巡目のソートが必要。
従って、MapReduceジョブは連鎖させてワークフローにまとめられるのが一般的
10.2.2 Reduce側での結合とグループ化
MapRedueにはインデックスという概念はない
入力としてファイル集合が渡されると、MapReduceジョブはそれらのファイルの内容を全て読み取る:データベースでのフルテーブルスキャン
分析的なクエリでは大量のコレードに対する集計をするので処理として妥当
10.2.2.4 Group By
mapperが生成するキーと値のペアキーが、グループ化に求められるキーを使うようにmapperをセットアップすることで対応できる。>パーティショニングとソートのプロセスが、同じキーを持つ全てのレコードを同じreducerにまとめてくれる。
10.2.2.5 スキューの処理(ホットスポット)
大量のデータが一つのキーに関連づけられていると破綻する
例:SNSなどで有名フォロワー数百万人などが
不均衡なアクティブデータベースレコード>リンチピノブジェクト、ホットキーと呼ばれる
MapReduceのジョブは、全てのmapperとreducerの処理が終わって初めて完了となるので、後に続くジョブはもっと処理がかかっているreducerが終わらなければ処理を開始できない。
>対策:ホットキーを複数のreducerに分散させ並列度を高める
10.2.3 map側での結合
mapperの役割:入力データを準備
各入力レコードからキーと値を取り出し、キーと値のペアをreducerのパーティションに割り当てキーでソートする
入力データに対して特定の前提を置けるならmap側での結合(map-side join)と呼ばれる手法を使えば、結合を高速化できる:このアプローチは切り詰められたMapReduceジョブを使い、Reducerを持たずソートもしない。
10.2.3.1 ブロードキャストハッシュ結合
大規模なデータセットに小さなデータセットを結合する
小さなデータセットは各mapperのメモリ内に完全にロードできるサイズであること
10.2.3.2 パーティション化ハッシュ結合
map側での結合への入力が同じ方法でパーティショニングされているのであれば、ハッシュのアプローチは各パーティションに対して独立に適用できます。
10.2.3.3 map側マージ結合
mapreduceジョブが同じキーに基づいてソートしているケース:mapperは通常reducerが行うマージ処理を実行できるので、入力がメモリに収まらなくても実行できる。
10.2.3.4 map側での結合を行うMapReduceのワークフロー
使われる結合がmap側かreducer側かによって出力構造が影響される。
reducer側:結合のキーによってパーティション化されソート
map側:大きい方の入力と同じようにパーティションかされソート
10.2.4 バッチワークフローの出力
10.2.4.1 検索インデックスの構築
mapperはドキュメントの集合を必要に応じてパーティショニングし、各reducerは自身が受け持つパーティショニングのインデックスを構築し、インでっk数ファイルが分散ファイルシステムに書き出される
このインデックスの構築は並列化できる。
インデックス付けされたドキュメントに変更があった場合は、定期的に全体のインデック付のワークフローをドキュメントの集合全体に対して再実行し、新しいインデッックスで置き換えるのが一つの選択肢>あるいはインデックスをインクリメンタルに構築:11章
10.2.4.2 バッチ処理の出力としてのキー・バリューストア
Hadoop処理で直接、サービスの本番DBを参照するのはよくない理由
ネットワーク負荷
本番DB負荷
ジョブの出力に対してオールオアナッシングを保証:ジョブの途中で書き込みが起こるとその書き込みは外部に見えてしまう副作用が出る
>新しいDBを作成し、それを参照するようにする
MApReduceジョブ中でのデータベースファイルの構築は様々なキー・バリューストアがサポート
10.2.4.3 バッチ処理の出力における哲学
入力は変化させることなく、副作用(DB書き出しなど)を避けることによってバッチジョブのパフォーマンス性、メンテナンス性を高める
10.2.5 Hadoopと分散データベースとの比較
大規模並列処理(MPP):データベースでHadoop以前に実装されてたもの
MPPデータベースは複数マシンからなるクラスタ上での分析的なSQLクエリの並列実行に焦点
MapReduce+分散ファイルシステムは、任意のプログラムを実行できる汎用的なOSのようなものを提供
10.2.5.1 ストレージの多様性
mppデータベース:注意深いスキーマ設計でデータを加工
実際には、データに癖があって使いにくい
スキーマ設計は、集中化されたデータのコレクションの速度を低下させる
Hadoop:無差別に生のデータを書き、必要に応じて後から処理できるようにする
10.2.5.2 処理モデルの多様性
データベース:SQL>全てのデータに対して万能でない>機械学習とか不得意
MapReduce:エンジニアが自作したコードを大規模なデータセットに対して容易に実行できる
>それを基盤とし、SQLクエリエンジンの構築が可能
後にはMapReduceでも限定が強いとなり他の処理モデルがHadoopで開発>10.3
10.2.5.3 頻繁な障害に備えた設計
map,reduceタスクに障害があっても、ここのタスク粒度で処理の再トライができる。 MapReduceが頻繁に生じる予想外のタスク終了に設計された理由
データセンタで高優先度のプロセスの空きをぬって実行できるよう想定
常に高優先度プロセス優先のため、終了させられるリスクがあった
10.3 MapReduceを超えて
10.3.1 中間的な状態の実体化
MapReduce:ジョブ毎に中間ファイルをつくる
>一つのジョブ完了まで次のジョブが待たされる
Mapperは冗長な部分もある:reducerが書き出したファイルを読み出し、パーティショニングとソートという次のステージに備えるだけ
>reducerの一部にもできる
中間ファイルが複数ノードにレプリケーションされる:過剰である
10.3.1.1 データフローエンジン
MapReduceの上述の問題を解決するため、分散バッチコンピューティングの実行エンジンが新たに開発された
>Spark, TEz, Flinkなど
ワークフローを独立したジョブに分割せず全体を一つのジョブとする
>データフローエンジンと呼ばれる
メリット
・ソートを必要な時のみ行う
・mapperの処理を先行するreducerに取り入れることができる:不要mapタスクがなくなる
・ワークフロー内のデータ結合依存関係が宣言されるので受け渡しが効率化
・中間状態はメモリ保持かローカルディスク書き込みで済む
・先行するステージ全体が終わるのを待たず次のステージを開始できる
・JVMプロセス再利用ができる:MapReduceは常に新規起動
10.3.1.2 耐障害性
Spark, Flink, Texは中間状態をHDFSに書き込まないので別のアプローチ
あるマシンの中間状態が失われたら、利用できる以前の中間データやHDFSのオリジナルデータから再計算される
>現実的には中間データは再計算より実体化しておいた方が低コスト
10.3.3 高レベルAPIと様々な言語
手作業でMapReduceジョブをプログラミングするのは労力大
高レベルの言語やAPIが広く使われるようになる
>Hive Pig, Cascading, Crunch
10.3.3.1 宣言的なクエリ言語への移行
Hive, Spark, Flink はコストベースのクエリオプティマイザを持つ
>中間状態の量を最小化するように結合の順序を入れ替えることが可能
多重ループ内の小さな内部ループでデータを繰り返し処理してCPUキャッシュを有効活用し関数呼び出しをしない
任意のコードを実行し、任意のフォーマットのデータを読めるという柔軟性を維持
まとめ
分散バッチ処理エンジンは、あえて制限のあるプログラミングモデルを持つ
分散システムの難しい問題を抽象化の裏側に隠せる。そのため、ネットワーク障害が生じた場合も、タスクをリトライしても安全で、失敗タスクの出力は破棄される。
フレームワークのおかげで、耐障害性の仕組みの実装も気にする必要はなし。
こういった仕組みは、データベース書き込みを行うオンラインサービスが持っているセマンティクスより強力。
11章:ストリーム処理
10章では、入力データが有限なものを想定
11章のストリームは入力は無限を想定:いつの段階で処理を始めるのか
Stream: Unixのstdin, stdout, JavaのFileInputStream, TCP接続,ビデオ配信
11.1 イベントストリームの転送
ストリーム処理の文脈では、一般にレコードはイベントと呼ばれる
小さく自己完結しているイミュータブルなオブジェクト
ある時点で生じたことの詳細が含まれている
通常のイベントには、発生時刻のタイムスタンプなどが含まれる
プロデューサー:パブリッシャ、あるいは送信者
コンシューマー:サブスクライバあるいは受信者
ファイルシステムでは、ファイル名によって一連のレコードが識別
ストリーミングでは、関連するイベント群はトピック、ストリームとしてグループ化
ストリーミングでは、データストア利用のやりかたではポーリング(競合回避や同期)の負荷が高くなる
>新しいイベントが現れた時にコンシューマに通知する方式が良い
11.1.1 メッセージングシステム
新たしいイベントをコンシューマに通知する>メッセージングシステム
プロデューサー送信速度がコンシューマー処理速度より速い場合の対処
>メッセージドロップ、キューイングでバッファする、back pressureを適用
メッセージが失われるのを許容するかはアプリケーションに依存
>受け取る情報によっては信頼性が問題視される
11.1.1.1 プロデューサーからコンシューマへの直接のメッセージング
株式フィードのストリーム;UDPマルチキャスト
コンシューマがネットワーク上でサービスを公開している場合
HTTP,RPCなどでリクエストを発行してコニューマにメッセージをプッシュ
webhooksの背景にある考え方:サービスのコールバックURLを他のサービスに登録し、イベントが発生した時にそのURLへのリクエストを発行す
これらは、メッセージ失われる可能性をアプリのコードが意識する必要あり
>コンシューマオフライン中に配信見逃し、プロヂューサクラッシュでデータ消失
11.1.1.2 メッセージブローカー
メッセージキューとも呼ばれる
メッセージストリームに最適化された一種のデータベース
メッセージブローカはサーバとして動作:プロデューサやコンシューマがクライアントとして接続
プロデューサー>ブローカー>コンシューマ:メッセージの流れ
永続性の問題をブローカーに移す
キューイングの結果、概してコンシューマは非同期になる
プロデューサはブローカによるメッセージのバッファリングだけ持ちコンシューマによるメッセージの処理は待たない
11.1.1.3 メッセージブローカとデータベースの比較
ブローカ:XA,JTAを使って2層コミットプロトコルに参加できるのもある
配信に成功した時点でデータは削除する:保存ストレージとしては適す
バッファデータが大きくなるとディスク書き込みが必須に>パフォーマンス影響
データに変化があればクライアントに通知
11.1.1.4 複数のコンシューマ
ロードバランシング:どれか一つのコンシューマに配信、送信内容を分業できる
ファンアウト:全てのメッセージを全てのコンシューマに配信、各コンシューマが独立して影響し合うことなく処理できる
11.1.1.5 承認と再配信
メッセージが失われないことを保証するため、メッセージブローカは承認を利用
クライアントは処理を終えたことを明示的に伝える>ブローカがキューからメッセージを削除できるようにする
ロードバランシングと再配信が組み合わさると、コンシューマのクラッシュでメッセージが別のコンシューマに再配信され、メッセージの順番が異なってしまうケースがある
11.1.2 パーティション化されたログ
11.1.2.1 メッセージストレージへのログの利用
高いスループットへスケールするためにログをパーティション化:パーティションごとに専任のブローカがいる構造
プロデューサはトピックのパーティションのファイルに追記することでメッセージを送信し、コンシューマは、それらのファイルを担当パーティション毎にシーケンシャルの読み込む:担当パーティションのオフセットをコンシューマは持っている
11.1.2.4 ディスク領域の利用
循環バッファ(リングバッファ)としてディスクを利用
コンシューマがプロデューサの処理の追いつけない場合の対処
11.1.2.6 古いメッセージのリプレイ
承認ベースのメッセージ処理:ブローカーからのメッセージ削除をする
ログベースのメッセーヒ処理:ファイル良い鳥のみのRead Only
ログベースは、メッセージのオフセットはコンシューマ側が制御しているので、新たなコンシューマを定義して古いメッセージのオフセットから開始することも可能
11.2 データベースとストリーム
11.2.1 システムの同期の保持
データベース:データベース中で更新されたら、キャッシュ、検索インデックス、データウェアハウスの中でも更新されなければならない
データベースと検索インデックスの更新(2重書き込み)には並列処理によるレース(競合)問題がある
11.2.2 変更データのキャプチャ
変更データキャプチャ(CDC);データベースに書かれた全てのデータの変更を観察し、それを他のシステムへレプリケーション出来る形態で取り出すプロセス
変更ログが同じ順序で適用されるなら、検索インデックスへの反映においても、データベースと一致することが期待できる。
11.2.2.3 ログのコンパクション
Apache Kafkaがサポート
11.2.3 イベントソーシング
アプリケーションの状態に加えられた全ての変更を変更イベントのログとして保存。イベントログに書かれたイミュータブルなイベントとして構築される。
11.2.4 状態、ストリーム、イミュータビリティ
イミュータブル:作成後その状態を変えないオブジェクトのこと
11.3 ストリームの処理
ストリームを処理するコード:オペレータ、ジョブという
11.3.4 耐障害性
11.3.4.1 マイクロバッチ処理とチェックポイント処理
マイクロバッチ処理:ストリームを小さなブロックに分割。それぞれのブロックをミニチュアのバッチ処理のように扱う
バッチが小さいとスケジューリングと調整のオーバヘッドが大きいが、バッチを大きくすればストリーム処理の結果が見えるまでの遅延が長くなる。
11.3.4.3 冪等性(べきとう)
冪等な操作:複数回実行でき、その影響が一度だけ実行したのと同じになる
まとめ
ストリーム処理に現れる3種類の結合
・ストリーム・ストリーム結合
・ストリーム・テーブル結合:他方にデータベース変更ログが含まれる
・テーブル・テーブル結合:どちらもデータベースのchangelog>2つのテーブルが結合されたマテリアライズドビューに対する変更ストリーム
以上。
<今日の多くのアプリケーションは、演算指向でなく、データ指向になっている。
つまり、CPU能力がボトルネックになることはなく、データ量、データの複雑さ、システムがデータの変化に対応する柔軟性や速度が問題となっている。
それを踏まえ、信頼性、スケーラビリティ、メンテナンス性に優れたアプリケーションの設計構築手法を、データを起因とした関連性、相互作用から全体を組み立てていく方法論で記したのが本書である。
上述した信頼性、スケーラビリティ、メンテナンス性においては、データ量や処理量に対応するために分散処理を導入すると、急激に難易度が上昇する。その対策方法をデータシステムの基礎(クエリ:RDB,NoSQL)、データベースストレージの実装、他システムと連携する場合のエンコーディングから技術を解説した上で、分散データ処理の技術(レプリケーション、シャーディング、2相コミット)などの技術と現状の問題点と解決策を示し、最終的には導出データ(バッチ処理データ、ストリーム処理データ)の技術に関する問題と現状の解決策まで言及してくれている。
翻訳に関しては、元々の英文が技術文章というよりは、若干蛇足が多い論述調のものだったせいか、文章がやや冗長に感じられた。
しかし、書かれている内容は近年のデファクトで高度な技術を抑えているので、スケールを考慮した分散処理環境の設計開発に関わっている技術者に価値が出てくる著作であると感じた。
以下、内容メモ(抜粋)
-----------------------
第一部:データシステムの基礎
1章:信頼性、スケーラビリティ、メンテナンス性に優れたアプリケーション
今日の多くのアプリケーションは、演算指向でなく、データ指向。
CPUの処理能力が制約になることはほぼない。通常は、データ量、複雑さ、データの変化する速度が問題。
1.3 スケーラビリティ
1.3.1 負荷の表現
負荷パラメータ:毎秒リクエスト数、読み書き比率、同時アクティブユーザ、ヒット率
Twitter事例
フォロワーのタイムラインへのツイートの反映
旧来:テーブルをjoinして、該当フォローしているユーザのツイートを検索して表示
改善:フォロワーの各ユーザのタイムラインのキャッシュにツイートをポストする
ユーザフォロワーは最大3000万人のため、一回のポストで最大3000万回の書き込みが起こる仕様であるが、平均ツイートポスト数は、ホームタイムライン読み込みと比較して2桁低いから成立する
最終的に、この2つのアプローチのハイブリッドとなる実装に移行。
一般的なユーザのツイートは、ポストされるとタイムラインのキャッシュにファンアウト
大量フォロワーを持つユーザのツイートは、旧来の処理で個別に取得され、ホームタイムラインでマージ
第2部:分散データ
8章;分散システムの問題
8.1.1 クラウドコンピューティングとスーパーコンピューティング
スーパーコンピュータ:間違いがあればクラスタのワークロード全体を止める>どちらかというと単一ノードのコンピュータに近い
インターネット上でオンラインで動作する分散システム
低レイテンシが求められる>地理的分散の活用
部分障害の可能性を受け入れソフトウェアにフォールトトレランスの仕組みを組みこんでおく
※信頼の低い基盤の上に信頼の高いシステムを構築する概念:例:IPでTCPを利用する
8.2 信頼性の低いネットワーク
インターネット、データセンター内部ネットワークの多くは、非同期パケットネットワーク
ネットワークケーブルの不備、過負荷状態、クラッシュ、電源落ち、リクエストやレスポンスの消失、遅延 >通常これらの問題にはタイムアウト対処。
8.2.1 ネットワークフォールトの実際
データセンタ:障害は一般的なもの
EC2などのパブリッククラウド:一時的なネットワーク不調は頻繁に生じる
スイッチのアップグレードによる遅延
8.2.2 フォールトの検出
対処
・ロードバランサ障害:ローテーションから外す
・シングルリーダレプリケーション:スレーブをマスタに昇格
フォールト検知
・TCP:接続ポートのサービスが落ちている場合はRST, FINで返信してくれる
・ICMP:Destination Unreachable
8.2.3 タイムアウトと限度のない遅延
タイムアウト長い:障害判定が遅れてしまう
タイムアウト短い:回復するネットワーク負荷などを障害と誤判定
障害判定し、データ処理を移行する際、すでに全体が高負荷の場合、さらに悪化する懸念
タイムアウト:すべてのパケットは一定時間d以内に配送されるか、ロストされる。ノードはr時間以内にリクエスト処理する>2d+rが妥当なタイムアウト。しかし、非同期のネットワークには遅延には上限がない
8.2.3.1 ネットワークの輻輳とキューイング
ネットワークのパケット遅延の変動はキューイングによって生じる場合がほとんど
スイッチのキューイング:混雑している場合はパケットスロットを得るまで待つ必要あり
>パケット再送が必要になる
ノードに到達してもノードのcpuがビジーだったら、OSによってパケットはキューイングされる
仮想環境でも、CPU利用時間がわりあてられるまでキューイングで待たされる
TCPフロー制御(輻輳回避:バックプレッシャー):ネットワークに入る前に送信側でキューイングされる
TCPのタイムアウト判定による再送による遅延
UDPは信頼性と遅延の変動制のトレードオフ:ただしスイッチのキューやスケジューリング遅延の影響は受ける
バッチ型のワークロードは容易にネットワークリンクを溢れさせてしまう>ネットワーク遅延の引き起こし
8.2.4 同期ネットワークと非同期ネットワーク
電話回線は同期ネットワーク
回路:経路が電話をかけると確立し、利用帯域が確保される
キューイングの影響を受けないのでレイテンシも一定
8.2.4.1 ネットワーク遅延の予測はできないのか
データ属性の違い
・電話:一定ビット/sの転送が必要になる
・メール、Web:送信が早く終わりさえすれば良い:バースト性トラフィックに対
ネットワーク遅延は、単純にコストとメリットをトレードオフした結果。
8.3 信頼性の低いクロック
コンピューナ内の推奨振動機は正確でなく、変動する:進みすぎたり遅くなったり
タイムスタンプによって、複数レプリへの書き込み>各ノードの時刻差によって弊害が起きる場合もある
論理クロック:インクリメントによるカウンタに基づくもの>イベントの順序づけに関しては安全性が高い選択
8.3.4 プロセスの一時停止
ガベージコレクション、サスペンド、レジューム、ライブマイグレーション、他のスレッドへのコンテキストスイッチ、スレッドが低速なI/O操作完了まで一時停止、ページフォールト処理
実行中のスレッドを任意の時点でプリエンプション(一時的に中断)し、再開される。しかし、そのスレッドはそうなったことに気づかない
単一マシンではスレッドセーフの技術として、ミューテックス、セマフォ、アトミックなカウンタ・・・などがあるが、分散システムでは共有メモリがなく、信頼できないネットワークを通じてメッセージを送信するため利用できない。
8.3.4.1 レスポンスタイムの保証
クリティカルな問題事例
・エアバッグ解放システムがGCによって運悪く停止したため、処理が無視or遅れる
>リアルタイムペレーティングシステムでは、指定された間隔でCPU時間の割り当て保証を伴うプロセスのスケジューリングが必要
GCなどを一時的に事前に計画された短時間のノードの停止のように扱い、GC停止中は、他のノードがリクエスト処理する方法もある。
8.4.1 真実は多数決で決定される
多くの分散アルゴリズムはクォラム(quorum)で、ノード障害を判定する。ノード群によって行われる投票。想定外の停止時間から復帰したノードが、正常クラスタに悪影響を与えるのを防ぐ。
8.4.1.1 リーダとロック
分散システムで実装するには注意が必要
一つのデータパーティションで、リーダをひとつにする
オブジェクトのロックできるトランザクション、あるいはクライアントを一つにする
>フェンシングトークン:ロックやリースを渡すたびにインクリメントされるもので、古いトークンを判別し拒否するようにする
>万が一悪意あるトークンを返すシステムの場合はなりたたない=ビザンチン障害:ノードが嘘のレスポンスを返すケース
ユーザIDをユニークで登録できるようにする
※HBaseにかつてあったバグ
ロックを取得したクライアントがGCで一時停止して、ロックのリース期限切れになる。他のクライアントがロックを取得して、ストレージを更新。GC復帰したクライアントがロックリース切れを知らずに書き込みしてデータ破壊。
9章:一貫性と合意
9.1 一貫性の保証
レプリケーションラグは、どのようなトポロジでも発生する。
結果整合性:ほとんどのレプリケーションで保証される
>データベースの書き込みがやめば、ある程度待てば最終的に全ての読み込みリクエストに対して同じ値が返されるようになること
9.2 線形化可能性
レプリカが複数あっても、一つしかないようにみせてくれる
原始的一貫性、即時一貫性、外部一貫性ともいう
読みとられる値は常に最新の値:最新性の保証
直列化可能性との違い
・直列化可能性:複数のオブジェクト(行、ドキュメント、レコード)を読み書きできること。複数のトランザクションが何らかの順序で実行された場合、同じ結果になる。この順序は、トランザクションが実際に実行された順序と異なっても良い。
・線形化可能性:レジスタの書き込みにおける最新性の保証(個々のオブジェクトに関すること)複数操作をトランザクションにまとめるようなものではない。
データベースは、直列化可能性と線形化可能性をともに提供できる。>双方の組み合わせ:厳密な直列化可能性(strict serializability)という
>厳密な直列化可能性>例:2相ロックや完全な順次実行
スナップショット分離は線形化可能ではない
9.2.2 線形化可能性の依存
9.2.2.1 ロックとリーダ選出
単一リーダレプリケーションはマスタが一台であることを保証する必要あり
ロックで制御:ロックの実装には線形化可能性が保証される必要あり
>分散ロック>Zookeeperなどの協調サービスが利用される
9.2.2.2 制約およびユニーク性の保証
ユーザ名、アドレスなどの登録のユニーク性:同時登録時に片方をエラーにする:線形化可能性が求められる
他にも、銀行残高、倉庫の在庫数、並行した席の予約など
9.2.2.3 クロスチャネルタイミング依存
2つの通信チャネルの処理の片方遅延による同期ズレ防止:線形可能性必要
9.2.3 線形可能なシステムの実装
シングルリーダレプリケーション:潜在的に線形化可能
合意アルゴリズム:線形化可能
マルチリーダレプリケーション:線形化可能でない
リーダレスレプリケーション:おそらく線形化可能でない
9.2.3.1 線形化可能とクオラム
Dynamoスタイルのモデル
ネットワーク遅延でレース問題は起きる
Dynamoスタイルのレプリケーションを行うリーダレスなシステムは線形化可能性を提供しない
9.2.4 線形化可能にすることによるコスト
線形化可能なレプリケーションは、他のレプケーションとネットワーク切り離されたら、切り離されたレプリカは利用できなくなる。
線形化が必須でない場合は、ネットワークから切り離されたレプリカは独立して処理できるが、線形化は可能でない。
線形化可能を持っているシステムは少数
現代的なマルチコアCPUのRAMも線形化可能ではない
CPUコアが個別にメモリキャッシュとストアバッファを持っているため
最終的にはパフォーマンスとトレードオフとなる
9.3 順序の保証
全順序:必ず2つの要素の大小を判断し順序づける
線形可能システムでは、操作には全順序がある。並行操作は存在しない。
線形化可能なシステムはパフォーマンスや可用性が損なわれる:ネットワーク遅延が大きいシステムでは問題
9.3.2 シーケンス番号の順序
シーケンス番号、タイムスタンプ:全順序を提供する
複数ノードでシーケンス番号発行:偶数と奇数分ける、番号範囲を分ける
9.3.2.2 ランポートタイムスタンプ
(Counter, Node ID)によるシーケンス。Counterで値比較で大きい方を選択。値が同じなら、Node IDが大きい方を選択する。
9.3.2.3 タイムスタンプの順序づけだけでは不十分
タイムスタンプが一番先に取得されたものだと確認するには、全ての操作(全てのノード)を収集して確認する必要がある。
9.3.3.1 全順序のブロードキャストの利用
Zookeeper, etcdcなどの合意サービスでは実装されている
全順序で全てのレプリカが書き込まれる:ステートマシンレプリケーションの原則
9.3.3.2 全順序ブロードキャストを利用した線形可能なストレージの実装
1.メッセージをログに追記し、取得したいユーザ名を記す
2.ログを読み取り、自分が追記したメッセージが配信されてくるのを待つ
3.取得したいユーザ名を要求しているメッセージがないか調べる。取得したユーザ名に関する最初のメッセージが自分のメッセージであれば取得成功。要求したユーザ名をコミットして、承認。他のユーザからのメッセージであれば操作中断
書き込みのみ線形化可能、読みよりは保証できない。
この手続きが提供するのは逐次一貫性(タイムライン一貫性)
9.3.3.3 線形化可能なストレージを使った全順序ブロードキャストの実装
線形化可能なcompare-and-setレジスタと全順序ブロードキャストはどちらも合意と等価
9.4 分散トランザクションと合意
リーダ選出:障害時でもスプリットブレインなどが起きない
アトミックなコミット:全てのノードでトランザクションが成功すること
9.4.1 アトミックなコミットと2相コミット(2PC)
2相コミット:複数ノードにまたがるアトミックなトランザクションを実現するアルゴリズム
新たなコンポーネントとして、コーディネータ(調停者)が登場する。
>参加ノードが更新可否にyesとコーディネータに返答したのち、コーディネータがダウンすると、参加ノードは更新待ちの状態で行き詰まる
9.4.1.5 スリーフェーズコミット
デメリット
ネットワーク遅延に上限がある
ノードのレスポンスタイムに上限がある
9.4.2 分散トランザクションの実際
・データベース内部の分散トランザクション
参加しているトランザクションは、同じデータベースソフトウェア
・ヘテロジニアスな分散トランザクション
2つ以上の異なるデータベース技術からなるトランザクション。
二相コミット:信用性高い、しかしパフォーマンスそこなうのがデメリット
XAトランザクション(eXtended Architecture):2相コミット標準.(JTA:Java Transaction APIで実装)>JEBC, JMSでサポート
9.4.2.3 未確定様態のロック保持
ノード障害:タイムアウト期限や、管理者の手作業で状況を解決するまでロック保持される懸念
9.4.2.4 コーディネータ障害からのリカバリ
孤立してしまった未確定トランザクションが生じる懸念
>XAではヒューリスティックな判断と呼ばれる緊急処理がある。
9.4.3 耐障害性を持つ合意
一様同意:異なる決定をしていない
整合性:複数決定しているノードがないこと
妥当性:決定内容と提案の一致
終了性:クラッシュしていないノードは最終的に何らかの値を決定する
9.4.4 メンバーシップと協調サービス
ZooKeeper:協調サービス
線形化可能でアトミックな操作
操作の全順序:フェインシングトークン
障害検出:エフェメラルドノード
変更通知:ロックおよび値の読み取りと、変化の監視もできる
9.4.4.1 ノードの処理の割り当て
パーティショニングされたリソースのノード構成変更による割り当て要請:ZooKeeperのアトミック操作、エフェメラエルノード、通知を使うことで達成
9.4.4.2 サービスディスカバリ
ZooKeeper, etcd, Consulはサービスディスカばりにもしばしば用いられる
まとめ:
システム中のイベント順序(原因と結果に基づく出来事の順序関係=因果)を求める場合、因果は線形化可能性ほど調整位オーバヘッドを求めないが、タイムスタンプなどでの実装はできない。>合意の仕組みが必要。
合意には以下の問題がある
・線形化可能なcompare-and-setレジスタ:与えられたパラメータが等しいかによってアトミックに値を設定するかどうか決定しなければならない
・アトミックなトランザクションのコミット:DBは分散トランザクションをコミットか中断するかを決定しなければならない
・全順序ブロードキャスト:メッセージシステムは、メッセージ配信順序を決定しなければならない。
・ロックとリース:複数クライアントから一斉にロック、リースを取得要求があった場合、ロックはどのクライアントに成功させるか決定する必要がある
・メンバーシップ・協調サービス:どのノードが生きていて、どのノードがタイムアウトで落ちているか決定する必要がある。
・ユニーク制約:複数トランザクションで同じキーで衝突するレコードを作成しようとする場合、どのコレードを許可し、どれを制約違反するか決定する必要がある
これらの仕組みでリーダーに障害、ネットワーク障害でリーダに調達不可能の対処
・リーダの回復を待ち、それまでシステムがブロックされてしまうのを許容
・手動フェイルオーバでリーダの選定と設定を手動実施
・アルゴリズムで自動的にリーダを決定する
合意へ落とし込めるような処理が必要で耐障害性をもとめるなら、zookeeperのようなものを使うことを推奨
第3部:導出データ
III.1 記録のシステムと導出データ
導出データ:システムで既存のデータを変換したり処理した結果のもの
導出データは失っても、オリジナルのソースから再生成できる
キャッシュに利用される
非正規化されて活用される
10章:バッチ処理
システムのタイプ
・サービス:オンラインシステム:
・バッチ処理システム:オフラインシステム:10章
・ストリーム処理システム:準リアルタイムシステム:11章
GNU Coreutils(Linux) sort:メモリより大きいデータセットを自動的にディスクに書き出して複数CPUコアで自動的にソートを並列化>メモリを使い果たすことなくスケール化>ボトルネックは入力ファイルをディスクから読み込む速度
※ awk, sed, grep, sort, uniq, xargsをいくつか組み合わせるだけで多くのデータ分析が数分でできていまいパフォーマンスも優れている
10.1.2 Unixの哲学
・各プログラムは一つのことだけうまくこなすようにする
・各プログラムの出力は未知のプログラムの入力になることを想定
・ソフトウェアは数週間で設計し構築する。OSも例外でない
・プログラミングの手間を減らすのに、単純作業でなくツールを使う、たとえ使い捨てでも
10.2 MapReduceと分散ファイルシステム
10.2.1 MapReduceジョブの実行:Hadoop
Mapper:入力レコードに対して一度ずつ呼び出され、入力レコードからキーと値を取り出す。mapperは一つの入力レコードから次の入力レコードへの過程で状態を一切保存しないので、各レコードは独立して処理
Reducer:mapperによって生成されたキーと値ペアから同じキーに属する全ての値を収集し、その値のコレクションに対するイテレータとともにreducerを呼び出します。reducerは、出力レコードを生成
ユーザが並列化のためのコードを書くことなく、分散並列化できる:図10-1 P440
10.2.1.2 MapReduceのワークフロー
単一のMapReduceジョブで解決できる問題の範囲は限られている
例:URLごとのページビューを求めても、もっとも人気のあるurLは判断できない
>そのため2巡目のソートが必要。
従って、MapReduceジョブは連鎖させてワークフローにまとめられるのが一般的
10.2.2 Reduce側での結合とグループ化
MapRedueにはインデックスという概念はない
入力としてファイル集合が渡されると、MapReduceジョブはそれらのファイルの内容を全て読み取る:データベースでのフルテーブルスキャン
分析的なクエリでは大量のコレードに対する集計をするので処理として妥当
10.2.2.4 Group By
mapperが生成するキーと値のペアキーが、グループ化に求められるキーを使うようにmapperをセットアップすることで対応できる。>パーティショニングとソートのプロセスが、同じキーを持つ全てのレコードを同じreducerにまとめてくれる。
10.2.2.5 スキューの処理(ホットスポット)
大量のデータが一つのキーに関連づけられていると破綻する
例:SNSなどで有名フォロワー数百万人などが
不均衡なアクティブデータベースレコード>リンチピノブジェクト、ホットキーと呼ばれる
MapReduceのジョブは、全てのmapperとreducerの処理が終わって初めて完了となるので、後に続くジョブはもっと処理がかかっているreducerが終わらなければ処理を開始できない。
>対策:ホットキーを複数のreducerに分散させ並列度を高める
10.2.3 map側での結合
mapperの役割:入力データを準備
各入力レコードからキーと値を取り出し、キーと値のペアをreducerのパーティションに割り当てキーでソートする
入力データに対して特定の前提を置けるならmap側での結合(map-side join)と呼ばれる手法を使えば、結合を高速化できる:このアプローチは切り詰められたMapReduceジョブを使い、Reducerを持たずソートもしない。
10.2.3.1 ブロードキャストハッシュ結合
大規模なデータセットに小さなデータセットを結合する
小さなデータセットは各mapperのメモリ内に完全にロードできるサイズであること
10.2.3.2 パーティション化ハッシュ結合
map側での結合への入力が同じ方法でパーティショニングされているのであれば、ハッシュのアプローチは各パーティションに対して独立に適用できます。
10.2.3.3 map側マージ結合
mapreduceジョブが同じキーに基づいてソートしているケース:mapperは通常reducerが行うマージ処理を実行できるので、入力がメモリに収まらなくても実行できる。
10.2.3.4 map側での結合を行うMapReduceのワークフロー
使われる結合がmap側かreducer側かによって出力構造が影響される。
reducer側:結合のキーによってパーティション化されソート
map側:大きい方の入力と同じようにパーティションかされソート
10.2.4 バッチワークフローの出力
10.2.4.1 検索インデックスの構築
mapperはドキュメントの集合を必要に応じてパーティショニングし、各reducerは自身が受け持つパーティショニングのインデックスを構築し、インでっk数ファイルが分散ファイルシステムに書き出される
このインデックスの構築は並列化できる。
インデックス付けされたドキュメントに変更があった場合は、定期的に全体のインデック付のワークフローをドキュメントの集合全体に対して再実行し、新しいインデッックスで置き換えるのが一つの選択肢>あるいはインデックスをインクリメンタルに構築:11章
10.2.4.2 バッチ処理の出力としてのキー・バリューストア
Hadoop処理で直接、サービスの本番DBを参照するのはよくない理由
ネットワーク負荷
本番DB負荷
ジョブの出力に対してオールオアナッシングを保証:ジョブの途中で書き込みが起こるとその書き込みは外部に見えてしまう副作用が出る
>新しいDBを作成し、それを参照するようにする
MApReduceジョブ中でのデータベースファイルの構築は様々なキー・バリューストアがサポート
10.2.4.3 バッチ処理の出力における哲学
入力は変化させることなく、副作用(DB書き出しなど)を避けることによってバッチジョブのパフォーマンス性、メンテナンス性を高める
10.2.5 Hadoopと分散データベースとの比較
大規模並列処理(MPP):データベースでHadoop以前に実装されてたもの
MPPデータベースは複数マシンからなるクラスタ上での分析的なSQLクエリの並列実行に焦点
MapReduce+分散ファイルシステムは、任意のプログラムを実行できる汎用的なOSのようなものを提供
10.2.5.1 ストレージの多様性
mppデータベース:注意深いスキーマ設計でデータを加工
実際には、データに癖があって使いにくい
スキーマ設計は、集中化されたデータのコレクションの速度を低下させる
Hadoop:無差別に生のデータを書き、必要に応じて後から処理できるようにする
10.2.5.2 処理モデルの多様性
データベース:SQL>全てのデータに対して万能でない>機械学習とか不得意
MapReduce:エンジニアが自作したコードを大規模なデータセットに対して容易に実行できる
>それを基盤とし、SQLクエリエンジンの構築が可能
後にはMapReduceでも限定が強いとなり他の処理モデルがHadoopで開発>10.3
10.2.5.3 頻繁な障害に備えた設計
map,reduceタスクに障害があっても、ここのタスク粒度で処理の再トライができる。 MapReduceが頻繁に生じる予想外のタスク終了に設計された理由
データセンタで高優先度のプロセスの空きをぬって実行できるよう想定
常に高優先度プロセス優先のため、終了させられるリスクがあった
10.3 MapReduceを超えて
10.3.1 中間的な状態の実体化
MapReduce:ジョブ毎に中間ファイルをつくる
>一つのジョブ完了まで次のジョブが待たされる
Mapperは冗長な部分もある:reducerが書き出したファイルを読み出し、パーティショニングとソートという次のステージに備えるだけ
>reducerの一部にもできる
中間ファイルが複数ノードにレプリケーションされる:過剰である
10.3.1.1 データフローエンジン
MapReduceの上述の問題を解決するため、分散バッチコンピューティングの実行エンジンが新たに開発された
>Spark, TEz, Flinkなど
ワークフローを独立したジョブに分割せず全体を一つのジョブとする
>データフローエンジンと呼ばれる
メリット
・ソートを必要な時のみ行う
・mapperの処理を先行するreducerに取り入れることができる:不要mapタスクがなくなる
・ワークフロー内のデータ結合依存関係が宣言されるので受け渡しが効率化
・中間状態はメモリ保持かローカルディスク書き込みで済む
・先行するステージ全体が終わるのを待たず次のステージを開始できる
・JVMプロセス再利用ができる:MapReduceは常に新規起動
10.3.1.2 耐障害性
Spark, Flink, Texは中間状態をHDFSに書き込まないので別のアプローチ
あるマシンの中間状態が失われたら、利用できる以前の中間データやHDFSのオリジナルデータから再計算される
>現実的には中間データは再計算より実体化しておいた方が低コスト
10.3.3 高レベルAPIと様々な言語
手作業でMapReduceジョブをプログラミングするのは労力大
高レベルの言語やAPIが広く使われるようになる
>Hive Pig, Cascading, Crunch
10.3.3.1 宣言的なクエリ言語への移行
Hive, Spark, Flink はコストベースのクエリオプティマイザを持つ
>中間状態の量を最小化するように結合の順序を入れ替えることが可能
多重ループ内の小さな内部ループでデータを繰り返し処理してCPUキャッシュを有効活用し関数呼び出しをしない
任意のコードを実行し、任意のフォーマットのデータを読めるという柔軟性を維持
まとめ
分散バッチ処理エンジンは、あえて制限のあるプログラミングモデルを持つ
分散システムの難しい問題を抽象化の裏側に隠せる。そのため、ネットワーク障害が生じた場合も、タスクをリトライしても安全で、失敗タスクの出力は破棄される。
フレームワークのおかげで、耐障害性の仕組みの実装も気にする必要はなし。
こういった仕組みは、データベース書き込みを行うオンラインサービスが持っているセマンティクスより強力。
11章:ストリーム処理
10章では、入力データが有限なものを想定
11章のストリームは入力は無限を想定:いつの段階で処理を始めるのか
Stream: Unixのstdin, stdout, JavaのFileInputStream, TCP接続,ビデオ配信
11.1 イベントストリームの転送
ストリーム処理の文脈では、一般にレコードはイベントと呼ばれる
小さく自己完結しているイミュータブルなオブジェクト
ある時点で生じたことの詳細が含まれている
通常のイベントには、発生時刻のタイムスタンプなどが含まれる
プロデューサー:パブリッシャ、あるいは送信者
コンシューマー:サブスクライバあるいは受信者
ファイルシステムでは、ファイル名によって一連のレコードが識別
ストリーミングでは、関連するイベント群はトピック、ストリームとしてグループ化
ストリーミングでは、データストア利用のやりかたではポーリング(競合回避や同期)の負荷が高くなる
>新しいイベントが現れた時にコンシューマに通知する方式が良い
11.1.1 メッセージングシステム
新たしいイベントをコンシューマに通知する>メッセージングシステム
プロデューサー送信速度がコンシューマー処理速度より速い場合の対処
>メッセージドロップ、キューイングでバッファする、back pressureを適用
メッセージが失われるのを許容するかはアプリケーションに依存
>受け取る情報によっては信頼性が問題視される
11.1.1.1 プロデューサーからコンシューマへの直接のメッセージング
株式フィードのストリーム;UDPマルチキャスト
コンシューマがネットワーク上でサービスを公開している場合
HTTP,RPCなどでリクエストを発行してコニューマにメッセージをプッシュ
webhooksの背景にある考え方:サービスのコールバックURLを他のサービスに登録し、イベントが発生した時にそのURLへのリクエストを発行す
これらは、メッセージ失われる可能性をアプリのコードが意識する必要あり
>コンシューマオフライン中に配信見逃し、プロヂューサクラッシュでデータ消失
11.1.1.2 メッセージブローカー
メッセージキューとも呼ばれる
メッセージストリームに最適化された一種のデータベース
メッセージブローカはサーバとして動作:プロデューサやコンシューマがクライアントとして接続
プロデューサー>ブローカー>コンシューマ:メッセージの流れ
永続性の問題をブローカーに移す
キューイングの結果、概してコンシューマは非同期になる
プロデューサはブローカによるメッセージのバッファリングだけ持ちコンシューマによるメッセージの処理は待たない
11.1.1.3 メッセージブローカとデータベースの比較
ブローカ:XA,JTAを使って2層コミットプロトコルに参加できるのもある
配信に成功した時点でデータは削除する:保存ストレージとしては適す
バッファデータが大きくなるとディスク書き込みが必須に>パフォーマンス影響
データに変化があればクライアントに通知
11.1.1.4 複数のコンシューマ
ロードバランシング:どれか一つのコンシューマに配信、送信内容を分業できる
ファンアウト:全てのメッセージを全てのコンシューマに配信、各コンシューマが独立して影響し合うことなく処理できる
11.1.1.5 承認と再配信
メッセージが失われないことを保証するため、メッセージブローカは承認を利用
クライアントは処理を終えたことを明示的に伝える>ブローカがキューからメッセージを削除できるようにする
ロードバランシングと再配信が組み合わさると、コンシューマのクラッシュでメッセージが別のコンシューマに再配信され、メッセージの順番が異なってしまうケースがある
11.1.2 パーティション化されたログ
11.1.2.1 メッセージストレージへのログの利用
高いスループットへスケールするためにログをパーティション化:パーティションごとに専任のブローカがいる構造
プロデューサはトピックのパーティションのファイルに追記することでメッセージを送信し、コンシューマは、それらのファイルを担当パーティション毎にシーケンシャルの読み込む:担当パーティションのオフセットをコンシューマは持っている
11.1.2.4 ディスク領域の利用
循環バッファ(リングバッファ)としてディスクを利用
コンシューマがプロデューサの処理の追いつけない場合の対処
11.1.2.6 古いメッセージのリプレイ
承認ベースのメッセージ処理:ブローカーからのメッセージ削除をする
ログベースのメッセーヒ処理:ファイル良い鳥のみのRead Only
ログベースは、メッセージのオフセットはコンシューマ側が制御しているので、新たなコンシューマを定義して古いメッセージのオフセットから開始することも可能
11.2 データベースとストリーム
11.2.1 システムの同期の保持
データベース:データベース中で更新されたら、キャッシュ、検索インデックス、データウェアハウスの中でも更新されなければならない
データベースと検索インデックスの更新(2重書き込み)には並列処理によるレース(競合)問題がある
11.2.2 変更データのキャプチャ
変更データキャプチャ(CDC);データベースに書かれた全てのデータの変更を観察し、それを他のシステムへレプリケーション出来る形態で取り出すプロセス
変更ログが同じ順序で適用されるなら、検索インデックスへの反映においても、データベースと一致することが期待できる。
11.2.2.3 ログのコンパクション
Apache Kafkaがサポート
11.2.3 イベントソーシング
アプリケーションの状態に加えられた全ての変更を変更イベントのログとして保存。イベントログに書かれたイミュータブルなイベントとして構築される。
11.2.4 状態、ストリーム、イミュータビリティ
イミュータブル:作成後その状態を変えないオブジェクトのこと
11.3 ストリームの処理
ストリームを処理するコード:オペレータ、ジョブという
11.3.4 耐障害性
11.3.4.1 マイクロバッチ処理とチェックポイント処理
マイクロバッチ処理:ストリームを小さなブロックに分割。それぞれのブロックをミニチュアのバッチ処理のように扱う
バッチが小さいとスケジューリングと調整のオーバヘッドが大きいが、バッチを大きくすればストリーム処理の結果が見えるまでの遅延が長くなる。
11.3.4.3 冪等性(べきとう)
冪等な操作:複数回実行でき、その影響が一度だけ実行したのと同じになる
まとめ
ストリーム処理に現れる3種類の結合
・ストリーム・ストリーム結合
・ストリーム・テーブル結合:他方にデータベース変更ログが含まれる
・テーブル・テーブル結合:どちらもデータベースのchangelog>2つのテーブルが結合されたマテリアライズドビューに対する変更ストリーム
以上。
へポスト
