• /
  • EnglishEspañolFrançais日本語한국어Português
  • 로그인지금 시작하기

사용자의 편의를 위해 제공되는 기계 번역입니다.

영문본과 번역본이 일치하지 않는 경우 영문본이 우선합니다. 보다 자세한 내용은 이 페이지를 방문하시기 바랍니다.

문제 신고

OpenTelemetry를 사용하여 자체 호스팅 Kafka를 모니터링하세요.

OpenTelemetry Collector Linux 호스트에 직접 설치하여 자체 호스팅 Kafka 클러스터를 모니터링하세요.

시작하기 전에

다음 사항을 확인하십시오:

  • 뉴렐릭 계정

  • 모니터링 호스트에 OpenJDK가 설치되어 있습니다.

  • Kafka 브로커에서 JMX가 활성화되어 있습니다(일반적으로 9999번 포트).

  • 수집기에서 Kafka 브로커로의 네트워크 액세스:

    • Bootstrap 서버 포트(일반적으로 9092)
    • JMX 포트(일반적으로 9999)

1단계: OpenTelemetry Collector를 설치합니다.

호스트 운영 체제에 맞는 OpenTelemetry Collector Contrib 바이너리를 OpenTelemetry Collector 릴리스 페이지 에서 다운로드하여 설치하십시오.

2단계: JMX 스크래퍼를 다운로드하세요

JMX 스크래퍼는 Kafka 브로커 MBean에서 자세한 메트릭을 수집합니다.

bash
$
# Create directory in user home (no sudo needed)
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \
>
https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar

중요

버전 호환성: 이 가이드에서는 JMX Scraper 1.52.0 버전을 사용합니다. 이전 버전의 OpenTelemetry Collector는 호환성 목록에 이 스크래퍼의 해시를 포함하지 않을 수 있습니다. 최상의 결과를 얻으려면 이 JMX 스크래퍼 버전을 지원하는 최신 OpenTelemetry Collector 버전을 사용하십시오.

3단계: JMX 사용자 정의 기호 설정 생성

기본 덤불, 목표 시스템에 포함되지 않은 추가 Kafka 지표를 수집하기 위해 사용자 정의 JMX 설정 파일을 만듭니다.

다음 설정으로 파일 ~/opentelemetry/kafka-jmx-config.yaml 을 생성하세요.

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

메트릭 수집 사용자 지정: kafka-jmx-config.yaml 파일에 사용자 지정 MBean 규칙을 추가하여 추가 Kafka 메트릭을 수집할 수 있습니다.

4단계: 수집기 설정 생성

~/opentelemetry/config.yaml 에 메인 OpenTelemetry Collector 설정을 생성합니다.

receivers:
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers:
- ${env:KAFKA_BROKER_ADDRESS}
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
topic_match: ".*"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# JMX receiver for broker-specific metrics
jmx/kafka_broker-1:
jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar
endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
target_system: kafka
collection_interval: 30s
jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml
resource_attributes:
broker.id: "1"
broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
metrics/brokers-cluster-topics:
receivers: [jmx/kafka_broker-1, kafkametrics]
processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation]
exporters: [otlp/newrelic]
metrics/jmx-cluster:
receivers: [jmx/kafka_broker-1]
processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation]
exporters: [otlp/newrelic]

설정 참고 사항:

  • OTLP 엔드포인트: https://otlp.nr-data.net:4317 (미국 지역) 또는 https://otlp.eu01.nr-data.net:4317 (유럽 지역)을 사용합니다. 다른 지역의 OTLP 엔드포인트 구성 방법을 참조하세요.

중요

여러 브로커를 사용하는 경우, 클러스터의 각 브로커를 모니터링하기 위해 서로 다른 엔드포인트와 브로커 ID를 가진 추가 JMX 수신기를 추가하십시오.

5단계: 환경 변수 설정

필요한 환경 변수를 설정하세요:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BROKER_ADDRESS="localhost:9092"
$
export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"

