Apache Airflow: How to template Volumes in DockerOperator using Jinja Templating

I want to pass a list of volumes to DockerOperator using a Jinja template:

  • Hard-coded volumes work fine:

    volumes=['first:/dest', 'second:/sec_destination']

  • However, the following Jinja template does not work:

    volumes=[f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes') }}}}"]

    It fails with:

    500 Server Error: Internal Server Error ("invalid mode: /sec_destination')")

I found a workaround that is acceptable for me, although it is not perfect:

  • It only works when volumes always has exactly 2 elements:

    volumes=[f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes')[0] }}}}", f"{{{{ ti.xcom_pull(task_ids='my_task', key='dockerVolumes')[1] }}}}"]
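The error message is consistent with the whole sequence being rendered into a single string, which Docker then tries to parse as one source:target[:mode] bind. The sketch below reproduces that with plain Python. The tuple value is an assumption inferred from the trailing ') in the error text, and str() stands in for how a standard (non-native) Jinja expression is turned into text.

```python
# Hypothetical XCom value; a tuple is assumed because the error text ends in "')".
docker_volumes = ("first:/dest", "second:/sec_destination")

# A standard Jinja expression is rendered to text, much like str() does here.
rendered = str(docker_volumes)

# Docker bind strings have the form source:target[:mode], so split on ':'.
parts = rendered.split(":")

# The third part is what Docker would interpret as the mount mode.
print(parts[-1])  # prints /sec_destination')
```

Indexing into the value inside the template, as in the workaround, avoids this because each list element is then rendered to a single valid bind string.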



Solution 1:[1]

To set the value of a field through a template, that field must be listed in the operator's template_fields. DockerOperator does not include volumes in its template_fields, which is why you cannot set it via Jinja2.

The solution is to extend DockerOperator and add volumes to template_fields.
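A minimal sketch of such a subclass might look like the following. The class name TemplatedVolumesDockerOperator is hypothetical, the import path is the Airflow 2.x Docker provider one, and the fallback stub exists only so the snippet can be run without Airflow installed.

```python
try:
    # Import path used by the Airflow 2.x Docker provider.
    from airflow.providers.docker.operators.docker import DockerOperator
except ImportError:
    # Stand-in (not the real operator) so the sketch runs without Airflow.
    class DockerOperator:
        template_fields = ("command", "environment", "container_name")

        def __init__(self, **kwargs):
            self.volumes = kwargs.get("volumes")


class TemplatedVolumesDockerOperator(DockerOperator):
    # Keep the parent's templated fields and additionally template 'volumes'.
    template_fields = (*DockerOperator.template_fields, "volumes")
```

Airflow renders each element of a templated list separately, so every entry in volumes still has to render to one complete source:target string. This only applies to operator versions that still accept a volumes argument.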

Solution 2:[2]

Another solution is to write your own Jinja filter (for splitting the string pulled from XCom) and add it as an element of 'user_defined_filters' when initializing the DAG object.
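One possible reading of this, as a rough sketch: the filter name split_volumes, the comma separator, and the USER_DEFINED_FILTERS dict are all assumptions made for illustration. Only the user_defined_filters argument of DAG comes from the answer.

```python
def split_volumes(value, sep=","):
    """Split a delimited string such as 'a:/x, b:/y' into a list of binds."""
    return [item.strip() for item in str(value).split(sep) if item.strip()]


# Mapping of filter name to callable, passed when the DAG is created, e.g.:
#   DAG(dag_id="my_dag", user_defined_filters=USER_DEFINED_FILTERS)
USER_DEFINED_FILTERS = {"split_volumes": split_volumes}

# Inside a templated field the filter could then be used like:
#   "{{ (ti.xcom_pull(task_ids='my_task', key='dockerVolumes') | split_volumes)[0] }}"
```

The filter only changes what the template expression evaluates to. The field that receives the template still has to be one that Airflow renders.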

Solution 3:[3]

For anyone using Airflow >= 2.0.0:

The volumes parameter was deprecated in favor of mounts, which is a list of docker.types.Mount. Fortunately, Airflow evaluates templates recursively: any object that defines template_fields and is the value of a field listed in the parent object's template_fields will be evaluated as well. So, to evaluate docker.types.Mount fields, we need to do two things:

  • Add mounts to DockerOperator.template_fields
  • Add template_fields = (<field_name_1>, ..., <field_name_n>) to every docker.types.Mount.

To template the source, target, and type parameters of each mount, you can implement a DockerOperator subclass the following way:

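from airflow.providers.docker.operators.docker import DockerOperator


class DockerOperatorExtended(DockerOperator):
    # Render 'mounts' in addition to the fields DockerOperator already templates.
    template_fields = (*DockerOperator.template_fields, 'mounts')

    def __init__(self, **kwargs):
        mounts = kwargs.get('mounts', [])
        for mount in mounts:
            # Mark the Mount keys that should be rendered as templates.
            mount.template_fields = ('Source', 'Target', 'Type')
        kwargs['mounts'] = mounts
        super().__init__(**kwargs)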

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Relic16
Solution 2    PumpR
Solution 3    alexsuh