Apache Airflow: How to template Volumes in DockerOperator using Jinja Templating
I want to pass a list of volumes to DockerOperator using a Jinja template.
Hard-coded volumes work fine:
volumes=['first:/dest', 'second:/sec_destination']
However, the following Jinja template does not work:
volumes=[f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes') }}}}"]
It fails with:
500 Server Error: Internal Server Error ("invalid mode: /sec_destination')")
I found a workaround that is acceptable for me but not perfect, because it only works when volumes always has exactly 2 elements:
volumes=[f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes')[0] }}}}", f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes')[1] }}}}"]
Solution 1:[1]
To set a field's value from a template, that field must be listed in the operator's template_fields. DockerOperator does not include volumes in its template_fields, which is why you cannot set it via Jinja2.
The solution is to extend DockerOperator and add volumes to template_fields.
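The mechanism behind this answer can be illustrated with a toy model. The sketch below does not use Airflow: FakeDockerOperator, render, and TemplatedVolumesOperator are hypothetical stand-ins, and the placeholder substitution is a crude substitute for Jinja. It only shows that attributes missing from template_fields are left unrendered, and that a subclass can extend the tuple.

```python
# Toy model only: FakeDockerOperator is a hypothetical stand-in, not Airflow code.
class FakeDockerOperator:
    # Only attributes named here get rendered.
    template_fields = ("image", "command")

    def __init__(self, image, command, volumes):
        self.image = image
        self.command = command
        self.volumes = volumes

    def render_template_fields(self, context):
        # Walk the declared fields and replace each with its rendered value.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


def render(value, context):
    """Crude substitute for Jinja: replace '{{ key }}' placeholders in strings."""
    if isinstance(value, list):
        return [render(item, context) for item in value]
    for key, replacement in context.items():
        value = value.replace("{{ " + key + " }}", replacement)
    return value


class TemplatedVolumesOperator(FakeDockerOperator):
    # Extend the parent's tuple instead of replacing it.
    template_fields = (*FakeDockerOperator.template_fields, "volumes")


context = {"vol": "first:/dest"}

plain = FakeDockerOperator("img", "echo", ["{{ vol }}"])
plain.render_template_fields(context)

extended = TemplatedVolumesOperator("img", "echo", ["{{ vol }}"])
extended.render_template_fields(context)

print(plain.volumes)     # placeholder left untouched
print(extended.volumes)  # placeholder substituted
```

With Airflow installed, the same pattern would presumably be a subclass of DockerOperator whose template_fields is (*DockerOperator.template_fields, 'volumes'). Note that a rendered Jinja expression is normally a string, so a list pulled from XCom may still need extra handling.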
Solution 2:[2]
Another solution is to write your own Jinja filter (to split the string pulled from XCom) and register it under 'user_defined_filters' when initializing the DAG object.
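A minimal sketch of such a filter follows. The name split_volumes and the comma separator are assumptions: the answer does not say how the XCom string is delimited. The DAG wiring is shown only in comments, and it assumes that volumes is a templated field and that native rendering is available (render_template_as_native_obj, Airflow 2.1+ as far as I know), since otherwise the rendered result would be a string again.

```python
def split_volumes(value, sep=","):
    """Split a delimited string such as 'a:/x,b:/y' into a list of volume specs."""
    # Strip whitespace around each entry and drop empty entries.
    return [part.strip() for part in value.split(sep) if part.strip()]


print(split_volumes("first:/dest, second:/sec_destination"))

# Hypothetical wiring inside a DAG file (not executed here):
# dag = DAG(
#     "my_dag",
#     user_defined_filters={"split_volumes": split_volumes},
#     render_template_as_native_obj=True,  # assumption: keeps the list a list
# )
# volumes="{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes') | split_volumes }}"
```

Keeping the filter a plain function makes it easy to test outside Airflow before registering it on the DAG.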
Solution 3:[3]
For anyone who is using Airflow >= 2.0.0: the volumes parameter was deprecated in favor of mounts, which is a list of docker.types.Mount. Fortunately, Airflow evaluates templates recursively, which means that every object with template_fields that is a value of any field in template_fields of the parent object will be evaluated as well. So in order to evaluate docker.types.Mount fields we need to do two things:
- Add mounts to DockerOperator.template_fields.
- Add template_fields = (<field_name_1>, ..., <field_name_n>) to every docker.types.Mount.
So to template the source, target, and type parameters in the DockerOperator subclass, you can implement it the following way:
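from airflow.providers.docker.operators.docker import DockerOperator


class DockerOperatorExtended(DockerOperator):
    # Render mounts in addition to the operator's default template fields.
    template_fields = (*DockerOperator.template_fields, 'mounts')

    def __init__(self, **kwargs):
        mounts = kwargs.get('mounts', [])
        for mount in mounts:
            # docker.types.Mount stores source, target, and type under these capitalized keys.
            mount.template_fields = ('Source', 'Target', 'Type')
        kwargs['mounts'] = mounts
        super().__init__(**kwargs)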
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Relic16 |
Solution 2 | PumpR |
Solution 3 | alexsuh |