メインコンテンツまでスキップ
メインコンテンツまでスキップ

Elasticからエージェントを移行する

Elasticからのエージェントの移行

Elastic Stackは、複数の可観測性データ収集エージェントを提供しています。具体的には:

  • Beatsファミリー - FilebeatMetricbeat、およびPacketbeatなど、すべてlibbeatライブラリに基づいています。これらのBeatsは、Lumberjackプロトコルを介してElasticsearch、Kafka、Redis、またはLogstashにデータを送信することをサポートします。
  • Elastic Agentは、ログ、メトリック、およびトレースを収集するための統合エージェントを提供します。このエージェントは、Elastic Fleet Serverを介して中央管理可能で、Elasticsearch、Logstash、Kafka、またはRedisへの出力をサポートしています。
  • Elasticはまた、OpenTelemetry Collector - EDOTの配布版を提供しています。現在はFleet Serverによってオーケストレーションできませんが、ClickStackへの移行を考えるユーザーにとって、より柔軟でオープンな道を提供します。

最適な移行パスは、現在使用しているエージェントに依存します。以下のセクションでは、各主要エージェントタイプに対する移行オプションを文書化しています。私たちの目標は、摩擦を最小限に抑え、可能な限りユーザーが移行中も既存のエージェントを引き続き使用できるようにすることです。

推奨移行パス

可能な限り、すべてのログ、メトリック、トレース収集にOpenTelemetry (OTel) Collectorへの移行を推奨し、エッジのエージェントロールで収集器をデプロイすることをお勧めします。これはデータを送信する最も効率的な手段を表し、アーキテクチャの複雑さやデータ変換を避けることができます。

OpenTelemetry Collectorの理由

OpenTelemetry Collectorは、可観測性データの取り込みに対して持続可能でベンダーニュートラルなソリューションを提供します。私たちは、いくつかの組織が数千、あるいは数万のElasticエージェントを運用していることを認識しています。これらのユーザーにとって、既存のエージェントインフラとの互換性を維持することが重要かもしれません。このドキュメントはそれをサポートしつつ、チームが段階的にOpenTelemetryベースの収集に移行するのを助けることを目的としています。

ClickHouse OpenTelemetryエンドポイント

すべてのデータは、ログ、メトリック、トレース、セッションデータの主な入り口として機能するOpenTelemetry (OTel) collectorインスタンスを介してClickStackに取り込まれます。このインスタンスのためには、公式のClickStack配布版を使用することを推奨します。すでにClickStackのデプロイメントモデルにバンドルされていない限り

ユーザーは、言語SDKから、またはインフラメトリックとログを収集するデータ収集エージェントを介して、この収集器にデータを送信します(OTelコレクターのエージェント役割や、FluentdVectorなどの他の技術を使用することができます)。

このコレクターはすべてのエージェント移行ステップで利用可能であると仮定します。

Beatsからの移行

広範なBeatsデプロイメントを持つユーザーは、ClickStackに移行する際にこれらを保持したいと考えるかもしれません。

現在、このオプションはFilebeatでのみテストされているため、ログ専用に適しています。

Beatsエージェントは、現在OpenTelemetryの仕様に統合されつつあるElastic Common Schema (ECS)を使用します。ただし、これらのスキーマは依然として著しく異なるため、ユーザーはECS形式のイベントをClickStackに取り込む前にOpenTelemetry形式に変換する責任があります。

この変換は、強力な変換言語であるVector Remap Language (VRL)をサポートする軽量で高性能な可観測性データパイプラインであるVectorを使用して行うことを推奨します。

FilebeatエージェントがKafkaにデータを送信するように構成されている場合(Beatsでサポートされている出力)、VectorはKafkaからこれらのイベントを消費し、VRLを使用してスキーマ変換を適用し、OTLPを介してClickStackに付属のOpenTelemetry Collectorに転送できます。

