メインコンテンツへスキップ
メインコンテンツへスキップ

Elastic からのエージェントの移行

Elastic からのエージェント移行

Elastic Stack は、複数の Observability データ収集エージェントを提供しています。具体的には次のとおりです。

  • Beats ファミリーFilebeatMetricbeatPacketbeat など — は、いずれも libbeat ライブラリをベースとしています。これらの Beats は、Lumberjack プロトコル経由で Elasticsearch、Kafka、Redis、もしくは Logstash へのデータ送信 をサポートします。
  • Elastic Agent は、ログ、メトリクス、トレースを収集可能な統合エージェントです。このエージェントは Elastic Fleet Server 経由で集中管理でき、出力先として Elasticsearch、Logstash、Kafka、または Redis をサポートします。
  • Elastic はまた、OpenTelemetry Collector - EDOT のディストリビューションも提供しています。現在のところ Fleet Server によるオーケストレーションはできませんが、ClickStack への移行を検討しているユーザーに対し、より柔軟でオープンな移行パスを提供します。

最適な移行パスは、現在使用しているエージェントに依存します。以降のセクションでは、代表的な各エージェントタイプごとに移行オプションを説明します。本ガイドの目的は、移行時の負担を最小限に抑え、可能な限り移行期間中も既存エージェントを継続利用できるようにすることです。

推奨される移行パス

可能な限り、すべてのログ・メトリクス・トレース収集について OpenTelemetry (OTel) Collector への移行を推奨します。Collector は エージェントロールでエッジにデプロイ してください。これがデータ送信の最も効率的な方法であり、アーキテクチャの複雑化やデータ変換を避けることができます。

なぜ OpenTelemetry Collector なのか?

OpenTelemetry Collector は、オブザーバビリティデータのインジェストに対して、持続可能かつベンダーニュートラルなソリューションを提供します。組織によっては、数千台、あるいは数万台規模の Elastic Agent 群を運用している場合があることを認識しています。これらのユーザーにとっては、既存のエージェント基盤との互換性を維持することが重要となる場合があります。本ドキュメントは、そのような要件を満たしつつ、チームが徐々に OpenTelemetry ベースの収集へ移行できるよう支援することを目的としています。

ClickHouse OpenTelemetry エンドポイント

すべてのデータは OpenTelemetry (OTel) collector インスタンスを経由して ClickStack に取り込まれます。このインスタンスが、ログ、メトリクス、トレース、およびセッションデータの主なエントリポイントとして機能します。このインスタンスには、ClickStack ディストリビューション に含まれる公式 collector を使用することを推奨します(ClickStack のデプロイメントモデルにすでにバンドルされていない場合)。

ユーザーは、この collector に対して、言語 SDKs から、またはインフラストラクチャのメトリクスとログを収集するデータ収集エージェント(agent ロールで動作する OTel collector や、FluentdVector などの他のテクノロジー)を通じてデータを送信します。

以降のすべてのエージェント移行手順において、この collector が利用可能であることを前提とします。

Beats からの移行

大規模な Beats デプロイメントを運用しているユーザーは、ClickStack への移行時にもそれらを引き続き利用したい場合があります。

現時点ではこのオプションは Filebeat でのみ検証されており、そのためログ用途にのみ適しています。

Beats エージェントは Elastic Common Schema (ECS) を使用します。これは現在、ClickStack が利用する OpenTelemetry 仕様へ統合されつつあります。しかしながら、これらのスキーマには依然として大きな差異があり、現時点ではユーザー側で ECS 形式のイベントを、ClickStack へのインジェスト前に OpenTelemetry 形式へ変換する必要があります。

この変換は、軽量かつ高パフォーマンスなオブザーバビリティデータパイプラインであり、Vector Remap Language (VRL) と呼ばれる強力な変換言語をサポートする Vector を用いて実行することを推奨します。

Filebeat エージェントが、Beats でサポートされている出力先である Kafka へのデータ送信に設定されている場合、Vector は Kafka からそれらのイベントを読み取り、VRL を使用してスキーマ変換を適用し、その後 OTLP 経由で ClickStack に同梱されている OpenTelemetry Collector へ転送できます。

