Confluent Cloud configurations
The dbt-confluent adapter supports the following materializations and configurations for building models on Confluent Cloud for Apache Flink. For more detailed documentation of how to get started, visit the dbt documentation on Confluent Cloud.
Materializations
Unsupported materializations
- table: Not officially supported. Coming soon.
- materialized_view: Not supported. Use streaming_table instead.
- incremental: Not supported. dbt's batch-incremental semantics do not map to Flink's continuous processing model. Use streaming_table instead.
- snapshot: Not supported. Flink SQL lacks the batch operations (MERGE, UPDATE) required by dbt snapshots.
Materialization-specific configurations
streaming_table
The streaming_table materialization creates a Kafka-topic-backed table and a continuously running INSERT INTO statement.
{{
    config(
        materialized='streaming_table',
        with={
            'changelog.mode': 'upsert',
            'kafka.retention.time': '7 d'
        }
    )
}}
SELECT
    order_id,
    customer_id,
    total_amount
FROM {{ ref('raw_orders') }}
WHERE status = 'completed'
with config
streaming_source
The streaming_source materialization creates a table backed by a connector. The connector config is required. Unlike other models, the model SQL contains column definitions rather than a SELECT query. In Confluent Cloud, valid connector values include faker (mock data generation) and external table connectors for AI search. See the Confluent connector catalog and the Flink CREATE TABLE documentation for available connectors and options.
{{
    config(
        materialized='streaming_source',
        connector='faker',
        with={
            'rows-per-second': '1',
            'number-of-rows': '100',
            'changelog.mode': 'append',
        }
    )
}}
order_id BIGINT,
price DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
PRIMARY KEY(order_id) NOT ENFORCED
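To show how the two materializations might fit together, here is a minimal sketch of a downstream streaming_table model that reads from the faker-backed source above. It assumes the source model is saved under the hypothetical name faker_orders and that a streaming_source model can be referenced with ref() like any other dbt model. The price threshold is arbitrary.

```sql
{# Hypothetical downstream model, for example models/large_orders.sql #}
{{
    config(
        materialized='streaming_table',
        with={
            'changelog.mode': 'append'
        }
    )
}}
{# 'faker_orders' is an assumed name for the streaming_source model #}
SELECT
    order_id,
    price,
    order_time
FROM {{ ref('faker_orders') }}
WHERE price > 100
```

Under these assumptions, the source model would create the connector-backed table, and this model would add the Kafka-topic-backed table plus the continuously running INSERT INTO statement that filters it.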
connector config
Stateful behavior and --full-refresh
Confluent Cloud Flink SQL tables are stateful, long-running resources. The streaming_table and streaming_source materializations behave differently from traditional batch-oriented dbt materializations:
- First run: The table is created and (for streaming_table) a continuously running INSERT INTO statement begins populating it.
- Subsequent runs without --full-refresh: If the table already exists, the adapter compares the existing column names, data types, and WITH options against the model. If nothing has drifted, the run skips the model to avoid dropping a table that has accumulated state or has downstream consumers. If drift is detected, the run fails with a compilation error. Drift detection can be disabled per model with config(on_schema_drift='ignore').
- Runs with --full-refresh: The existing table is dropped and recreated from scratch, reprocessing all data.
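As a sketch of the per-model opt-out, the streaming_table example from earlier on this page could carry on_schema_drift='ignore' alongside its other configs. The model body is reused unchanged, and placing the option next to materialized in the same config() call is an assumption based on how the option is written above.

```sql
{{
    config(
        materialized='streaming_table',
        on_schema_drift='ignore',
        with={
            'changelog.mode': 'upsert'
        }
    )
}}
SELECT
    order_id,
    customer_id,
    total_amount
FROM {{ ref('raw_orders') }}
WHERE status = 'completed'
```

With drift detection disabled, a run without --full-refresh would no longer fail when the model and the existing table differ, so applying a changed schema or changed WITH options would still require --full-refresh.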
Use --full-refresh when you need to change a table's schema, modify WITH options, or reprocess data from the beginning:
dbt run --full-refresh --select my_streaming_model
Known limitations
- No schema management: The adapter cannot create or drop schemas (Kafka clusters) or databases (environments). These must be managed in Confluent Cloud.
- No table renames: ALTER TABLE RENAME is not supported in Flink SQL.
- Non-transactional: Confluent Cloud Flink SQL does not support transactions. BEGIN and COMMIT are no-ops.
- Seeds require full_refresh: The adapter sets full_refresh: true for seeds by default.