はじめに
なんかバージョンアップでいろいろ変わってるっぽいので、一通り眺めてみました。 お題はfluent-plugin-bigqueryです。(あんま関係ないけど)
こちらの記事がとても参考になりました。 BufferedOutput pluginの代表的なoptionについて
かなり適当なので、詳しくはソースコードを参照してください。
サンプル
fluent-plugin-bigqueryのREADMEにのってるloadのサンプルはこんな感じ。
<match bigquery>
@type bigquery
method load
buffer_type file
buffer_path bigquery.*.buffer
flush_interval 1800
flush_at_shutdown true
try_flush_interval 1
utc
auth_method json_key
json_key json_key_path.json
time_format %s
time_field time
project yourproject_id
dataset yourdataset_id
auto_create_table true
table yourtable%{time_slice}
schema_path bq_schema.json
</match>
継承関係
fluentdのプラグインのパラメータはクラスの継承ツリーの中にあるそれぞれのクラスで定義されているので、理解しやすいようにそれぞれのクラスごとにパラメータをみていきたいと思います。 そのために、まずは継承関係を整理します。
outputプラグインの継承関係
Fluent::BigQueryOutput < Fluent::Compat::TimeSlicedOutput < Fluent::Plugin::Output < Fluent::Plugin::Base
bufferの継承関係
今回はfile bufferについて。
Fluent::Plugin::FileBuffer < Fluent::Plugin::Buffer < Fluent::Plugin::Base
パラメータ
型についてはこちらを参照してください。 http://docs.fluentd.org/articles/config-file#supported-data-types-for-values
outputプラグイン
Fluent::BigQueryOutput
まぁここは適当に。
time_format
: timeのformattime_field
: timestampとして使うフィールドmethod
: ストリーミングインサートinsert
orload
utc
: UTCauth_method
: 認証方法json_key
: JSON Keyのパスproject
: BigQueryのload先projectdataset
: BigQueryのload先datasettable
: BigQueryのload先tableschema_path
: schemaファイルのパス
Fluent::Compat::TimeSlicedOutput
https://github.com/fluent/fluentd/blob/master/lib/fluent/compat/output.rb
ここでは、ほとんどdefault値の書き換えぐらいしかやってないっぽい。 カッコでくくってあるパラメータ名は新しい名前で、こちらでエイリアスが張ってあります。 旧パラメータ名でソースコード探しても見つかりません 😭
パラメータ | 型 | default | 説明 |
---|---|---|---|
buffer_type |
string | "file" |
使うbufferのタイプ |
flush_interval |
time | nil |
buffer chunkをflushする間隔 |
try_flush_interval (flush_thread_interval ) |
float | 1 |
try_flush する間隔 |
disable_retry_limit (retry_forever ) |
bool | false |
永遠にretryする |
retry_limit (`retry_max_times``) |
integer | 17 |
リトライする回数 |
retry_wait |
time | 1.0 |
最初のリトライ間隔 (リトライするたびにどんどん長くなる) |
max_retry_wait (retry_max_interval ) |
time | nil |
リトライ間隔の最大値 |
num_threads (flush_thread_count ) |
integer | 1 |
バッファをflushするスレッドの数 |
queued_chunk_flush_interval (flush_thread_burst_interval ) |
time | 1 |
queued chunkをflushする間隔 |
time_slice_format |
string | "%Y%m%d" |
ファイル名の一部として使う |
time_slice_wait (timekey_wait ) |
time | 10 * 60 |
古いログが届くのを待つ時間 |
timezone (timekey_zone ) |
string | nil |
特定のtimezoneで時間をparseする |
buffer_chunk_limit (chunk_limit_size ) |
size | 256 * 1024 * 1024 |
各buffer chunkのサイズ |
buffer_queue_limit (queue_length_limit ) |
integer | 256 |
chunk queueの長さの上限 |
buffer_queue_full_action (overflow_action ) |
enum | :exception |
buffer queueがbuffer_queue_limit を超えた時の動作1 |
flush_at_shutdown |
bool | false |
shutdown直前にflushする |
Fluent::Plugin::Output
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/output.rb
TimeSlicedOutput
の方で別名が設定されてるところをみると、このクラスで用意されてるパラメータはprivateな感じなのかもしれません。
普段使うときはあまり意識しなくてもよさそうです。
説明も適当です 😇 多分ソース読んだ方が早いと思います。
パラメータ | 型 | default | 説明 |
---|---|---|---|
timekey |
time | nil |
|
timekey_wait |
time | nil |
|
timekey_use_utc |
bool | false |
デフォルトはlocaltime |
timekey_zone |
string | Time.now.strftime("%z") |
タイムゾーン |
flush_at_shutdown |
bool | nil |
シャットダウン直前にflushを試みる |
flush_mode |
enum | :default |
2 |
flush_interval |
time | 60 |
buffer chunkをflushする間隔 |
flush_thread_count |
integer | 1 |
bufferをflushするthreadの数 |
flush_thread_interval |
float | 1.0 |
try_flush する間隔 |
flush_thread_burst_interval |
float | 1.0 |
多くのbuffer chunkがqueueされているときのflush間隔 |
delayed_commit_timeout |
time | 60 |
buffer chunkがpluginにコミットされるときのタイムアウト? |
overflow_action |
enum | :throw_exception |
bufferがあふれたときの動作3 |
retry_forever |
bool | false |
retry_timeout とretry_max_times を無視して永遠にretryする |
retry_timeout |
time | 72 * 60 * 60 |
リトライのタイムアウト |
retry_max_times |
integer | nil |
最大リトライ回数 |
retry_secondary_threshold |
float | 0.8 |
secondaryを使うようになるretry_timeoutの割合? |
retry_type |
enum | :exponential_backoff |
次のリトライまでの間隔をどう決めるか4 |
retry_wait |
time | 1 |
リトライ間隔の計算に使われる |
retry_exponential_backoff_base |
float | 2 |
リトライ間隔の計算に使われる |
retry_max_interval |
time | nil |
リトライ間隔の最大値 |
retry_randomize |
bool | true |
ランダム間隔でリトライする |
bufferプラグイン
Fluent::Plugin::FileBuffer
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buf_file.rb
パラメータ | 型 | default | 説明 |
---|---|---|---|
buffer_path |
string | bufferのファイルパス | |
buffer_chunk_limit_size |
256 * 1024 * 1024 |
||
buffer_total_limit_size |
64 * 1024 * 1024 * 1024 |
||
buffer_file_permission |
string | nil |
bufferファイルのパーミッション |
buffer_dir_permission |
string | nil |
bufferディレクトリのパーミッション |
Fluent::Plugin::Buffer
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb
パラメータ | 型 | default | 説明 |
---|---|---|---|
buffer_chunk_limit_size |
size | 8 * 1024 * 1024 |
chunkの最大サイズ |
buffer_total_limit_size |
size | 512 * 1024 * 1024 |
buffer合計の最大サイズ |
buffer_queue_length_limit |
integer | nil |
queueの長さ5 |
buffer_chunk_records_limit |
integer | nil |
chunk内のレコード数の上限? |
buffer_chunk_full_threshold |
float | 0.95 |
chunk sizeかrecord数がこのthreshold%をこえるとenqueueされる |
buffer_compress |
enum | :text |
圧縮するかどうか6 |
-
list: [:exception, :block, :drop_oldest_chunk]
↩︎ -
list: [:default, :lazy, :interval, :immediate]
. How to enqueue chunks to be flushed. “interval” flushes per flush_interval, “immediate” flushes just after event arrival. ↩︎ -
list: [:throw_exception, :block, :drop_oldest_chunk]
↩︎ -
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/output.rb#L81 ↩︎
-
If user specify this value and (chunk_size * queue_length) is smaller than total_size, then total_size is automatically configured to that value ↩︎
-
list: [:text, :gzip]
↩︎