Connect Confluent Cloud to dbt Core
Some features may be limited. To contribute, refer to the source repository below.
- Maintained by: Confluent
- Authors: Confluent
- GitHub repo: confluentinc/dbt-confluent
- PyPI package: dbt-confluent
- Slack channel: n/a
- Supported dbt Core version: v1.11.0 and newer
- dbt support: Not Supported
- Minimum data platform version: n/a
Installing dbt-confluent
Install the adapter with pip:
python -m pip install dbt-confluent
Configuring dbt-confluent
For Confluent Cloud-specific configuration, please refer to Confluent Cloud configs.
Connecting to Confluent Cloud with dbt-confluent
Use the dbt-confluent adapter to connect to Confluent Cloud for Apache Flink, a fully managed stream processing service. The adapter deploys dbt models as Flink SQL statements that run continuously on Confluent Cloud.
Prerequisites
- A Confluent Cloud account
- A Flink compute pool in your environment
- A Flink API key for a service account with appropriate RBAC permissions
- Python 3.10 or later
Installation
Install dbt-confluent from PyPI:
pip install dbt-confluent
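To confirm that the install succeeded and that the interpreter meets the Python 3.10 prerequisite, a quick check along these lines may help. This is a minimal sketch using only the standard library; the helper names `installed_version` and `check_environment` are hypothetical and not part of dbt or the adapter.

```python
from __future__ import annotations  # keeps the type hints harmless on older interpreters

import sys
from importlib import metadata


def installed_version(package: str) -> str | None:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def check_environment(package: str = "dbt-confluent") -> list[str]:
    """Collect human-readable problems; an empty list means the checks passed."""
    problems = []
    # The prerequisites list Python 3.10 or later.
    if sys.version_info < (3, 10):
        problems.append("Python 3.10 or later is required")
    if installed_version(package) is None:
        problems.append(f"{package} is not installed")
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print(problem)
```

If the script prints nothing, both checks passed. It only inspects the local Python environment and does not contact Confluent Cloud.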
Configuring your profile
Add the following configuration to your profiles.yml file to define Confluent Cloud targets.
my_confluent_project:
  target: dev
  outputs:
    dev:
      type: confluent
      cloud_provider: aws
      cloud_region: us-east-1
      organization_id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
      compute_pool_id: lfcp-xxxxx
      environment_id: env-xxxxx
      dbname: my_kafka_cluster
      flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
      flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
      threads: 1
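A missing or misspelled key in profiles.yml may only surface once dbt tries to connect, so it can be convenient to check a target up front. The sketch below assumes that every key shown in the sample profile is required, which this page does not state. `EXPECTED_FIELDS` and `missing_fields` are hypothetical names, and the target is written as a plain dict rather than parsed from YAML to keep the example free of third-party dependencies.

```python
# Keys copied from the sample profile; treating all of them as required is an assumption.
EXPECTED_FIELDS = (
    "type", "cloud_provider", "cloud_region", "organization_id",
    "compute_pool_id", "environment_id", "dbname",
    "flink_api_key", "flink_api_secret", "threads",
)


def missing_fields(target: dict) -> list[str]:
    """Return expected keys that are absent or empty in one output target."""
    return [key for key in EXPECTED_FIELDS if target.get(key) in (None, "")]


# Same values as the sample "dev" target, with flink_api_secret left out on purpose.
dev_target = {
    "type": "confluent",
    "cloud_provider": "aws",
    "cloud_region": "us-east-1",
    "organization_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "compute_pool_id": "lfcp-xxxxx",
    "environment_id": "env-xxxxx",
    "dbname": "my_kafka_cluster",
    "flink_api_key": "placeholder-key",
    "threads": 1,
}

print(missing_fields(dev_target))  # ['flink_api_secret']
```

The check is purely structural: it does not verify that the IDs or credentials are valid for your Confluent Cloud organization.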
Description of profile fields
| Loading table... |
Understanding Confluent Cloud concepts
In Confluent Cloud for Apache Flink, dbt concepts map to Flink SQL as follows:
| Loading table... |
Environment variables
Use environment variables to supply the Flink API key and secret to your profile, especially in CI/CD pipelines:
export CONFLUENT_FLINK_API_KEY=your-api-key
export CONFLUENT_FLINK_API_SECRET=your-api-secret
Then reference them in profiles.yml using the env_var Jinja function:
flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
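In a CI/CD job, a small pre-flight check can report a missing credential before dbt is invoked. The following is a minimal sketch, not part of dbt or the adapter: `REQUIRED_VARS` and `unset_variables` are hypothetical names, and the variable names are the two used on this page.

```python
import os
from collections.abc import Mapping

# The two variables referenced by env_var() in the profile above.
REQUIRED_VARS = ("CONFLUENT_FLINK_API_KEY", "CONFLUENT_FLINK_API_SECRET")


def unset_variables(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in REQUIRED_VARS if not environ.get(name, "").strip()]


# Passing an explicit mapping makes the behavior easy to see without touching
# the real environment; here only the key is defined.
print(unset_variables({"CONFLUENT_FLINK_API_KEY": "your-api-key"}))
# ['CONFLUENT_FLINK_API_SECRET']
```

Calling `unset_variables()` with no argument inspects the real process environment, so a pipeline step could fail early when the returned list is non-empty.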