OpenTelemetry Collector Linux 호스트에 직접 설치하여 자체 호스팅 Kafka 클러스터를 모니터링하세요.
시작하기 전에
다음 사항을 확인하십시오:
모니터링 호스트에 OpenJDK가 설치되어 있습니다.
Kafka 브로커에서 JMX가 활성화되어 있습니다(일반적으로 9999번 포트).
수집기에서 Kafka 브로커로의 네트워크 액세스:
- Bootstrap 서버 포트(일반적으로 9092)
- JMX 포트(일반적으로 9999)
1단계: OpenTelemetry Collector를 설치합니다.
호스트 운영 체제에 맞는 OpenTelemetry Collector Contrib 바이너리를 OpenTelemetry Collector 릴리스 페이지 에서 다운로드하여 설치하십시오.
2단계: JMX 스크래퍼를 다운로드하세요
JMX 스크래퍼는 Kafka 브로커 MBean에서 자세한 메트릭을 수집합니다.
$# Create directory in user home (no sudo needed)$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \> https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar중요
버전 호환성: 이 가이드에서는 JMX Scraper 1.52.0 버전을 사용합니다. 이전 버전의 OpenTelemetry Collector는 호환성 목록에 이 스크래퍼의 해시를 포함하지 않을 수 있습니다. 최상의 결과를 얻으려면 이 JMX 스크래퍼 버전을 지원하는 최신 OpenTelemetry Collector 버전을 사용하십시오.
3단계: JMX 사용자 정의 기호 설정 생성
기본 덤불, 목표 시스템에 포함되지 않은 추가 Kafka 지표를 수집하기 위해 사용자 정의 JMX 설정 파일을 만듭니다.
다음 설정으로 파일 ~/opentelemetry/kafka-jmx-config.yaml 을 생성하세요.
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)팁
메트릭 수집 사용자 지정: kafka-jmx-config.yaml 파일에 사용자 지정 MBean 규칙을 추가하여 추가 Kafka 메트릭을 수집할 수 있습니다.
JMX 메트릭 규칙의 기본 구문을알아보세요.
Kafka 모니터링 문서에서 사용 가능한 MBean 이름을 찾아보세요.
이를 통해 특정 모니터링 요구 사항에 따라 Kafka 브로커에서 노출하는 모든 JMX 메트릭을 수집할 수 있습니다.
4단계: 수집기 설정 생성
~/opentelemetry/config.yaml 에 메인 OpenTelemetry Collector 설정을 생성합니다.
receivers: # Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: - ${env:KAFKA_BROKER_ADDRESS} protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# JMX receiver for broker-specific metrics jmx/kafka_broker-1: jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS} target_system: kafka collection_interval: 30s jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml resource_attributes: broker.id: "1" broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: metrics/brokers-cluster-topics: receivers: [jmx/kafka_broker-1, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation] exporters: [otlp/newrelic]
metrics/jmx-cluster: receivers: [jmx/kafka_broker-1] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]설정 참고 사항:
- OTLP 엔드포인트:
https://otlp.nr-data.net:4317(미국 지역) 또는https://otlp.eu01.nr-data.net:4317(유럽 지역)을 사용합니다. 다른 지역의 OTLP 엔드포인트 구성 방법을 참조하세요.
중요
여러 브로커를 사용하는 경우, 클러스터의 각 브로커를 모니터링하기 위해 서로 다른 엔드포인트와 브로커 ID를 가진 추가 JMX 수신기를 추가하십시오.
5단계: 환경 변수 설정
필요한 환경 변수를 설정하세요:
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BROKER_ADDRESS="localhost:9092"$export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"바꾸다:
YOUR_LICENSE_KEY당신의 뉴렐릭 피규어와 함께my-kafka-clusterKafka 클러스터에 고유한 이름을 지정하세요.localhost:9092Kafka 부트스트랩 서버 주소와 함께localhost:9999Kafka 브로커의 JMX 엔드포인트를 사용하세요.
6단계: 수집기를 시작합니다
수집기를 직접 실행하세요(sudo 권한 필요 없음):
$# Start the collector with your config$otelcol-contrib --config ~/opentelemetry/config.yaml수집기는 몇 분 내에 Kafka 지표를 뉴렐릭으로 보내기 시작할 것입니다.
지속적인 실행을 위한 systemd 서비스를 생성합니다(초기 설정 시 sudo 권한이 필요합니다).
$# Create systemd service file$sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF$[Unit]$Description=OpenTelemetry Collector for Kafka$After=network.target$
$[Service]$Type=simple$User=$USER$WorkingDirectory=$HOME/opentelemetry$ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml$Restart=on-failure$Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"$Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"$Environment="KAFKA_BROKER_ADDRESS=localhost:9092"$Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"$
$[Install]$WantedBy=multi-user.target$EOFYOUR_LICENSE_KEY 및 기타 값을 교체한 다음 서비스를 활성화하고 시작하십시오.
$sudo systemctl daemon-reload$sudo systemctl enable otelcol-contrib$sudo systemctl start otelcol-contrib$sudo systemctl status otelcol-contrib7단계: (선택 사항) 제작자 또는 소비자를 구성합니다.
Kafka 생산자 및 소비자 근로자로부터 디버그 수준의 텔레메트리를 수집하려면 OpenTelemetry 클라이언트 에이전트를 사용하세요.
다음 에이전트를 다운로드하세요:
bash$mkdir -p ~/otel-java$curl -L -o ~/otel-java/opentelemetry-javaagent.jar \>https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar에이전트로 시작하세요:
bash$java \>-javaagent:~/otel-java/opentelemetry-javaagent.jar \>-Dotel.service.name="kafka-producer-1" \>-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \>-Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \>-Dotel.exporter.otlp.protocol="grpc" \>-Dotel.metrics.exporter="otlp" \>-Dotel.traces.exporter="otlp" \>-Dotel.logs.exporter="otlp" \>-Dotel.instrumentation.kafka.experimental-span-attributes="true" \>-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \>-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \>-Dotel.instrumentation.kafka.enabled="true" \>-jar your-kafka-application.jar
바꾸다:
kafka-producer-1생산자 또는 소비자 애플리케이션에 고유한 이름을 지정하세요.my-kafka-cluster수집기 설정에 사용된 것과 동일한 클러스터 이름을 사용합니다.https://otlp.nr-data.net:4317뉴렐릭 OTLP 엔드포인트를 사용하세요(EU 지역의 경우https://otlp.eu01.nr-data.net:4317사용). 다른 엔드포인트 및 설정 옵션은 OTLP 엔드포인트 구성을 참조하세요.
잔류 에이전트는 코드 변경이 전혀 없는 기본 Kafka 측정, 캡처 기능을 제공합니다.
- 요청 지연시간
- 처리량 지표
- 오류율
- 분산 추적
고급 설정에 대해서는 Kafka 측정, 로그 문서를 참조하세요.
6단계: (선택 사항) Kafka 브로커 로그 전달
호스트에서 Kafka 브로커 로그를 수집하여 뉴렐릭으로 전송하려면 OpenTelemetry Collector 에서 파일 로그 수신기를 구성하십시오.
데이터 찾기
몇 분 후, Kafka 창이 뉴렐릭에 나타날 것입니다. 뉴렐릭 UI 의 다양한 보기에서 Kafka 범위를 탐색하는 방법에 대한 자세한 지침은 "데이터 찾기"를 참조하세요.
NRQL을 사용하여 데이터를 쿼리할 수도 있습니다.
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'문제점 해결
다음 단계
- Kafka 메트릭 살펴보기 - 전체 메트릭 참조 자료를 확인하세요
- 맞춤형 대시보드 만들기 - Kafka 데이터에 대한 시각화 구축
- 알림 설정 - 소비자 지연 및 복제되지 않은 파티션과 같은 중요한 지표를 모니터링합니다.