바꾸다:

  • YOUR_LICENSE_KEY 당신의 뉴렐릭 피규어와 함께
  • my-kafka-cluster Kafka 클러스터에 고유한 이름을 지정하세요.
  • localhost:9092 Kafka 부트스트랩 서버 주소와 함께
  • localhost:9999 Kafka 브로커의 JMX 엔드포인트를 사용하세요.

6단계: 수집기를 시작합니다

수집기를 직접 실행하세요(sudo 권한 필요 없음):

bash
$
# Start the collector with your config
$
otelcol-contrib --config ~/opentelemetry/config.yaml

수집기는 몇 분 내에 Kafka 지표를 뉴렐릭으로 보내기 시작할 것입니다.

지속적인 실행을 위한 systemd 서비스를 생성합니다(초기 설정 시 sudo 권한이 필요합니다).

bash
$
# Create systemd service file
$
sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF
$
[Unit]
$
Description=OpenTelemetry Collector for Kafka
$
After=network.target
$
$
[Service]
$
Type=simple
$
User=$USER
$
WorkingDirectory=$HOME/opentelemetry
$
ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml
$
Restart=on-failure
$
Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"
$
Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"
$
Environment="KAFKA_BROKER_ADDRESS=localhost:9092"
$
Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"
$
$
[Install]
$
WantedBy=multi-user.target
$
EOF

YOUR_LICENSE_KEY 및 기타 값을 교체한 다음 서비스를 활성화하고 시작하십시오.

bash
$
sudo systemctl daemon-reload
$
sudo systemctl enable otelcol-contrib
$
sudo systemctl start otelcol-contrib
$
sudo systemctl status otelcol-contrib

7단계: (선택 사항) 제작자 또는 소비자를 구성합니다.

Kafka 생산자 및 소비자 근로자로부터 디버그 수준의 텔레메트리를 수집하려면 OpenTelemetry 클라이언트 에이전트를 사용하세요.

  1. 다음 에이전트를 다운로드하세요:

    bash
    $
    mkdir -p ~/otel-java
    $
    curl -L -o ~/otel-java/opentelemetry-javaagent.jar \
    >
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
  2. 에이전트로 시작하세요:

    bash
    $
    java \
    >
    -javaagent:~/otel-java/opentelemetry-javaagent.jar \
    >
    -Dotel.service.name="kafka-producer-1" \
    >
    -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
    >
    -Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \
    >
    -Dotel.exporter.otlp.protocol="grpc" \
    >
    -Dotel.metrics.exporter="otlp" \
    >
    -Dotel.traces.exporter="otlp" \
    >
    -Dotel.logs.exporter="otlp" \
    >
    -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
    >
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
    >
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
    >
    -Dotel.instrumentation.kafka.enabled="true" \
    >
    -jar your-kafka-application.jar

바꾸다:

  • kafka-producer-1 생산자 또는 소비자 애플리케이션에 고유한 이름을 지정하세요.
  • my-kafka-cluster 수집기 설정에 사용된 것과 동일한 클러스터 이름을 사용합니다.
  • https://otlp.nr-data.net:4317 뉴렐릭 OTLP 엔드포인트를 사용하세요(EU 지역의 경우 https://otlp.eu01.nr-data.net:4317 사용). 다른 엔드포인트 및 설정 옵션은 OTLP 엔드포인트 구성을 참조하세요.

잔류 에이전트는 코드 변경이 전혀 없는 기본 Kafka 측정, 캡처 기능을 제공합니다.

  • 요청 지연시간
  • 처리량 지표
  • 오류율
  • 분산 추적

고급 설정에 대해서는 Kafka 측정, 로그 문서를 참조하세요.

6단계: (선택 사항) Kafka 브로커 로그 전달

호스트에서 Kafka 브로커 로그를 수집하여 뉴렐릭으로 전송하려면 OpenTelemetry Collector 에서 파일 로그 수신기를 구성하십시오.

데이터 찾기

몇 분 후, Kafka 창이 뉴렐릭에 나타날 것입니다. 뉴렐릭 UI 의 다양한 보기에서 Kafka 범위를 탐색하는 방법에 대한 자세한 지침은 "데이터 찾기" 참조하세요.

NRQL을 사용하여 데이터를 쿼리할 수도 있습니다.

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

문제점 해결

다음 단계

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.