また、Vector は Logstash が使用する Lumberjack プロトコル経由でのイベント受信にも対応しています。これにより、Beats エージェントはデータを直接 Vector に送信でき、同じ変換処理を適用したうえで OTLP 経由で ClickStack の OpenTelemetry Collector に転送することが可能になります。

以下に、これら 2 つのアーキテクチャを示します。

エージェント移行

次の例では、Lumberjack プロトコル経由で Filebeat からのログイベントを受信するように Vector を設定するための初期ステップを示します。受信した ECS イベントを OTel の仕様へマッピングするための VRL を提示し、その後 OTLP 経由で ClickStack の OpenTelemetry Collector に送信します。Kafka からイベントを取り込むユーザーは、Vector の Logstash ソースを Kafka ソース に置き換えることができます — それ以外の手順はすべて同一です。

vectorのインストール

公式インストールガイドを使用してVectorをインストールします。

これは、Elastic Stack OTel collectorと同じインスタンスにインストールすることができます。

Vectorを本番環境に移行する際は、アーキテクチャとセキュリティに関するベストプラクティスに従うことを推奨します。

vectorの設定

VectorをLumberjackプロトコル経由でイベントを受信するように設定し、Logstashインスタンスを模倣します。これは、Vectorのlogstashソースを設定することで実現できます:

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: false  # TLSを使用する場合はtrueに設定してください
      # 以下のファイルは https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs の手順で生成されます
      # crt_file: logstash.crt
      # key_file: logstash.key
      # ca_file: ca.crt
      # verify_certificate: true
TLS設定

相互TLSが必要な場合は、Elasticガイド"Configure SSL/TLS for the Logstash output"を使用して証明書と鍵を生成します。生成した証明書と鍵は、上記の設定例のように指定することができます。

イベントはECS形式で受信されます。これらはVector Remap Language(VRL)トランスフォーマーを使用してOpenTelemetryスキーマに変換できます。このトランスフォーマーの設定は簡単で、スクリプトファイルを別ファイルとして保持します。

transforms:
  remap_filebeat:
    inputs: ["beats"]
    type: "remap"
    file: 'beat_to_otel.vrl'

上記の beats ソースからイベントを受信します。remap スクリプトを以下に示します。このスクリプトはログイベントでのみテスト済みですが、他の形式の基礎として利用できます。

VRL - ECS から OTel へのマッピング
# ルートレベルで無視するキーを定義
ignored_keys = ["@metadata"]

# リソースキーのプレフィックスを定義
resource_keys = ["host", "cloud", "agent", "service"]

# リソースフィールドとログレコードフィールド用の個別オブジェクトを作成
resource_obj = {}
log_record_obj = {}

# 無視対象外のすべてのルートキーを適切なオブジェクトにコピー
root_keys = keys(.)
for_each(root_keys) -> |_index, key| {
    if !includes(ignored_keys, key) {
        val, err = get(., [key])
        if err == null {
            # リソースフィールドかどうかを確認
            is_resource = false
            if includes(resource_keys, key) {
                is_resource = true
            }

            # 適切なオブジェクトに追加
            if is_resource {
                resource_obj = set(resource_obj, [key], val) ?? resource_obj
            } else {
                log_record_obj = set(log_record_obj, [key], val) ?? log_record_obj
            }
        }
    }
}

# 両方のオブジェクトを個別にフラット化
flattened_resources = flatten(resource_obj, separator: ".")
flattened_logs = flatten(log_record_obj, separator: ".")

# リソース属性を処理
resource_attributes = []
resource_keys_list = keys(flattened_resources)
for_each(resource_keys_list) -> |_index, field_key| {
    field_value, err = get(flattened_resources, [field_key])
    if err == null && field_value != null {
        attribute, err = {
            "key": field_key,
            "value": {
                "stringValue": to_string(field_value)
            }
        }
        if (err == null) {
            resource_attributes = push(resource_attributes, attribute)
        }
    }
}

