Criando orquestrações de Dados com Airflow e banco de dados DuckDb.
Primeiros passos:
Usando Astro CLI para criar um conteiner Docker com Airflow.
- O comando astro dev init inicializa um novo projeto Astronomer, criando a estrutura mínima de um projeto Airflow local (dags/, Dockerfile, requirements.txt, etc.).
astro dev init
- O Comando astro dev start Sobe o ambiente Airflow completo em Docker Compose (com Scheduler, Webserver, Triggerer, Postgres e Flower).
astro dev start
Os dois comandos acima são um atalho para você rodar e gerenciar um Airflow local usando Docker. Te fazendo economizar tempo com configuração de airflow.
Este projeto define uma DAG no Airflow que realiza a migração incremental de dados de uma tabela no PostgreSQL para o DuckDB, utilizando um operador customizado.
O objetivo é automatizar a sincronização de dados entre PostgreSQL e DuckDB, garantindo que apenas novos registros sejam inseridos no DuckDB a cada execução.
- Schedule: A cada 5 minutos (
*/5 * * * *) - Catchup: Desativado (
catchup=False) - Task:
postgres_to_duckdbusando o operador customizadoPostgresToDuckDBOperator
postgres_schema: Schema da tabela de origem no PostgreSQLpostgres_table_name: Nome da tabela de origem no PostgreSQL e destino no DuckDBduckdb_conn_id: ID da conexão Airflow para DuckDBpostgres_conn_id: ID da conexão Airflow para PostgreSQL
- Conecta ao DuckDB e ao PostgreSQL via conexões do Airflow
- Instala e carrega o módulo
postgresno DuckDB - Cria a tabela no DuckDB se não existir, copiando todos os dados do PostgreSQL
- Realiza migração incremental inserindo apenas os registros novos (com base em
created_at) - Loga a operação ao final
-
Configurar as conexões do Airflow:
motherduck_conn→ DuckDBpostgres_conn→ PostgreSQL
-
Colocar os arquivos:
duckdb_operator.py→ DAGpostgres_to_duckdb_operator.py→ operador customizado
-
Iniciar o Airflow e a DAG será executada automaticamente a cada 5 minutos.
- Certifique-se de que o módulo
postgresdo DuckDB está disponível - A sincronização incremental depende da coluna
created_atna tabela do PostgreSQL