MySQLテーブルへの更新/削除イベントを逐次取得するFluentdプラグイン「fluent-plugin-mysql-replicator」をリリースしました
任意のSQLクエリで取得した結果の差分から、insert/update/deleteイベントを検知するプラグインをリリースしました。イベント検知だけでなく、レコードの内容と共にElasticsearch/Solrへ同期を行う、Outputプラグインも同封しています。
これはあえてバイナリログ(MySQLBinlogAPI)は使わずに、SQLクエリの実行結果の差分を見てinsert/update/deleteイベントを検知します。
そのため、純粋なテーブル同期だけでなく、任意のJOINやVIEWテーブルを元とした差分同期処理が実現できるのが特徴です。
y-ken/fluent-plugin-mysql-replicator
- https://github.com/y-ken/fluent-plugin-mysql-replicator
- http://rubygems.org/gems/fluent-plugin-mysql-replicator
利用例
このプラグインで実のようなことが実現できます。
テーブルの内容をElasticsearchに差分同期し、kuromojiを用いた全文検索を行う
例1
次のいずれかの条件に該当するテーブルであれば、以下のクエリで全件取得を行います。
- 更新日時カラム(updated_atなど)がない
DELETE文を用いた、行の物理削除を行う(フラグ管理ではない)
サンプルクエリ:
SELECT * FROM articles;
例2
もし次の条件を共に満たす場合には、以下のようなテーブルフルスキャンを伴わない効率的な同期処理が出来ます。
- 更新日時カラム(updated_atなど)があり、論理削除時にも更新される
DELETE文を用いない、フラグ管理を用いた論理削除を行う
サンプルクエリ :
SELECT * FROM articles WHERE DATE_ADD(updated_at, INTERVAL 5 MINUTE) > NOW();
※ いずれの場合も同じプライマリキーへの更新を検知した際は、追加ではなく更新となります
※ kuromojiはElasticsearchにelasticsearch-analysis-kuromojiプラグインをインストールすることで利用できます
テーブルの更新内容を他のRDBMSへストアする
次のようなFluentdのOutputプラグインに渡すこともできます。
- MongoDB fluent-plugin-mongo
- HBase fluent-plugin-hbase
- HDFS fluent-plugin-webhdfs
- Redshift fluent-plugin-redshift
- Cassandra fluent-plugin-cassandra
- Riak fluent-plugin-riak
なお、削除イベントに対応しているのは同封のmysql_replicator_elasticsearchとmysql_replicator_solrのみですが、ユニークキーを指定できる物であれば対応できるので、ぜひ相談またはpull-reqをください。
使い方
fluent-plugin-mysql-replicatorには、mysql_replicatorとmysql_replicator_multプラグインを内包していますが、今回は最も手軽に試せるmysql_replicatorを用いた例を紹介します。
あらかじめ、Fluentd, MySQL及びElasticsearchのインストールが済んでいるものとします。
1. プラグインのインストール
いずれかの方法でインストールします。
### システム側のRubyにインストールする場合 gem install fluent-plugin-mysql-replicator ### td-agentにインストールする場合 /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-mysql-replicator ### RPMパッケージ、Yamabikoを利用する # cf. https://github.com/y-ken/yamabiko/releases
2. Fluentdの設定
任意の設定ファイルを編集します。なお、td-agentであれば/etc/td-agent/td-agent.conf
を、Yamabikoであれば/etc/yamabiko/yamabiko.conf
を編集します。
<source> type mysql_replicator # レプリケート元MySQLへの接続設定 host localhost username your_mysql_user password your_mysql_password database mysites # SELECTクエリの設定 query SELECT id, text, updated_at from search_test; primary_key id # 主キーを指定する (デフォルト: id) interval 2s # クエリを実行する間隔 (デフォルト: 1m) # 削除された主キーを検知する機能の有効化設定 (デフォルト: yes) enable_delete yes # 各イベントをどのようなタグで配送するか指定する tag replicator.mysites.search_test.${event}.${primary_key} # ${event} : 検知したイベント種別が insert/update/delete のいずれかが入る # ${primary_key} : この設定の`primary_key`の値が入る </source> <match replicator.**> type copy <store> type stdout </store> <store> type mysql_replicator_elasticsearch # Elasticsearchサーバの接続情報を指定 host localhost port 9200 # Elasticsearchへレコードを登録する際の index(Database)、type(テーブル)、そしてunique id (primary_key)をどのようにタグから分解するか指定 tag_format (?<index_name>[^\.]+)\.(?<type_name>[^\.]+).(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$ # どの程度バッファに溜めてElasticsearchサーバへ転送するかを指定 flush_interval 5s # リトライ間隔の最大秒数 (fluentd >= 0.10.41) max_retry_wait 1800 </store> </match>
設定を済ませたらFluentdを起動します。
# td-agentを利用する場合 $ sudo /etc/init.d/td-agent start # Yamabikoを利用する場合 $ sudo /etc/init.d/yamabiko start
MySQL側でinsert/update/deleteクエリを発行する
mysql> create database mysites; mysql> use mysites; mysql> create table search_test(id int auto_increment, text text, PRIMARY KEY (id)); mysql> insert into search_test(text) values('aaa'); mysql> update search_test set text='bbb' where text = 'aaa'; mysql> delete from search_test where text='bbb';
その結果、ログファイルに次のようにイベントが検知されます。
なお、insert/update/deleteクエリを10秒間隔で手動実行したので、10秒ごとのタイムスタンプとなっています。
$ tail -f /var/log/td-agent/td-agent.log 2013-11-25 18:22:25 +0900 replicator.mysites.search_test.insert.id: {"id":"1","text":"aaa"} 2013-11-25 18:22:35 +0900 replicator.mysites.search_test.update.id: {"id":"1","text":"bbb"} 2013-11-25 18:22:45 +0900 replicator.mysites.search_test.delete.id: {"id":"1"}
また、同時にElasticsearchにもデータが反映されるので、それもcurlコマンドで確認します。
# index: mysites, type: search_test の内容を5件取得する $ curl "http://localhost:9200/mysites/search_test/_search?size=5&pretty"
これでSELECTクエリで取得した内容がElasticsearchに同期され、検索出来るようになりました。
インストールさえ何とかなれば、とても手軽ですね!
対応環境
推奨動作環境
MySQLの管理テーブルを利用する場合は次の通りです。(mysql_replicator_multiを利用)
MySQLの管理テーブルを利用しない場合は次の通りです。(mysql_replicatorを利用)
MySQL
Gemのmysql2から接続できる、次のようなMySQL及び互換DBに対応しています。
Elasticsearch
elasticsearch-0.90.6以降の環境で動作確認済みです。(それより前の環境でもおそらく動きますが未確認です)
Solr
MultiCoreに対応したSolr 4.0以降であれば動くはずです。(未確認)
本番環境で利用する
Fluentdプラグインのため、td-agentと共に利用することも出来ます。 しかし本来のログ収集を行うFluentdに、本プラグインのような特別な機能を追加していくと、ゆくゆくは次のような問題が発生します。
- 設定ファイルが複雑化し、タグのチェーンが読み解きづらくなる
- 様々な機能を詰め込むことで、障害時に何が原因でそのトラブルが発生したか追跡するのが困難となる
そこで、このプラグインを独立して稼働できる、Ruby+Fluentd+fluent-plugin-mysql-replicatorを独立バイナリとしてパッケージ化したミドルウェアを「Yamabiko」としてリリースしております。
こちらはtd-agentをフォークして作成したもので、より詳細な情報についてはまた別の記事で紹介します。
※ td-agent : プロダクション環境でFluentdを使う場合には有り難い、サーバ側のRubyに依存せず完全に独立して稼働できるパッケージ
まとめ
このプラグインは、削除検知が出来るので完全なテーブルのミラーリングが出来ることと、レプリケート元のテーブルに要求する制約があまりないのが特徴です。
とても手軽に試せるので、まずは手元のFluentdでお試しください!
疑問や質問、気になる点などございましたら、@yoshi_kenまでメンションください。
可能な限りサポートします。
併せて読みたい
- Fluentdベースのミドルウェア"Yamabiko"でMySQLのテーブルをElasticsearchへレプリケートする話
http://www.slideshare.net/y-ken/yamabiko-replicate-mysql-table-to-elasticsearch