Apache Airflow® is a platform created by the community to programmatically author, schedule, and monitor workflows. A DAG (Directed Acyclic Graph) is the core concept of Airflow: it collects Tasks together and organizes them with dependencies and relationships that describe how they should run. You declare a DAG in a Python file in the $AIRFLOW_HOME/dags folder of your Airflow instance. This page shows you how to use a Python connector in a DAG to integrate Apache Airflow with a Timescale Cloud service.
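For illustration, a minimal DAG declaration looks like the following sketch. The file name hello_dag.py and the single no-op task are placeholders; the full example for this integration appears later on this page.

  # Minimal sketch of a DAG, saved as $AIRFLOW_HOME/dags/hello_dag.py (file name is a placeholder)
  from datetime import datetime
  from airflow import DAG
  from airflow.operators.empty import EmptyOperator

  # A DAG with one no-op task; schedule_interval=None means it only runs when triggered manually
  with DAG("hello_dag", start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False) as dag:
      EmptyOperator(task_id="noop")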

Prerequisites

To follow the steps on this page:
  • Create a target Timescale Cloud service with the Real-time analytics capability enabled.

    You need your connection details. This procedure also works for self-hosted TimescaleDB.
This example DAG uses the crypto_assets table you create in Optimize time-series data in hypertables.
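If you have not worked through that tutorial, the DAG below only needs a table with symbol and name columns. The following is a minimal sketch that creates a compatible table with psycopg2; the connection URI is a placeholder for your own connection details, and the TEXT column types are assumptions inferred from the DAG's INSERT statement.

  # Sketch: create a table compatible with the DAG on this page.
  # Replace the connection URI placeholders with your connection details.
  import psycopg2

  conn = psycopg2.connect("postgresql://<user>:<password>@<host>:<port>/<dbname>")
  with conn, conn.cursor() as cur:
      cur.execute("CREATE TABLE IF NOT EXISTS crypto_assets (symbol TEXT, name TEXT);")
  conn.close()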

Install Python connectivity libraries

To install the Python libraries required to connect to your Timescale Cloud service:
  1. Enable connections between Airflow and your Timescale Cloud service
    pip install psycopg2-binary
    
  2. Enable connection types in the Airflow UI
    pip install apache-airflow-providers-postgres
    

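To confirm that both libraries are installed before you build the DAG, you can run a quick import check. This is a minimal sketch; it only verifies that the imports succeed.

  # Sanity check: the driver and the Airflow Postgres provider are importable
  import psycopg2
  from airflow.providers.postgres.hooks.postgres import PostgresHook

  print("psycopg2 version:", psycopg2.__version__)
  print("Postgres provider hook available:", PostgresHook.__name__)
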
Create a connection between Airflow and your Timescale Cloud service

In your Airflow instance, securely connect to your Timescale Cloud service:
  1. Run Airflow. On your development machine, run the following command:
    airflow standalone
    
    The username and password for the Airflow UI are displayed in the standalone | Login with username line of the output.
  2. Add a connection from Airflow to your Timescale Cloud service
    1. In your browser, navigate to localhost:8080, then select Admin > Connections.
    2. Click + (Add a new record), then use your connection info to fill in the form. The Connection Type is Postgres.
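After you save the connection, you can confirm that Airflow can reach your service by exercising the connection from a Python shell on the same machine. This is a sketch; timescale_conn is a placeholder, use the Connection Id you entered in the form.

    # Sketch: verify the Airflow connection outside of a DAG
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="timescale_conn")  # placeholder Connection Id
    print(hook.get_first("SELECT version();"))  # prints the PostgreSQL version row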

Exchange data between Airflow and your Timescale Cloud service

To exchange data between Airflow and your Timescale Cloud service:
  1. Create and execute a DAG. To insert data into your Timescale Cloud service from Airflow:
    1. In $AIRFLOW_HOME/dags/timescale_dag.py, add the following code:
      from airflow import DAG
      from airflow.operators.python import PythonOperator
      # PostgresHook is provided by the apache-airflow-providers-postgres package installed above
      from airflow.providers.postgres.hooks.postgres import PostgresHook
      from datetime import datetime
      
      def insert_data_to_timescale():
          # Open a connection through the Airflow connection you created earlier
          hook = PostgresHook(postgres_conn_id='the ID of the connection you created')
          conn = hook.get_conn()
          cursor = conn.cursor()
          # This could be any query. This example inserts data into the table
          # you create in:
          # https://docs.tigerdata.com/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables
          cursor.execute("INSERT INTO crypto_assets (symbol, name) VALUES (%s, %s)",
                         ('NEW/Asset', 'New Asset Name'))
          conn.commit()
          cursor.close()
          conn.close()
      
      default_args = {
          'owner': 'airflow',
          'start_date': datetime(2023, 1, 1),
          'retries': 1,
      }
      
      # catchup=False prevents Airflow from backfilling runs for dates before today
      dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily', catchup=False)
      
      insert_task = PythonOperator(
          task_id='insert_data',
          python_callable=insert_data_to_timescale,
          dag=dag,
      )
      
      This DAG uses the crypto_assets table described in Prerequisites.
    2. In your browser, refresh the Airflow UI.
    3. In Search DAGS, type timescale_dag and press ENTER.
    4. Press the play icon and trigger the DAG.
  2. Verify that the data appears in your Timescale Cloud service
    1. In Timescale Console, navigate to your service and click SQL editor.
    2. Run a query to view your data, for example SELECT symbol, name FROM crypto_assets;. You see the new rows inserted in the table.
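You can also read the data back through the same Airflow connection instead of the SQL editor. This is a sketch; timescale_conn is again a placeholder for your Connection Id.

    # Sketch: read back the inserted rows through the Airflow connection
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="timescale_conn")  # placeholder Connection Id
    for symbol, name in hook.get_records("SELECT symbol, name FROM crypto_assets;"):
        print(symbol, name)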
You have successfully integrated Apache Airflow with your Timescale Cloud service and created a data pipeline.