Airflow 2.0.1: Pod Template Override not working as expected for KubernetesExecutor

Setup: Airflow 2.0.1 with Kubernetes 1.18 and Python 3.8, Kubernetes Client: 18.17.x

Pod template file:

apiVersion: v1
kind: Pod
metadata:
  name: workerPod

spec:
  containers:
    - args: []
      command: []
      env:
        - name: <Key>
          value: "<value>"
      envFrom: []
      name: base
      image: "<image_name>"
      imagePullSecrets: [name: "<image_pull_secrets>"]
      imagePullPolicy: "Always"
      ports: []
      volumeMounts:
        - mountPath: "<path>"
          name: "<name>"

The config set in airflow.cfg is as follows:

[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>

The problem is that, while certain keys are read correctly from the pod_template_file (for instance, all the env variables are set correctly, and imagePullPolicy is honored, which I validated by switching its value between "Always" and "IfNotPresent" and observing the change take effect), the imagePullSecrets key is not being read. I can tell because I get a "Base credentials not provided" error when the image is pulled from the ECR repo. I have verified that the credentials are correct, and I can create a pod with this image when I do so explicitly.
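For reference, the Kubernetes Python client declares image_pull_secrets on V1PodSpec rather than on V1Container, which may be relevant to the template above. A minimal check, using only the model metadata that the generated client ships with:

from kubernetes.client import models as k8s

# Each generated model lists its fields in `openapi_types`:
print("image_pull_secrets" in k8s.V1PodSpec.openapi_types)    # True: a pod-level field
print("image_pull_secrets" in k8s.V1Container.openapi_types)  # False: not a container field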

Even when I set image_pull_secrets directly in airflow.cfg, I still end up getting the same error.
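One way to inspect what the executor actually parses out of the template is to load it the same way the KubernetesExecutor does. This is a sketch assuming the Airflow 2.0.x PodGenerator API; the path is a placeholder:

from airflow.kubernetes.pod_generator import PodGenerator

# Deserialize the template into a V1Pod, as the KubernetesExecutor does;
# keys that do not match the schema are silently dropped:
pod = PodGenerator.deserialize_model_file("<path to template file>")

print(pod.spec.image_pull_secrets)               # None unless the key sits at pod level
print(pod.spec.containers[0].image_pull_policy)  # "Always", read from the container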

I have also tried building the pod override explicitly with the Kubernetes V1 API models, as follows:

from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

start_task = PythonOperator(
    task_id=<start_task_id>,
    python_callable=<start_task_callable>,
    op_args=[<args>],
    dag=dag,
    executor_config={
        "pod_template_file": "<path_to_template>",
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    # Must be named "base" so the executor merges it
                    # into the worker container from the template:
                    k8s.V1Container(
                        name="base",
                        image="<image_override>",
                        image_pull_policy="<pull_policy>",
                    ),
                ],
                # Note: the pull secrets sit on the pod spec here, not on the container:
                image_pull_secrets=[k8s.V1LocalObjectReference("<image_pull_secrets>")],
            )
        ),
    },
)

In this case the correct Docker image is pulled without any authentication errors. Unfortunately, the pod then throws AttributeError: 'V1Container' object has no attribute '_startup_probe':

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
    if not self.task_instance.check_and_change_state_before_execution(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
    persistence.save_obj(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
    for (
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
    state.manager[propkey].impl.is_equal(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
    return x == y
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
    return self.to_dict() == other.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
    result[attr] = value.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
    result[attr] = list(map(
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
    value = getattr(self, attr)
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
    return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'


Solution 1:[1]

I had a similar issue. The problem was that we changed our Airflow containers and upgraded the Kubernetes client library in the new containers. The new library is not necessarily at fault: Airflow had serialized some objects (in our case a TaskInstance; judging by your traceback, the same applies to you) and later deserializes them back into Python objects. So in your case it recreates a V1Container object from its serialized form. The current V1Container class sets an attribute _startup_probe in its initializer, but your serialized version appears to predate the commit that introduced that attribute, so the restored object lacks it. Deserialization itself succeeds; issues only arise whenever the to_dict method is used, because it walks every attribute the current class declares. In your case to_dict is called during an equality comparison (__eq__); for me it was triggered by logging, since __repr__ uses it.
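The failure mode is easy to reproduce by hand. The del below merely simulates what unpickling a payload written by an older client produces, namely an instance whose __dict__ lacks the newer attribute:

from kubernetes.client import models as k8s

container = k8s.V1Container(name="base")

# Simulate an object serialized before startup_probe existed and
# then deserialized by a newer client:
del container._startup_probe

# to_dict() walks every attribute the *current* class declares,
# so it trips over the one the old payload never had:
container.to_dict()
# AttributeError: 'V1Container' object has no attribute '_startup_probe'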

The Airflow Slack community pointed me to a change that should resolve this issue. I haven't been able to test it yet, but I'm sharing it here in case someone else hits the same problem.

Solution 2:[2]

I had a similar issue, which started occurring when we updated our Airflow version. The problem was that the Dockerfile for our custom Airflow container installed an older version of the kubernetes Python client, which was incompatible with the newer Airflow. Installing the latest version of the kubernetes client in the Dockerfile fixed the issue.
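A quick sanity check inside the worker image can confirm which client version is installed and whether its V1Container model already carries the newer field (the names used here are standard to the kubernetes package):

import kubernetes
from kubernetes.client import models as k8s

print(kubernetes.__version__)

# Recent clients set _startup_probe in V1Container.__init__;
# on a client that predates the field, the attribute does not exist:
print(hasattr(k8s.V1Container(name="check"), "_startup_probe"))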

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: (no author listed)
Solution 2: zoro_uchia