Airflow 과의 dbt Cloud 통합은 dbt Cloud 작업 및 리소스의 상태를 모니터링하여 실행, 모델 또는 테스트 실패와 같은 문제를 식별하는 데 도움이 됩니다.
이 통합은 Apache Airflow 에서 실행되며 그렇게 하도록 구성된 경우 실패한 테스트에 대해 Snowflake (를) 쿼리합니다.
전제 조건
- API가 활성화되고 Snowflake 데이터베이스로 사용하는 dbt Cloud 계정입니다.
- dbt Cloud 계정이 실행되는 Snowflake 계정에 액세스합니다.
- 기존 Airflow 환경 버전 2.8.1 이상 또는 Docker Compose 실행 기능.
통합 설치
다음 중 한 가지 방법으로 Airflow (를) 사용하여 뉴렐릭 dbt Cloud 통합을 설치할 수 있습니다.
- 기존 Airflow 환경에 설치합니다. 이는 운영 환경에 권장됩니다.
- docker Compose를 사용하여 설치. 이는 빠른 POC에 적합합니다.
해당 탭을 클릭하여 귀하의 요구에 가장 적합한 옵션을 선택하십시오:
Snowflake 공급자가 있는지 확인한 후 다음 명령을 실행하여 newrelic-dbt-cloud-integration
저장소를 복제합니다.
$pip install apache-airflow-providers-snowflake>=3.0.0
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
airflow/dags
의 콘텐츠를 Airflow dags 폴더의 루트에 복사합니다.
DAG에 필요한 5개의 Airflow 연결을 생성합니다. 다음 표에는 연결 이름과 이를 설정하는 데 필요한 정보가 나와 있습니다. 이 모든 것에 대해 유형은 http
입니다.
연결 이름 | 설명 | 유형 | 호스트 및 비밀번호 | |
---|---|---|---|---|
|
|
| 호스트: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ ( 비밀번호: 귀하의 dbt Cloud API 토큰(프로필 설정) 또는 서비스 계정 토큰 | |
| dbt 검색 API에 연결할 수 있습니다. |
| 호스트: https://metadata.cloud.getdbt.com/graphql 비밀번호: Dbt 클라우드 서비스 계정 | |
| 커스텀 대시보드를 뉴렐릭에 업로드할 수 있습니다. |
| 호스트: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events ( 비밀번호: NR 인사이트 삽입 API 키 | |
| 쿼리 쿠렐릭 맞춤형 대시보드를 사용할 수 있습니다. |
| 호스트: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query ( 비밀번호: NR 인사이트 쿼리 API 키 |
위의 네 가지를 구성한 후에는 Snowflake 연결을 구성해야 합니다. Snowflake를 사용하면 실패한 테스트 행을 쿼리할 수 있습니다. 눈송이 연결을 구성하는 방법에는 여러 가지 가 있습니다. 개인 키 쌍을 사용하여 구성하려면 다음 속성을 입력하십시오.
Type
: 눈송이Login
: 귀하의 Snowflake 사용자 이름Account
: 귀하의 Snowflake 계정Warehouse
: 당신의 눈송이 창고Role
: 귀하의 Snowflake 역할입니다. 실패한 모든 테스트 행을 가져오려면 역할에 dbt Cloud에서 사용되는 모든 DB에 대한 액세스 권한이 있어야 합니다.Private Key Text
: 이 연결에 사용되는 전체 개인 키입니다.Password
: 암호화된 경우 개인 키에 대한 암호 문구입니다. 암호화되지 않은 경우 비어 있습니다.
new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG를 활성화하여 설정을 완료합니다.
다음 명령어를 실행하여 newrelic-dbt-cloud-integration
저장소를 클론합니다.
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
그런 다음 Airflow 디렉터리로 cd
.
$cd newrelic-dbt-cloud-integration/airflow
그런 다음 다음 명령을 실행하여 docker compose를 초기화하고 실행합니다.
$docker-compose up airflow-init
$docker-compose up
Airflow UI: http://localhost:8080
DAG에 필요한 5개의 Airflow 연결을 생성합니다. 다음 표에는 연결 이름과 이를 설정하는 데 필요한 정보가 나와 있습니다. 이 모든 것에 대해 유형은 http
입니다.
연결 이름 | 설명 | 유형 | 호스트 및 비밀번호 | |
---|---|---|---|---|
|
|
| 호스트: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ ( 비밀번호: 귀하의 dbt Cloud API 토큰(프로필 설정) 또는 서비스 계정 토큰 | |
| dbt 검색 API에 연결할 수 있습니다. |
| 호스트: https://metadata.cloud.getdbt.com/graphql 비밀번호: Dbt 클라우드 서비스 계정 | |
| 커스텀 대시보드를 뉴렐릭에 업로드할 수 있습니다. |
| 호스트: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events ( 비밀번호: NR 인사이트 삽입 API 키 | |
| 쿼리 쿠렐릭 맞춤형 대시보드를 사용할 수 있습니다. |
| 호스트: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query ( 비밀번호: NR 인사이트 쿼리 API 키 |
위의 네 가지를 구성한 후에는 Snowflake 연결을 구성해야 합니다. Snowflake를 사용하면 실패한 테스트 행을 쿼리할 수 있습니다. 눈송이 연결을 구성하는 방법에는 여러 가지 가 있습니다. 개인 키 쌍을 사용하여 구성하려면 다음 속성을 입력하십시오.
Type
: 눈송이Login
: 귀하의 Snowflake 사용자 이름Account
: 귀하의 Snowflake 계정Warehouse
: 당신의 눈송이 창고Role
: 귀하의 Snowflake 역할입니다. 실패한 모든 테스트 행을 가져오려면 역할에 dbt Cloud에서 사용되는 모든 DB에 대한 액세스 권한이 있어야 합니다.Private Key Text
: 이 연결에 사용되는 전체 개인 키입니다.Password
: 암호화된 경우 개인 키에 대한 암호 문구입니다. 암호화되지 않은 경우 비어 있습니다.
new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG를 활성화하여 설정을 완료합니다.
데이터 찾기
이 통합은 세 가지 사용자 정의 대시보드를 생성하고 뉴렐릭에 보고합니다.
DAG 설정
사이:
이 DAG는 설정 없이 있는 그대로 실행되도록 만들어졌습니다. 동시에, 귀하의 회사에는 연결에 대한 고유한 명명 규칙이 있을 수 있다는 것을 알고 있습니다. 따라서 dag_config.yml
내부에는 다양한 연결의 이름을 설정할 수 있는 간단한 구성이 있습니다.
connections: dbt_cloud_admin_api: dbt_cloud_admin_api dbt_cloud_discovery_api: dbt_cloud_discovery_api nr_insights_query: nr_insights_query nr_insights_insert: nr_insights_insert snowflake_api: SNOWFLAKE
팀 실행:
dbt 작업은 다른 팀이 소유할 수 있지만 dbt Cloud 내에서는 이를 설정할 수 있는 장소가 없습니다. Python 코드를 사용하여 팀을 동적으로 설정할 수 있습니다. 자신만의 코드를 작성하려면 airflow/dags/nr_utils/nr_utils.py
수정하고 get_team_from_run()
에 필요한 로직을 넣으세요. 해당 함수에 전달된 실행 데이터는 다음 속성에 액세스할 수 있습니다.
- 프로젝트_이름
- 환경_이름
- 실행을 위한 dbt Cloud v2 API 에 나열된 모든 필드입니다. 모든 속성 앞에는 "run_"이 붙습니다.
다음은 예제 함수입니다:
def get_team_from_run(run: dict) -> str: team = 'Data Engineering' if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']: team = 'Platform' if re.match(r'Catch-all', run['job_name']): team = 'Project Catch All' return team
Dbt 프로젝트 설정
Dbt 프로젝트 내에서 메타 구성을 사용하여 추가 팀 및 테스트별 설정을 지정할 수 있습니다.
Team
: 작업을 소유한run_team determines
동안 테스트 및 모델과 같은 실패한 리소스에 대한 공지 공지를 받기 위해 업스트림 또는 다운스트림 팀이 필요한 경우가 있습니다. 팀을 구성하는 것은 우리가 그렇게 하는 데 도움이 됩니다.alert_failed_test_rows
:True
로 설정하면 실패한 테스트에 대한 쿼리를 실행하고 최대 처음 10개 열을 뉴렐릭으로 보내는 실패한 테스트 행이 활성화됩니다.failed_test_rows_limit
: 뉴렐릭으로 보낼 실패한 테스트 행의 최대 개수입니다. 뉴렐릭에 부당한 금액을 보내는 상황을 방지하기 위해 행 100개로 하드 코딩된 제한이 있습니다.slack_mentions
: Slack 알림을 활성화한 경우 이 필드를 사용하면 메시지에서 언급할 사람을 설정할 수 있습니다.
dbt_project.yml
에서 이를 설정하면 팀이 '데이터 엔지니어링'으로 설정되고 실패한 테스트 행이 활성화됩니다.
models: dbt_fake_company: +meta: nr_config: team: 'Data Engineering' alert_failed_test_rows: False failed_test_rows_limit: 5 slack_mentions: '@channel, @business_users'
메시지라는 또 다른 속성을 리소스에 추가할 수 있습니다. 다음 설정에서는 실패한 특정 테스트에 대해 파트너 비즈니스 팀에 알림을 보낼 수 있습니다. 또한 실패한 테스트 행 자체에 대한 알림을 설정할 수도 있습니다.
models: - name: important_business_model tests: - some_custom_test: config: meta: nr_config: team: 'Upstream Business Team' alert_failed_test_rows: true failed_test_rows_limit: 10 slack_mentions: '@channel, @business_user1, @engineer1' message: 'Important business process produced invalid data. Please check X tool'
문제점 해결
다양한 버전의 Airflow와 다양한 버전의 공급자를 결합하면 주요 변경 사항이 발생할 수 있습니다. 경우에 따라 Airflow 환경의 특정 버전과 일치하도록 코드를 수정해야 할 수도 있습니다. 우리는 Github 저장소 에서 알려진 문제를 추적합니다.