また、VectorはLogstashで使用されるLumberjackプロトコル経由でイベントを受信することもサポートしています。これにより、Beatsエージェントはデータを直接Vectorに送信でき、同じ変換プロセスを適用してからOTLPを介してClickStackのOpenTelemetry Collectorに転送できます。

これらのアーキテクチャの両方を以下に示します。

Migrating agents

以下の例では、Lumberjackプロトコルを介してFilebeatからログイベントを受信するためにVectorを構成する初期ステップを提供します。ECSイベントをOTel仕様にマッピングするためのVRLを提供し、これらをOTLPを介してClickStackのOpenTelemetryコレクターに送信します。Kafkaからイベントを消費するユーザーは、VectorのLogstashソースをKafkaソースに置き換えることができ、他のステップは同じままとなります。

Vectorのインストール

公式インストールガイドを使用してVectorをインストールします。

これは、Elastic Stack OTelコレクターと同じインスタンスにインストールできます。

ユーザーは、Vectorをプロダクションに移行する際のベストプラクティスに従って、アーキテクチャやセキュリティに関して配慮することができます。

Vectorの構成

Vectorは、Logstashインスタンスを模倣してLumberjackプロトコル経由でイベントを受信するように構成する必要があります。これは、Vectorのlogstashソースを構成することで達成できます。

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: false  # Set to true if you're using TLS
      # The files below are generated from the steps at https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs
      # crt_file: logstash.crt
      # key_file: logstash.key
      # ca_file: ca.crt
      # verify_certificate: true
TLS構成

相互TLSが必要な場合は、Elasticガイドの"Logstash出力のSSL/TLSを構成する"を使用して証明書とキーを生成します。これらは、上記のように構成で指定できます。

イベントはECS形式で受信されます。これらは、Vector Remap Language (VRL)トランスフォーマーを使用してOpenTelemetryスキーマに変換できます。このトランスフォーマーの構成は簡単で、スクリプトファイルは別のファイルに保持されます。

transforms:
  remap_filebeat:
    inputs: ["beats"]
    type: "remap"
    file: 'beat_to_otel.vrl'

上記のbeatsソースからイベントを受信することを注意してください。我々のリマップスクリプトを以下に示します。このスクリプトはログイベントのみでテストされていますが、他の形式の基盤となることができます。

VRL - ECSからOTelへ

# Define keys to ignore at root level
ignored_keys = ["@metadata"]


# Define resource key prefixes
resource_keys = ["host", "cloud", "agent", "service"]


# Create separate objects for resource and log record fields
resource_obj = {}
log_record_obj = {}


# Copy all non-ignored root keys to appropriate objects
root_keys = keys(.)
for_each(root_keys) -> |_index, key| {
    if !includes(ignored_keys, key) {
        val, err = get(., [key])
        if err == null {
            # Check if this is a resource field
            is_resource = false
            if includes(resource_keys, key) {
                is_resource = true
            }

            # Add to appropriate object
            if is_resource {
                resource_obj = set(resource_obj, [key], val) ?? resource_obj
            } else {
                log_record_obj = set(log_record_obj, [key], val) ?? log_record_obj
            }
        }
    }
}


# Flatten both objects separately
flattened_resources = flatten(resource_obj, separator: ".")
flattened_logs = flatten(log_record_obj, separator: ".")


# Process resource attributes
resource_attributes = []
resource_keys_list = keys(flattened_resources)
for_each(resource_keys_list) -> |_index, field_key| {
    field_value, err = get(flattened_resources, [field_key])
    if err == null && field_value != null {
        attribute, err = {
            "key": field_key,
            "value": {
                "stringValue": to_string(field_value)
            }
        }
        if (err == null) {
            resource_attributes = push(resource_attributes, attribute)
        }
    }
}


