Airflow 2.0.1: Pod Template Override not working as expected for KubernetesExecutor
Setup: Airflow 2.0.1 with Kubernetes 1.18, Python 3.8, and Kubernetes Python client 18.17.x
Pod template file:
apiVersion: v1
kind: Pod
metadata:
  name: workerPod
spec:
  containers:
    - args: []
      command: []
      env:
        - name: <Key>
          value: "<value>"
      envFrom: []
      name: base
      image: "<image_name>"
      imagePullSecrets: [name: "<image_pull_secrets>"]
      imagePullPolicy: "Always"
      ports: []
      volumeMounts:
        - mountPath: "<path>"
          name: "<name>"
The default config set in airflow.cfg is as follows:
[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>
The problem is that, while certain keys are read correctly from the pod_template_file (for instance, all the env variables are set correctly, and imagePullPolicy is honoured as well, which I validated by overriding it from "IfNotPresent" to "Always"), the imagePullSecrets key is not being picked up. I can tell because I get a "Base credentials not provided" error when the image is pulled from the ECR repo. I have verified that the credentials are correct and that I can create a pod with them when doing so explicitly. Even when setting image_pull_secrets in airflow.cfg directly, I still end up getting the same error.
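For reference, the Kubernetes Pod schema defines imagePullSecrets on the pod spec, alongside containers, not on an individual container, which may matter for how the template is parsed. Below is a minimal sketch with the kubernetes Python client (placeholder values mirror the ones above) showing where the field lives:

from kubernetes.client import models as k8s

# imagePullSecrets sits on the pod spec, next to (not inside) the container list.
pod = k8s.V1Pod(
    metadata=k8s.V1ObjectMeta(name="worker-pod"),
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(
                name="base",
                image="<image_name>",
                image_pull_policy="Always",
            )
        ],
        image_pull_secrets=[k8s.V1LocalObjectReference(name="<image_pull_secrets>")],
    ),
)

# to_dict() keeps the snake_case attribute names used by the client models.
print(pod.to_dict()["spec"]["image_pull_secrets"])

This matches where the V1 API override further below places image_pull_secrets (on V1PodSpec).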
I have also tried to create the pod override using the V1 API models explicitly, as follows:
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

start_task = PythonOperator(
    task_id=<start_task_id>,
    python_callable=<start_task_callable>,
    op_args=[<args>],
    dag=dag,
    executor_config={
        "pod_template_file": "<path_to_template>",
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        image="<image_override>",
                        image_pull_policy="<pull_policy>",
                    ),
                ],
                # imagePullSecrets is set on the pod spec here.
                image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
            )
        ),
    },
)
In this case the Docker image is pulled correctly, without any authentication errors, but unfortunately the pod then throws an error: AttributeError: 'V1Container' object has no attribute '_startup_probe'
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
if not self.task_instance.check_and_change_state_before_execution(
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
session.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
self.transaction.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
persistence.save_obj(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
for (
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
state.manager[propkey].impl.is_equal(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
return x == y
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
return self.to_dict() == other.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
result[attr] = value.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
result[attr] = list(map(
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
value = getattr(self, attr)
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'
Solution 1:[1]
I had a similar issue. The problem was that we changed our Airflow containers and upgraded the Kubernetes library in the new containers. There is not necessarily anything wrong with the new Kubernetes library itself, but Airflow had serialized some objects (a TaskInstance in our case, and apparently in yours too, judging by the shared traceback) and later deserializes them back into Python objects. So in your case it recreates a V1Container object from the serialized form it had. The new class defines an attribute _startup_probe in its initializer, but the serialized version lacks that attribute, so it appears to come from a library version prior to the commit that added startup_probe. Deserialization itself does not fail; the problem only surfaces whenever to_dict() is called on the object. In your case that happens during comparison (__eq__); for me it happened while logging, since __repr__ uses it.
The Airflow Slack community pointed me to a change that should resolve this issue. I haven't been able to test it yet, but I'm sharing it here in case someone else hits the same problem.
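As an illustration of that failure mode (a hypothetical reproduction sketch, not the actual Airflow deserialization path), you can mimic what unpickling an older object does by removing the newer attribute from a V1Container's instance dict:

from kubernetes.client import models as k8s

container = k8s.V1Container(name="base")

# Mimic state that was serialized by a kubernetes client released before
# startup_probe existed: the instance dict has no _startup_probe entry.
del container.__dict__["_startup_probe"]

# Any call that walks every known attribute now fails, e.g. to_dict(),
# which both __eq__ and __repr__ rely on:
container.to_dict()
# AttributeError: 'V1Container' object has no attribute '_startup_probe'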
Solution 2:[2]
I had a similar issue, which started occurring when we updated our Airflow version. The problem was that the Dockerfile for our custom Kubernetes worker container installed an older version of the kubernetes Python package, which was incompatible with the newer Airflow. Installing the latest version of the kubernetes package in the Dockerfile fixed the issue.
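A quick way to confirm such a mismatch (an illustrative check, not part of the original answer) is to compare the versions that actually ended up inside the worker image:

import airflow
import kubernetes

# A kubernetes client in the image that is out of step with what this
# Airflow release expects is what leads to errors like the one above.
print("airflow:", airflow.__version__)
print("kubernetes client:", kubernetes.__version__)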
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 |
Solution 2 | zoro_uchia