Variables are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow.
While your pipeline code definition and most of your constants and variables should be defined in code and stored in source control, it can be useful to have some variables or configuration items accessible and modifiable through the UI.
Variables can also serve as per-environment configuration, giving the same DAG a different context in, say, dev and production.
There are three ways to create variables:
- UI
- CLI
- code
UI
From the UI, we can navigate to Admin → Variables to manage them.
CLI
From the CLI, we can use the following command to read a variable:

```
airflow variables -g key
```
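Beyond reading a single value, the same command covers the full lifecycle. These flags are from the Airflow 1.10-style CLI (the form this post's examples use); in Airflow 2.x they became subcommands such as `airflow variables get` and `airflow variables set`:

```shell
airflow variables -s env dev       # set env=dev
airflow variables -g env           # get the value of env
airflow variables -x env           # delete env
airflow variables -e vars.json     # export all variables to a JSON file
airflow variables -i vars.json     # import variables from a JSON file
```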
Code
No matter where we set up a variable, in the end we want to read it in a DAG so that we can easily change the context of a DAG run.
There are two ways to read variables in a DAG:
Python Code
```python
from airflow.models import Variable

Variable.set("foo", "value")
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
baz = Variable.get("baz", default_var=None)
```

Jinja template
You can reference a variable from a Jinja template, for example in a BashOperator command:

```
echo {{ var.value.<variable_name> }}
```
or, if you need to deserialize a JSON object from the variable:

```
echo {{ var.json.<variable_name> }}
```
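Both access paths resolve against the same stored strings: `var.value` and `Variable.get()` return the raw string, while `var.json` and `deserialize_json=True` additionally run it through `json.loads`. A minimal in-memory sketch of those semantics (a plain dict and functions stand in for Airflow's Variable model; `variable_set`/`variable_get` are hypothetical names, not Airflow APIs):

```python
import json

_store = {}  # stand-in for the variable table in Airflow's metadata DB

def variable_set(key, value, serialize_json=False):
    # Airflow persists every variable value as a string
    _store[key] = json.dumps(value) if serialize_json else str(value)

_MISSING = object()

def variable_get(key, default_var=_MISSING, deserialize_json=False):
    if key not in _store:
        if default_var is _MISSING:
            # Mirrors Airflow: a missing key with no default raises
            raise KeyError("Variable %s does not exist" % key)
        return default_var
    raw = _store[key]
    return json.loads(raw) if deserialize_json else raw

variable_set("foo", "value")
variable_set("config", {"retries": 3}, serialize_json=True)

print(variable_get("foo"))                            # value
print(variable_get("config", deserialize_json=True))  # {'retries': 3}
print(variable_get("baz", default_var=None))          # None
```

The `default_var` branch matters: without it, reading a variable that does not exist raises instead of returning None.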
Best practice
You should avoid using Variables outside of an operator’s execute() method or Jinja templates wherever possible: every Variable.get() opens a connection to Airflow’s metadata DB to fetch the value, which slows down DAG parsing and places extra load on the database.
Since the scheduler re-parses every DAG file continuously, a module-level Variable.get() opens a new DB connection on every parse.
Example to understand best practice
Let’s set the variable env=dev from the CLI:

```
$ airflow variables -s env dev
```
Create a DAG
```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Module-level code: runs on every scheduler parse of this file
env = Variable.get("env", default_var=None)
print('' if env is None else env + 'parse time')

def print_env():
    # Runs only when the task actually executes
    print(Variable.get("env") + 'execution time')

dag = DAG("variable_demo", start_date=datetime(2020, 4, 1), schedule_interval="@daily")

with dag:
    os_operator = PythonOperator(task_id="os_operator", python_callable=print_env)
    jinja_operator = BashOperator(task_id="get_variable_value",
                                  bash_command='echo {{ var.value.env }} ')
```

Running explanation
When the scheduler parses the DAG file, which happens every few seconds, we will find devparse time in the scheduler log.
When the DAG is scheduled and runs, we will see the operators print the dev variable:
```
//os_operator
[2020-04-08 14:56:50,752] {{logging_mixin.py:112}} INFO - devexecution time
...
//get_variable_value
[2020-04-08 14:56:59,133] {{bash_operator.py:115}} INFO - Running command: echo dev
[2020-04-08 14:56:59,151] {{bash_operator.py:122}} INFO - Output:
[2020-04-08 14:56:59,158] {{bash_operator.py:126}} INFO - dev
```
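The cost of the anti-pattern can be made concrete with a small simulation. Here `get_variable` and `parse_dag_file` are hypothetical stand-ins for `Variable.get()` and the scheduler's parse loop, not Airflow APIs:

```python
# Counting metadata-DB hits caused by a top-level Variable.get().
DB_HITS = 0

def get_variable(key):
    """Stand-in for Variable.get(): each call would open a DB connection."""
    global DB_HITS
    DB_HITS += 1
    return "dev"

def parse_dag_file():
    # Anti-pattern: module-level Variable.get() re-runs on every parse
    env = get_variable("env")
    return env

# The scheduler re-parses DAG files continuously, e.g. every few seconds
for _ in range(10):
    parse_dag_file()

print(DB_HITS)  # 10 DB hits before any task has executed
```

Fetching the variable inside execute() or via `{{ var.value.env }}` keeps this count at zero until a task actually runs.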