# Process log record attributes
log_attributes = []
log_keys_list = keys(flattened_logs)
for_each(log_keys_list) -> |_index, field_key| {
    field_value, err = get(flattened_logs, [field_key])
    if err == null && field_value != null {
        attribute, err = {
            "key": field_key,
            "value": {
                "stringValue": to_string(field_value)
            }
        }
        if (err == null) {
            log_attributes = push(log_attributes, attribute)
        }
    }
}


# Get timestamp for timeUnixNano (convert to nanoseconds)
timestamp_nano = if exists(.@timestamp) {
    to_unix_timestamp!(parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S%.3fZ"), unit: "nanoseconds")
} else {
    to_unix_timestamp(now(), unit: "nanoseconds")
}


# Get message/body field
body_value = if exists(.message) {
    to_string!(.message)
} else if exists(.body) {
    to_string!(.body)
} else {
    ""
}


# Create the OpenTelemetry structure
. = {
    "resourceLogs": [
        {
            "resource": {
                "attributes": resource_attributes
            },
            "scopeLogs": [
                {
                    "scope": {},
                    "logRecords": [
                        {
                            "timeUnixNano": to_string(timestamp_nano),
                            "severityNumber": 9,
                            "severityText": "info",
                            "body": {
                                "stringValue": body_value
                            },
                            "attributes": log_attributes
                        }
                    ]
                }
            ]
        }
    ]
}

最後に、変換されたイベントはOTLPを介してClickStackにOpenTelemetryコレクターに送信できます。これには、Vectorでremap_filebeat変換からイベントを取り込むOTLPシンクの構成が必要です。

sinks:
  otlp:
    type: opentelemetry
    inputs: [remap_filebeat] # receives events from a remap transform - see below
    protocol:
      type: http  # Use "grpc" for port 4317
      uri: http://localhost:4318/v1/logs # logs endpoint for the OTel collector 
      method: post
      encoding:
        codec: json
      framing:
        method: newline_delimited
      headers:
        content-type: application/json
        authorization: ${YOUR_INGESTION_API_KEY}

ここでのYOUR_INGESTION_API_KEYはClickStackによって生成されます。このキーは、HyperDXアプリのTeam Settings → API Keysで見つけることができます。

Ingestion keys

私たちの最終的な完全な構成は以下に示されています。

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: false  # Set to true if you're using TLS
        #crt_file: /data/elasticsearch-9.0.1/logstash/logstash.crt
        #key_file: /data/elasticsearch-9.0.1/logstash/logstash.key
        #ca_file: /data/elasticsearch-9.0.1/ca/ca.crt
        #verify_certificate: true

transforms:
  remap_filebeat:
    inputs: ["beats"]
    type: "remap"
    file: 'beat_to_otel.vrl'

sinks:
  otlp:
    type: opentelemetry
    inputs: [remap_filebeat]
    protocol:
      type: http  # Use "grpc" for port 4317
      uri: http://localhost:4318/v1/logs
      method: post
      encoding:
        codec: json
      framing:
        method: newline_delimited
      headers:
        content-type: application/json

Filebeatの構成

既存のFilebeatインストールは、単にイベントをVectorに送信するように変更される必要があります。これには、Logstash出力の構成が必要で、再びTLSはオプションで構成できます。


# ------------------------------ Logstash Output -------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

Elastic Agentからの移行

Elastic Agentは、異なるElastic Beatsを1つのパッケージに統合します。このエージェントは、Elastic Fleetと統合されており、中央集権的にオーケストレーションおよび構成することができます。

Elastic Agentsを展開しているユーザーは、いくつかの移行パスがあります:

  • エージェントをLumberjackプロトコル経由でVectorエンドポイントに送信するように構成します。これは、Elastic Agentでログデータを収集しているユーザーに対してのみテストされています。 これは、KibanaのFleet UIを通じて中央に設定できます。
  • Elastic OpenTelemetry Collector (EDOT)としてエージェントを実行します。Elastic Agentには、アプリケーションとインフラを一度の手間で計器化し、複数のベンダーやバックエンドにデータを送信できる埋め込まれたEDOTコレクターが含まれています。この構成では、ユーザーは単にEDOTコレクターを構成して、OTLPを介してClickStack OTelコレクターにイベントを転送することができます。このアプローチはすべてのイベントタイプをサポートします。

これらのオプションの両方を下に示します。

Vector経由でデータを送信

Vectorのインストールと構成

Filebeatからの移行用に文書化された手順と同じ手順を使用して、Vectorをインストールおよび構成します。

Elasticエージェントを構成

Elastic Agentは、LogstashプロトコルLumberjackを介してデータを送信するように構成する必要があります。これはサポートされているデプロイメントパターンで、中央に設定するか、エージェント構成ファイルelastic-agent.yamlを通じて設定することができます(Fleetなしで展開する場合)。

Kibanaを通じた中央設定は、Fleetに出力を追加することによって実現できます。

Add Logstash output

この出力は、エージェントポリシーで使用できます。これにより、ポリシーを使用しているすべてのエージェントが自動的にそのデータをVectorに送信することになります。

Agent settings

これにはTLSによる安全な通信が必要であるため、ユーザーがそのVectorインスタンスをLogstashの役割として想定していることを前提とした、"Logstash出力のSSL/TLSを構成する"ガイドを推奨します。

この設定には、Vector内のLogstashソースも相互TLSを設定する必要があります。ガイドで生成されたキーと証明書を使用して、入力を適切に構成してください。

sources:
  beats:
    type: logstash
    address: 0.0.0.0:5044
    tls:
      enabled: true  # Set to true if you're using TLS. 
      # The files below are generated from the steps at https://www.elastic.co/docs/reference/fleet/secure-logstash-connections#generate-logstash-certs
      crt_file: logstash.crt
      key_file: logstash.key
      ca_file: ca.crt
      verify_certificate: true

ElasticエージェントをOpenTelemetryコレクターとして実行

Elastic Agentには、アプリケーションとインフラを一度の手間で計器化し、複数のベンダーやバックエンドにデータを送信できる埋め込まれたEDOTコレクターが含まれています。

エージェント統合とオーケストレーション

Elastic Agentに埋め込まれたEDOTコレクターを実行しているユーザーは、エージェントが提供する既存の統合機能を利用できません。また、コレクターはFleetによって中央管理できず、ユーザーは独立モードでエージェントを実行することを余儀なくされ、自ら構成を管理する必要があります。

Elastic AgentをEDOTコレクターで実行するには、公式Elasticガイドを参照してください。ガイドに示されているようにElasticエンドポイントの構成を行うのではなく、既存のexportersを削除し、OTLP出力を構成してClickStack OpenTelemetryコレクターにデータを送信します。たとえば、エクスポーターの構成は次のようになります。

exporters:
  # Exporter to send logs and metrics to Elasticsearch Managed OTLP Input
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: true

ここでのYOUR_INGESTION_API_KEYはClickStackによって生成されます。このキーは、HyperDXアプリのTeam Settings → API Keysで見つけることができます。

Ingestion keys

もしVectorが相互TLSを使用するように構成されており、ガイドから生成された証明書とキーが使用されている場合、otlpエクスポーターは次のように構成される必要があります。

exporters:
  # Exporter to send logs and metrics to Elasticsearch Managed OTLP Input
  otlp:
    endpoint: localhost:4317
    headers:
      authorization: ${YOUR_INGESTION_API_KEY}
    tls:
      insecure: false
      ca_file: /path/to/ca.crt
      cert_file: /path/to/client.crt
      key_file: /path/to/client.key

Elastic OpenTelemetryコレクターからの移行

すでにElastic OpenTelemetry Collector (EDOT)を実行しているユーザーは、エージェントをClickStack OpenTelemetryコレクターにOTLP経由で送信するように単に再構成できます。これに関与する手順は、Elastic AgentをOpenTelemetryコレクターとして実行するための手順と同じです。このアプローチはすべてのデータタイプで使用できます。