Y-Ken Studio

新しもの好きのデータエンジニアが四方山話をお届けします。

タグ書き換えが捗るFluentd用Mixinプラグイン「Fluent::Mixin::RewriteTagName」を公開

fluent-mixin-rewrite-tag-nameというFluentd向けMixinプラグインを公開しました。
このMixinプラグインは、<source><match **>の中で使えるtagオプションを追加します。
タグ書き換え周りのプラグイン実装の省力化ができますね!

これは、次のようなプラグインで使えるタグ書き換えに用いられている機能を、その他のプラグインに転用できるよう切り出しました。

  • fluent-plugin-forest
  • fluent-plugin-record-reformer
  • fluent-plugin-rewrite-tag-filter

提供プレースホルダ

Fluent::Mixin::RewriteTagNameを導入することで、次のプレースホルダが使えるようになります。

tag_partsプレースホルダ機能

これは、ドットで区切ったタグの一部を使ってタグを再構築する機能です。 次の例のように、インデックス指定だけでなく${tag_parts[0..2]}といった範囲表現にも対応しています。 test_emit_tag_partsより抜粋したものを以下に転載します。

tag = '0.1.2.3.4'
rules = [
  {:conf=>'tag rewrited.${tag_parts[0]}',       :expect_tag => 'rewrited.0'},
  {:conf=>'tag rewrited.${tag_parts[1]}',       :expect_tag => 'rewrited.1'},
  {:conf=>'tag rewrited.${tag_parts[-1]}',      :expect_tag => 'rewrited.4'},
  {:conf=>'tag rewrited.${tag_parts[-2]}',      :expect_tag => 'rewrited.3'},
  {:conf=>'tag rewrited.${tag_parts[1..1000]}', :expect_tag => 'rewrited.1.2.3.4'},
  {:conf=>'tag rewrited.${tag_parts[0..2]}',    :expect_tag => 'rewrited.0.1.2'},
  {:conf=>'tag rewrited.${tag_parts[0..-3]}',   :expect_tag => 'rewrited.0.1.2'},
  {:conf=>'tag rewrited.${tag_parts[1..-2]}',   :expect_tag => 'rewrited.1.2.3'},
  {:conf=>'tag rewrited.${tag_parts[-2..-1]}',  :expect_tag => 'rewrited.3.4'},
]

詳細な利用方法やユースケースについては次のブログ記事が参考になります。

利用例

出力タグにホスト名を埋め込みたいというユースケースでの設定例は次の通りです。
このMixinプラグインを導入することで、ちょっとした書き換えのために複数のプラグインを経由させなくても良くなるのは良いことですね。

# Inputプラグインでの例
# ホスト名がweb01.example.comなら、tagは'message.web01.example.com'となります
<source>
  type             your_cool_plugin
  tag              message.${hostname}
</source>

# Outputプラグインでの例
# この場合には、tagは'td.foo_bar.web01.example.com'となります
<match app.foo_bar>
  type               your_cool_plugin
  tag                td.${tag}.${hostname}
  remove_tag_prefix  app.
</match>

しかしながらドットが含まれると都合が悪く、短縮ホスト名を使いたいケースがあると思います。
その場合には、ホスト名取得コマンドをhostname_commandオプションを用いて次のように指定します。

# Inputプラグインでの例
# ホスト名がweb01.example.comなら、tagは'message.web01'となります
<source>
  type             your_cool_plugin
  tag              message.${hostname}
  hostname_command hostname -s
</source>

# Outputプラグインでの例
# この場合には、tagは'td.foo_bar.web01'となります
<match app.foo_bar>
  type               your_cool_plugin
  tag                td.${tag}.${hostname}
  remove_tag_prefix  app.
  hostname_command   hostname -s
</match>

導入事例

導入方法

このRewriteTagNameMixinを使うには、大まかには次の流れで行います。 これより、タグ書き換えに便利なプレースホルダ機能の導入方法を、fluent-plugin-foobarという架空のプラグインを例に解説します。

  1. プラグインファイルの最上部にて、このmixinプラグインをrequireする
    require 'fluent/mixin/rewrite_tag_name'

  2. Outputプラグインの場合には、'remove_tag_prefix'オプションを使うためのMixinプラグインもロードする
    include Fluent::HandleTagNameMixin

  3. HandleTagNameMixinをロードした後に、このRewriteTagNameをロードする
    include Fluent::Mixin::RewriteTagName

  4. Fluent::Engine.emitする前に、次のコードを呼び出す
    emit_tag = tag.dup及びfilter_record(emit_tag, time, record)

事前準備

次のようにgemspecを編集し、このMixinプラグインをインストールします。

# 機能を追加したいプラグインのディレクトリへ移動する
$ cd fluent-plugin-foobar

# gemspecファイルを編集する
$ vim fluent-plugin-foobar.gemspec

# 依存ライブラリをインストールする
# 単に`bundle install`としても構いません
$ bundle install --path vendor/bundle 

gemspecファイル編集時には、次のように依存関係を追加します。
実装例としてfluent-plugin-anonymizer.gemspecが参考になります。

Gem::Specification.new do |spec|
  # ...中略...
  spec.add_runtime_dependency "fluent-mixin-rewrite-tag-name"
end

Inputプラグインへの実装例

require 'fluent/mixin/rewrite_tag_name'

module Fluent
  class FooBarInput < Fluent::Input
    Plugin.register_input('foo_bar', self)

    # ...中略...

    include Fluent::HandleTagNameMixin
    include Fluent::Mixin::RewriteTagName    
    config_param :hostname_command, :string, :default => 'hostname'

    # ...中略...

    def configure(conf)
      super

      # ...中略...

      # add a error handling 
      if ( !@tag && !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
        raise Fluent::ConfigError, "foo_bar: missing remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
      end      
    end

    # ...中略...

    def emit_message(tag, message)
      emit_tag = tag.dup
      filter_record(emit_tag, time, message)
      Engine.emit(emit_tag, Engine.now, message)
    end

    # ...中略...

  end
end

Outputプラグインへの実装例

require 'fluent/mixin/rewrite_tag_name'

class Fluent
  class FooBarOutput < Fluent::Output
    Fluent::Plugin.register_output('foo_bar', self)

    include Fluent::Mixin::RewriteTagName
    config_param :hostname_command, :string, :default => 'hostname'

    # ...中略...

    def configure(conf)
      super

      # ...中略...

      # add a error handling 
      if ( !@tag && !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
        raise Fluent::ConfigError, "foo_bar: missing remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
      end      
    end

    # ...中略...

    def emit(tag, es, chain)
      es.each do |time, record|
        emit_tag = tag.dup
        filter_record(emit_tag, time, record)
        Fluent::Engine.emit(emit_tag, time, record)
      end
      chain.next
    end

    # ...中略...

  end
end

まとめ

tag_parts等のプレースホルダ機能を、とても簡単に導入出来るので、是非お試しください。
どのプラグインでも共通して行う実装には、積極的にMixinプラグインを利用して挙動の統一を図っていきたいですね。

また、ニーズ次第ではこれをRewriteTagNameMixinとして、Fluentd本体のfluentd/lib/fluent/mixin.rbへマージするのも良いのでは無いかと考えています。