Connect Confluent Cloud to dbt Core

Community plugin

Some features may be limited. To contribute, refer to the source repository below.

  • Maintained by: Confluent
  • Authors: Confluent
  • GitHub repo: confluentinc/dbt-confluent
  • PyPI package: dbt-confluent
  • Slack channel: n/a
  • Supported dbt Core version: v1.11.0 and newer
  • dbt support: Not Supported
  • Minimum data platform version: n/a

Installing dbt-confluent

Install the adapter with pip:

python -m pip install dbt-confluent

Configuring dbt-confluent

For Confluent Cloud-specific configuration, please refer to Confluent Cloud configs.

Connecting to Confluent Cloud with dbt-confluent

Use the dbt-confluent adapter to connect to Confluent Cloud for Apache Flink, a fully managed stream processing service. The adapter deploys dbt models as Flink SQL statements that run continuously on Confluent Cloud.

Prerequisites

Installation

Install dbt-confluent from PyPI:

pip install dbt-confluent

Configuring your profile

Add the following configuration to your profiles.yml file to define Confluent Cloud targets.

~/.dbt/profiles.yml
my_confluent_project:
  target: dev
  outputs:
    dev:
      type: confluent
      cloud_provider: aws
      cloud_region: us-east-1
      organization_id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
      compute_pool_id: lfcp-xxxxx
      environment_id: env-xxxxx
      dbname: my_kafka_cluster
      flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
      flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
      threads: 1

Description of profile fields

Field | Required | Description
type | Yes | Must be set to confluent.
cloud_provider | Yes | The cloud provider for your Confluent Cloud environment (aws, azure, or gcp).
cloud_region | Yes | The cloud region for your environment (for example, us-east-1, us-west-2).
organization_id | Yes | Your Confluent Cloud organization ID (UUID format). Find this in the Confluent Cloud Console under Settings.
compute_pool_id | Yes | The ID of the Flink compute pool to run statements on (for example, lfcp-xxxxx).
environment_id | Yes | The Confluent Cloud environment ID (for example, env-xxxxx). Maps to the Flink SQL catalog.
dbname | Yes | The Kafka cluster name within the environment. Maps to the Flink SQL database.
flink_api_key | Yes | A Flink API key for authenticating with Confluent Cloud. Use env_var to avoid storing secrets in plain text.
flink_api_secret | Yes | The corresponding Flink API secret. Use env_var to avoid storing secrets in plain text.
execution_mode | No | Sets the default execution mode for statements. One of: streaming_query (default), streaming_ddl, snapshot, snapshot_ddl.
statement_name_prefix | No | Adds a prefix to each Flink statement name. Default: dbt-.
statement_label | No | Applies a label to all Flink statements so you can filter them in the Confluent Cloud Console. Default: dbt-confluent.
threads | No | Sets how many models dbt runs concurrently. Default: 1.
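The required and optional fields above can be checked before a target is committed to profiles.yml. The following is a minimal sketch only: check_target and the dev dictionary are hypothetical names for illustration and are not part of dbt-confluent, and the adapter's own validation may be stricter or report errors differently.

```python
# Hypothetical pre-flight check, NOT part of dbt-confluent.
# Field names and allowed values are taken from the profile field table.
REQUIRED_FIELDS = (
    "type", "cloud_provider", "cloud_region", "organization_id",
    "compute_pool_id", "environment_id", "dbname",
    "flink_api_key", "flink_api_secret",
)
CLOUD_PROVIDERS = ("aws", "azure", "gcp")
EXECUTION_MODES = ("streaming_query", "streaming_ddl", "snapshot", "snapshot_ddl")


def check_target(target: dict) -> list[str]:
    """Return a list of problems found in one profile target (empty if none)."""
    problems = [
        f"missing required field: {name}"
        for name in REQUIRED_FIELDS
        if not target.get(name)
    ]
    if target.get("type") and target["type"] != "confluent":
        problems.append("type must be 'confluent'")
    if target.get("cloud_provider") and target["cloud_provider"] not in CLOUD_PROVIDERS:
        problems.append("cloud_provider must be one of: " + ", ".join(CLOUD_PROVIDERS))
    # execution_mode is optional; the table gives streaming_query as the default.
    mode = target.get("execution_mode", "streaming_query")
    if mode not in EXECUTION_MODES:
        problems.append("execution_mode must be one of: " + ", ".join(EXECUTION_MODES))
    return problems


# Example target mirroring the profile above, with placeholder credentials.
dev = {
    "type": "confluent",
    "cloud_provider": "aws",
    "cloud_region": "us-east-1",
    "organization_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "compute_pool_id": "lfcp-xxxxx",
    "environment_id": "env-xxxxx",
    "dbname": "my_kafka_cluster",
    "flink_api_key": "placeholder-key",
    "flink_api_secret": "placeholder-secret",
    "threads": 1,
}

print(check_target(dev))
```

A check like this only catches missing or misspelled values. It does not confirm that the compute pool, environment, or API key actually exist in Confluent Cloud.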

Understanding Confluent Cloud concepts

In Confluent Cloud for Apache Flink, dbt concepts map to Flink SQL as follows:

dbt concept | Confluent Cloud / Flink SQL concept
database (via environment_id) | Flink SQL catalog (Confluent Cloud environment)
schema (via dbname) | Flink SQL database (Kafka cluster)
model | Flink SQL statement
table | Kafka topic with a schema
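To make the mapping concrete, the sketch below builds the three-part name (catalog, database, table) that a model's table would have in Flink SQL. It is illustrative only: qualified_name and the model name orders_enriched are hypothetical, the backtick quoting follows general Flink SQL identifier rules, and the way dbt-confluent itself renders relation names is an assumption that the text above does not confirm.

```python
def quote(identifier: str) -> str:
    """Wrap an identifier in backticks, doubling any embedded backtick."""
    return "`" + identifier.replace("`", "``") + "`"


def qualified_name(environment_id: str, dbname: str, model: str) -> str:
    """Join catalog (environment), database (Kafka cluster), and table name."""
    return ".".join(quote(part) for part in (environment_id, dbname, model))


# environment_id and dbname come from the example profile;
# orders_enriched is a made-up model name.
print(qualified_name("env-xxxxx", "my_kafka_cluster", "orders_enriched"))
```

Quoting matters here because values such as env-xxxxx contain a hyphen, which is not valid in an unquoted SQL identifier.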

Environment variables

Use environment variables to configure your profile, especially in CI/CD pipelines:

export CONFLUENT_FLINK_API_KEY=your-api-key
export CONFLUENT_FLINK_API_SECRET=your-api-secret

Then reference them in profiles.yml using the env_var Jinja function:

flink_api_key: "{{ env_var('CONFLUENT_FLINK_API_KEY') }}"
flink_api_secret: "{{ env_var('CONFLUENT_FLINK_API_SECRET') }}"
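Because env_var is called here without a default value, dbt reports an error when either variable is unset. In a CI/CD job it can be convenient to fail earlier with a clearer message. The following is a minimal sketch of such a check: missing_env_vars is a hypothetical helper, not something dbt or dbt-confluent provides.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_ENV_VARS = ("CONFLUENT_FLINK_API_KEY", "CONFLUENT_FLINK_API_SECRET")


def missing_env_vars(environ=None) -> list[str]:
    """Return the required variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not environ.get(name)]


missing = missing_env_vars()
if missing:
    print("Set these variables before running dbt: " + ", ".join(missing))
else:
    print("Confluent Flink credentials are set.")
```

The function accepts an optional mapping so it can be exercised without touching the real environment, for example missing_env_vars({}) returns both names.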