# ログレコード属性を処理
log_attributes = []
log_keys_list = keys(flattened_logs)
for_each(log_keys_list) -> |_index, field_key| {
    field_value, err = get(flattened_logs, [field_key])
    if err == null && field_value != null {
        attribute, err = {
            "key": field_key,
            "value": {
                "stringValue": to_string(field_value)
            }
        }
        if (err == null) {
            log_attributes = push(log_attributes, attribute)
        }
    }
}

# timeUnixNano用のタイムスタンプを取得(ナノ秒に変換)
timestamp_nano = if exists(.@timestamp) {
    to_unix_timestamp!(parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S%.3fZ"), unit: "nanoseconds")
} else {
    to_unix_timestamp(now(), unit: "nanoseconds")
}

# message/bodyフィールドを取得
body_value = if exists(.message) {
    to_string!(.message)
} else if exists(.body) {
    to_string!(.body)
} else {
    ""
}

# OpenTelemetry構造を作成
. = {
    "resourceLogs": [
        {
            "resource": {
                "attributes": resource_attributes
            },
            "scopeLogs": [
                {
                    "scope": {},
                    "logRecords": [
                        {
                            "timeUnixNano": to_string(timestamp_nano),
                            "severityNumber": 9,
                            "severityText": "info",
                            "body": {
                                "stringValue": body_value
                            },
                            "attributes": log_attributes
                        }
                    ]
                }
            ]
        }
    ]
}

最後に、変換されたイベントは、OTLPを介したOpenTelemetryコレクター経由でClickStackに送信できます。これには、remap_filebeat変換からイベントを入力として受け取るVectorでのOTLPシンクの設定が必要です。

sinks:
  otlp:
    type: opentelemetry
    inputs: [remap_filebeat] # remap変換からイベントを受信 - 詳細は以下を参照
    protocol:
      type: http  # ポート4317を使用する場合は "grpc" を指定
      uri: http://localhost:4318/v1/logs # OTel collectorのログエンドポイント 
      method: post
      encoding:
        codec: json
      framing:
        method: newline_delimited
      headers:
        content-type: application/json
        authorization: ${YOUR_INGESTION_API_KEY}

ここでのYOUR_INGESTION_API_KEYはClickStackによって生成されます。このキーはHyperDXアプリのTeam Settings → API Keysから確認できます。

インジェストキー

最終的な完全な設定は以下の通りです:

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: false  # TLSを使用する場合はtrueに設定してください
        #crt_file: /data/elasticsearch-9.0.1/logstash/logstash.crt
        #key_file: /data/elasticsearch-9.0.1/logstash/logstash.key
        #ca_file: /data/elasticsearch-9.0.1/ca/ca.crt
        #verify_certificate: true

transforms:
  remap_filebeat:
    inputs: ["beats"]
    type: "remap"
    file: 'beat_to_otel.vrl'

sinks:
  otlp:
    type: opentelemetry
    inputs: [remap_filebeat]
    protocol:
      type: http  # ポート4317の場合は"grpc"を使用してください
      uri: http://localhost:4318/v1/logs
      method: post
      encoding:
        codec: json
      framing:
        method: newline_delimited
      headers:
        content-type: application/json

Filebeatの設定

既存のFilebeatインストールは、イベントをVectorに送信するように変更するだけで済みます。これにはLogstash出力の設定が必要です。TLSもオプションで設定可能です。

# ------------------------------ Logstash出力 -------------------------------
output.logstash:
  # Logstashホスト
  hosts: ["localhost:5044"]

  # SSL設定(オプション)。デフォルトは無効です。
  # HTTPSサーバー検証用のルート証明書のリスト
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # SSLクライアント認証用の証明書
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # クライアント証明書の秘密鍵
  #ssl.key: "/etc/pki/client/cert.key"

Elastic Agent からの移行

Elastic Agent は、複数の Elastic Beats を 1 つのパッケージに統合したものです。このエージェントは Elastic Fleet と連携し、中央集約的にオーケストレーションおよび設定を行えます。

Elastic Agent をデプロイしているユーザーには、いくつかの移行方法があります。

  • エージェントを構成し、Lumberjack プロトコル経由で Vector エンドポイントに送信するようにします。これは現在、Elastic Agent でログデータのみを収集しているユーザー向けにテストされています。 この方法は Kibana の Fleet UI から一元的に設定できます。
  • エージェントを Elastic OpenTelemetry Collector (EDOT) として実行する。Elastic Agent には EDOT Collector が組み込まれており、アプリケーションとインフラストラクチャを一度計装するだけで、複数のベンダーやバックエンドにデータを送信できます。この構成では、EDOT Collector を設定して、OTLP 経由で ClickStack の OTel collector にイベントを転送するだけで済みます。このアプローチは、すべてのイベントタイプをサポートします。

以下で、これら 2 つのオプションについて順に説明します。

Vector 経由でデータを送信する

Vector をインストールして設定する

Filebeat からの移行時に記載されている同じ手順を使用して、Vector をインストールおよび設定します。

Elastic Agent を設定する

Elastic Agent は、Logstash プロトコルである Lumberjack 経由でデータを送信するように設定する必要があります。これはサポートされているデプロイメントパターンであり、Fleet を使用しないデプロイの場合は、Kibana を用いた中央管理、またはエージェント設定ファイル elastic-agent.yaml を直接編集する方法のいずれかで設定できます。

Kibana からの中央設定は、Fleet に Output を追加することで行えます。

Logstash output を追加

この Output は、その後エージェントポリシーで使用できます。これにより、そのポリシーを使用しているすべてのエージェントが、自動的に Vector にデータを送信するようになります。

エージェント設定

これは TLS を用いたセキュア通信の設定が必要となるため、ガイド「Configure SSL/TLS for the Logstash output」を参照することを推奨します。このガイドは、自身の Vector インスタンスが Logstash の役割を担う前提で読み進めることができます。

また、Vector 側で Logstash source を相互 TLS 認証を用いるように設定する必要がある点に注意してください。ガイドで生成したキーと証明書を使用して、適切に input を構成してください。

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: true  # TLS を使用する場合は true に設定します。
      # 以下のファイルは https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs の手順で生成されます
      crt_file: logstash.crt
      key_file: logstash.key
      ca_file: ca.crt
      verify_certificate: true

OpenTelemetry collector として Elastic Agent を実行する

Elastic Agent には EDOT Collector が組み込まれており、一度アプリケーションやインフラストラクチャを計装することで、複数のベンダーやバックエンドにデータを送信できます。

Agent のインテグレーションとオーケストレーション

Elastic Agent に同梱されている EDOT Collector を実行しているユーザーは、Agent が提供する既存のインテグレーションを活用できません。さらに、Collector は Fleet から集中管理できないため、ユーザーは Agent をスタンドアロンモード で実行し、自身で設定を管理する必要があります。

EDOT Collector と共に Elastic Agent を実行するには、Elastic の公式ガイドを参照してください。ガイドに記載されているように Elastic エンドポイントを設定するのではなく、既存の exporters を削除し、OTLP 出力を設定して、データを ClickStack の OpenTelemetry collector に送信します。たとえば、exporters の設定は次のようになります。

exporters:
  # Elasticsearch Managed OTLP Inputにログとメトリクスを送信するエクスポーター
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: true

ここでの YOUR_INGESTION_API_KEY は ClickStack によって発行されます。キーは HyperDX アプリの Team Settings → API Keys で確認できます。

インジェストキー

Vector が相互 TLS (mTLS) を使用するように構成されており、証明書および鍵がガイド "Logstash 出力向けに SSL/TLS を構成する" の手順に従って生成されている場合は、otlp エクスポーターも同様に設定する必要があります。例えば、次のように構成します。

exporters:
  # ログとメトリクスをElasticsearch Managed OTLP Inputに送信するエクスポーター
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: false
      ca_file: /path/to/ca.crt
      cert_file: /path/to/client.crt
      key_file: /path/to/client.key

Elastic OpenTelemetry collector からの移行

すでにElastic OpenTelemetry Collector (EDOT) を利用している場合は、エージェントの設定を変更して、OTLP 経由で ClickStack OpenTelemetry collector に送信するようにするだけで済みます。必要な手順は、上で説明した Elastic Agent を OpenTelemetry collector として実行する 場合と同じです。このアプローチは、すべての種類のデータに対して利用できます。