• /
  • EnglishEspañolFrançais日本語한국어Português
  • 로그인지금 시작하기

사용자의 편의를 위해 제공되는 기계 번역입니다.

영문본과 번역본이 일치하지 않는 경우 영문본이 우선합니다. 보다 자세한 내용은 이 페이지를 방문하시기 바랍니다.

문제 신고

OpenTelemetry를 사용하여 Kubernetes(Strimzi)에서 Kafka를 모니터링하세요.

OpenTelemetry Collector 구현하거나 배포하여 Strimzi 연산자를 사용하여 Kubernetes 에서 실행 중인 Kafka 클러스터를 모니터링합니다. 수집기는 Kafka 브로커를 자동으로 발견하고 포괄적인 정보를 수집합니다.

시작하기 전에

다음 사항을 확인하십시오:

Strimzi Kafka에서 JMX를 활성화합니다.

Strimzi Kafka 리소스에서 Kafka 클러스터에 JMX가 활성화되어 있는지 확인하십시오.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
jmxOptions: {} # Enables JMX with default settings
# ...other broker configuration

1단계: 네임스페이스 생성

OpenTelemetry Collector 전용 네임스페이스를 생성하거나 기존 Kafka 네임스페이스를 사용하십시오.

bash
$
kubectl create namespace kafka

2단계: 라이선스 키를 사용하여 비밀 키를 생성합니다.

크리에이터 클러스터 키를 Kubernetes 비밀로 저장하세요.

bash
$
kubectl create secret generic nr-license-key \
>
--from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
>
-n kafka

YOUR_LICENSE_KEY 실제 뉴렐릭 키로 바꾸세요.

3단계: 구현하다, 배포하다 OpenTelemetry Collector

3.1 사용자 지정 수집기 이미지 생성

Java 런타임 및 JMX 스크래퍼가 포함된 사용자 지정 OpenTelemetry Collector 이미지를 생성합니다.

중요

버전 호환성: 이 가이드에서는 JMX Scraper 1.52.0 및 OpenTelemetry Collector 0.143.1 버전을 사용합니다. 이전 수집기 버전에는 호환성 목록에 이 스크래퍼의 해시가 포함되어 있지 않을 수 있습니다. 최상의 결과를 얻으려면 이 가이드에 나와 있는 대로 최신 버전을 사용하십시오.

타겟, 목표 실행: 시스템에 맞는 올바른 바이너리를 찾으려면 OpenTelemetry Collector 릴리스 페이지를 참조하세요(예: linux_amd64, linux_arm64, darwin_amd64). Dockerfile에서 TARGETARCH 변수를 accordingly 업데이트하세요.

Dockerfile 으로 저장:

# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtime
FROM openjdk:17-jre-slim
COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]

이미지를 빌드하고 푸시합니다.

bash
$
docker build -t your-registry/otel-collector-kafka:latest .
$
docker push your-registry/otel-collector-kafka:latest

3.2 JMX 맞춤형 지표 ConfigMap 생성

먼저, 사용자 정의 JMX 지표 설정을 사용하여 ConfigMap을 만듭니다. jmx-kafka-config.yaml 으로 저장:

apiVersion: v1
kind: ConfigMap
metadata:
name: jmx-kafka-config
namespace: kafka
data:
jmx-kafka-config.yaml: |
---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

메트릭 수집 사용자 지정: kafka-jmx-config.yaml 파일에 사용자 지정 MBean 규칙을 추가하여 추가 Kafka 메트릭을 수집할 수 있습니다.

JMX ConfigMap을 적용합니다.

bash
$
kubectl apply -f jmx-kafka-config.yaml

3.3 수집기 ConfigMap 생성

OpenTelemetry Collector 설정을 포함하는 ConfigMap을 생성합니다. otel-kafka-config.yaml 으로 저장:

---
apiVersion: v1
kind: ConfigMap
metadata:
name: otel-collector-config
namespace: kafka
labels:
app: otel-collector
data:
otel-collector-config.yaml: |
receivers:
# Kafka cluster-level metrics (runs once per OTEL collector)
kafkametrics/cluster:
brokers:
- "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# Receiver creator for dynamic per-broker JMX receivers
receiver_creator:
watch_observers: [k8s_observer]
receivers:
# JMX receiver template (created per discovered broker pod)
jmx:
rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
config:
endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
jar_path: /opt/opentelemetry-jmx-scraper.jar
target_system: kafka
jmx_configs: /conf-jmx/jmx-kafka-config.yaml
collection_interval: 30s
# Set dynamic resource attributes from discovered pod
resource_attributes:
broker.endpoint: '`endpoint`'
exporters:
otlp:
endpoint: https://otlp.nr-data.net:4317
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
headers:
api-key: ${NEW_RELIC_LICENSE_KEY}
processors:
# Batch processor for efficiency
batch/aggregation:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Detect system resources
resourcedetection:
detectors: [env, docker, system]
timeout: 5s
override: false
# Add Kafka cluster metadata
resource/kafka_metadata:
attributes:
- key: kafka.cluster.name
value: my-cluster
action: upsert
# Extract Kubernetes attributes
k8sattributes:
auth_type: serviceAccount
passthrough: false
extract:
metadata:
- k8s.pod.name
- k8s.pod.uid
- k8s.namespace.name
- k8s.node.name
labels:
- tag_name: strimzi.cluster
key: strimzi.io/cluster
from: pod
- tag_name: strimzi.kind
key: strimzi.io/kind
from: pod
# Transform metrics for New Relic UI
transform:
metric_statements:
- context: metric
statements:
# Clean up descriptions and units
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
# Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
- set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
- delete_key(attributes, "broker.endpoint")
- delete_key(attributes, "k8s.pod.name")
# Topic sum aggregation for replicas_in_sync
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
# Filter to include only cluster-level metrics
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Filter to exclude cluster-level metrics from broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Convert cumulative metrics to delta for New Relic
cumulativetodelta:
extensions:
# K8s observer extension
k8s_observer:
auth_type: serviceAccount
observe_pods: true
observe_nodes: false
service:
extensions: [k8s_observer]
pipelines:
# Per-broker metrics pipeline (with broker.id)
metrics/broker:
receivers:
- receiver_creator
- kafkametrics/cluster
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/exclude_cluster_metrics
- transform
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]
# Cluster-level metrics pipeline (without broker.id, aggregated)
metrics/cluster:
receivers:
- receiver_creator
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]

설정 참고 사항:

  • my-cluster-kafka-bootstrap Strimzi Kafka 서비스 이름으로 바꾸세요.
  • rulekafka.cluster.name 에서 my-cluster 클러스터 이름으로 바꾸세요.
  • 네임스페이스가 다르면 업데이트하세요. kafka
  • OTLP 엔드포인트: https://otlp.nr-data.net:4317 (미국 지역) 또는 https://otlp.eu01.nr-data.net:4317 (유럽 지역)을 사용합니다. 다른 지역의 OTLP 엔드포인트 구성 방법을 참조하세요.
  • receiver_creator 은 Strimzi 레이블을 사용하여 Kafka 브로커를 자동으로 검색합니다.

ConfigMap을 적용합니다.

bash
$
kubectl apply -f otel-kafka-config.yaml

3.4 구현하다, 배포하다 수집기

구현, 배포를 만듭니다. otel-collector-deployment.yaml 으로 저장:

apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
serviceAccountName: otel-collector
containers:
- name: otel-collector
image: your-registry/otel-collector-kafka:latest
env:
- name: NEW_RELIC_LICENSE_KEY
valueFrom:
secretKeyRef:
name: nr-license-key
key: NEW_RELIC_LICENSE_KEY
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
volumeMounts:
- name: vol-kafka-test-cluster
mountPath: /conf
- name: jmx-config
mountPath: /conf-jmx
ports:
- containerPort: 4317 # OTLP gRPC
- containerPort: 4318 # OTLP HTTP
- containerPort: 8888 # Metrics
volumes:
- name: vol-kafka-test-cluster
configMap:
name: otel-collector-config
items:
- key: otel-collector-config.yaml
path: otel-agent-config.yaml
- name: jmx-config
configMap:
name: jmx-kafka-config
items:
- key: jmx-kafka-config.yaml
path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: otel-collector
namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: otel-collector
rules:
- apiGroups: [""]
resources: ["pods", "nodes"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: otel-collector
subjects:
- kind: ServiceAccount
name: otel-collector
namespace: kafka
roleRef:
kind: ClusterRole
name: otel-collector
apiGroup: rbac.authorization.k8s.io

자원 설정:

  • 위의 리소스 제한은 중간 규모의 Kafka 클러스터(브로커 5~10개, 토픽 20~100개)에 적합합니다.

구현을 적용합니다.

bash
$
kubectl apply -f otel-collector-deployment.yaml

수집기가 실행 중인지 확인하십시오.

bash
$
kubectl get pods -n kafka -l app=otel-collector
$
kubectl logs -n kafka -l app=otel-collector -f

4단계: (선택 사항) 제작자 또는 소비자 참여

Kubernetes 에서 실행되는 Kafka 제작자 및 소비자 제작으로부터 로그 수준의 텔레메트리를 수집하려면 OpenTelemetry 서버 에이전트를 사용하여 이를 로그합니다.

당신의 Kafka를 편집하세요

Kafka 생산자 또는 소비자 군대를 위해 기존 구현에 OpenTelemetry 동양 에이전트를 추가하고 배포합니다.

  1. 에이전트 에이전트 다운로드: 에이전트 JAR을 다운로드하려면 init 컨테이너를 추가하세요.

    initContainers:
    - name: download-otel-agent
    image: busybox:latest
    command:
    - sh
    - -c
    - |
    wget -O /otel/opentelemetry-javaagent.jar \
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  2. 군대 에이전트 구성: 견고한 컨테이너에 환경 변수를 추가합니다.

    env:
    - name: JAVA_TOOL_OPTIONS
    value: >-
    -javaagent:/otel/opentelemetry-javaagent.jar
    -Dotel.service.name="kafka-producer"
    -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
    -Dotel.exporter.otlp.endpoint="http://localhost:4317"
    -Dotel.exporter.otlp.protocol="grpc"
    -Dotel.metrics.exporter="otlp"
    -Dotel.traces.exporter="otlp"
    -Dotel.logs.exporter="otlp"
    -Dotel.instrumentation.kafka.experimental-span-attributes="true"
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
    -Dotel.instrumentation.kafka.enabled="true"
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  3. 볼륨 추가: 볼륨 정의를 포함하세요:

    volumes:
    - name: otel-agent
    emptyDir: {}

바꾸다:

  • kafka-producer 귀하의 독특한 이름으로
  • my-cluster Kafka 클러스터 이름을 사용하여

위 설정은 텔메트리를 localhost:4317에서 실행되는 OpenTelemetry Collector 로 보냅니다. 이 설정을 사용하여 구현하다, 배포하다:

receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: "${NEW_RELIC_LICENSE_KEY}"
compression: gzip
timeout: 30s
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlp/newrelic]
metrics:
receivers: [otlp]
exporters: [otlp/newrelic]
logs:
receivers: [otlp]
exporters: [otlp/newrelic]

이를 통해 처리를 사용자 지정하고, 필터를 추가하거나, 여러 백앤드에게 라우팅할 수 있습니다. 다른 엔드포인트 설정에 대해서는 OTLP 엔드포인트 구성을 참조하세요.

잔류 에이전트는 코드 변경이 전혀 없는 기본 Kafka 측정, 캡처 기능을 제공합니다.

  • 요청 지연시간
  • 처리량 지표
  • 오류율
  • 분산 추적

고급 설정에 대해서는 Kafka 측정, 로그 문서를 참조하세요.

5단계: (선택 사항) Kafka 브로커 로그 전달

Kubernetes 환경에서 Kafka 브로커 로그를 수집하여 뉴렐릭으로 전송하려면 OpenTelemetry Collector 에서 파일 로그 수신기를 구성하십시오.

데이터 찾기

몇 분 후, Kafka 창이 뉴렐릭에 나타날 것입니다. 뉴렐릭 UI 의 다양한 보기에서 Kafka 범위를 탐색하는 방법에 대한 자세한 지침은 "데이터 찾기" 참조하세요.

NRQL을 사용하여 데이터를 쿼리할 수도 있습니다.

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

문제점 해결

다음 단계

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.