AsyncElasticsearch client not accepting connection with AWS Elasticsearch - AttributeError: 'AWS4Auth' object has no attribute 'encode'
I'm using AWS Elasticsearch and the async elasticsearch-py package in my project to connect to the cluster.
AWS Elasticsearch Version 7.9
Python package: elasticsearch[async]==7.12.0
I'm not able to initialize the async Elasticsearch client using the AWS4Auth library (mentioned in the official AWS ES Python client documentation). I expect it to connect to the cluster successfully; instead, it gives me this error:
AttributeError: 'AWS4Auth' object has no attribute 'encode'
Sharing my code snippet:
```python
from elasticsearch import AsyncElasticsearch, AIOHttpConnection
from requests_aws4auth import AWS4Auth
import asyncio

host = 'my-test-domain.us-east-1.es.amazonaws.com'
region = 'us-east-1'
service = 'es'
credentials = {
    'access_key': "MY_ACCESS_KEY",
    'secret_key': "MY_SECRET_KEY"
}

awsauth = AWS4Auth(credentials['access_key'], credentials['secret_key'], region, service)

es = AsyncElasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=AIOHttpConnection
)

async def test():
    print(await es.info())

asyncio.run(test())
```
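As far as I can tell from the 7.x source, `AIOHttpConnection.__init__` treats `http_auth` as Basic-auth credentials: a tuple or list is joined with `:` and the result goes to `urllib3.make_headers(basic_auth=...)`, which calls `.encode()` on it. An `AWS4Auth` object is neither a string nor a tuple, hence the `AttributeError`. The stdlib-only sketch below is a simplified, hypothetical stand-in for that code path (`basic_auth_header` and `FakeAWS4Auth` are illustrative names, not library APIs):

```python
import base64


def basic_auth_header(http_auth):
    """Hypothetical stand-in for how the 7.x AIOHttpConnection builds its auth header."""
    # tuples/lists such as ("user", "pass") are joined into "user:pass"
    if isinstance(http_auth, (tuple, list)):
        http_auth = ":".join(http_auth)
    # like urllib3.make_headers(basic_auth=...), this assumes a str and calls .encode()
    token = base64.b64encode(http_auth.encode("latin-1")).decode()
    return {"authorization": "Basic " + token}


class FakeAWS4Auth:
    """Hypothetical placeholder for requests_aws4auth.AWS4Auth (an auth object, not a str)."""


print(basic_auth_header(("user", "pass")))
# {'authorization': 'Basic dXNlcjpwYXNz'}

try:
    basic_auth_header(FakeAWS4Auth())
except AttributeError as exc:
    print(exc)  # 'FakeAWS4Auth' object has no attribute 'encode'
```

In other words, the async connection class has no notion of a request-signing auth plugin; it only knows how to turn `http_auth` into a static header.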
Solution 1:[1]
```python
from urllib.parse import urlencode

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from elasticsearch import AIOHttpConnection


class AWSAuthAIOHttpConnection(AIOHttpConnection):
    """Enable AWS Auth with AIOHttpConnection for AsyncElasticsearch

    The AIOHttpConnection class built into elasticsearch-py is not currently
    compatible with passing AWS4Auth as the `http_auth` parameter, as suggested
    in the docs when using AWS4Auth for the non-async RequestsHttpConnection class:
    https://docs.aws.amazon.com/opensearch-service/latest/developerguide/request-signing.html#request-signing-python

    To work around this we patch `AIOHttpConnection.perform_request` method to add in
    AWS Auth headers before making each request.

    This approach was synthesized from
    * https://stackoverflow.com/questions/38144273/making-a-signed-http-request-to-aws-elasticsearch-in-python
    * https://github.com/DavidMuller/aws-requests-auth
    * https://github.com/jmenga/requests-aws-sign
    * https://github.com/byrro/aws-lambda-signed-aiohttp-requests
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._credentials = boto3.Session().get_credentials()

    async def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        def _make_full_url(url: str) -> str:
            # These steps are copied from the parent class' `perform_request` implementation.
            # The Elasticsearch client only passes in the request path as the url,
            # and that partial url format is rejected by the `SigV4Auth` implementation
            if params:
                query_string = urlencode(params)
            else:
                query_string = ""
            # same order as the parent class: scheme://host:port + url_prefix + path
            full_url = self.host + self.url_prefix + url
            if query_string:
                full_url = "%s?%s" % (full_url, query_string)
            return full_url

        full_url = _make_full_url(url)
        if headers is None:
            headers = {}
        # this request object won't be used, we just want to copy its auth headers
        # after `SigV4Auth` processes it and adds the headers
        _request = AWSRequest(
            method=method, url=full_url, headers=headers, params=params, data=body
        )
        # the region must match your domain's region (the question uses us-east-1)
        SigV4Auth(self._credentials, "es", "us-east-1").add_auth(_request)
        headers.update(_request.headers.items())
        # pass the original path-only `url`: the parent class builds the full URL itself
        return await super().perform_request(
            method, url, params, body, timeout, ignore, headers
        )
```
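The `_make_full_url` helper exists because SigV4 signs the host and the query string, so `SigV4Auth` needs an absolute URL, while the client only hands the connection a request path. Here is a standalone version of that helper as a minimal sketch: `make_full_url` is a hypothetical name, and it assumes `host` is the connection's `scheme://hostname:port` string and `url_prefix` is the optional path prefix (usually empty), concatenated in the same order the parent class appears to use:

```python
from urllib.parse import urlencode


def make_full_url(host: str, url_prefix: str, url: str, params=None) -> str:
    """Hypothetical helper: rebuild the absolute URL that SigV4 needs to sign."""
    # params become a query string, e.g. {"size": 10} -> "size=10"
    query_string = urlencode(params) if params else ""
    # scheme://host:port, then the optional path prefix, then the request path
    full_url = host + url_prefix + url
    if query_string:
        full_url = "%s?%s" % (full_url, query_string)
    return full_url


print(make_full_url("https://my-test-domain.us-east-1.es.amazonaws.com:443", "", "/_search", {"size": 10}))
# https://my-test-domain.us-east-1.es.amazonaws.com:443/_search?size=10
```

Putting `url_prefix` after the host matters only when a prefix is configured, but in that case the signed path must match the path actually sent, or AWS rejects the signature.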
Solution 2:[2]
I think that with AWS4Auth you are bound to RequestsHttpConnection, because AWS4Auth is an auth plugin for the `requests` library. From https://elasticsearch-py.readthedocs.io/en/master/transports.html:

> The default connection class is based on urllib3 which is more performant and lightweight than the optional requests-based class. Only use RequestsHttpConnection if you have need of any of requests advanced features like custom auth plugins etc.
Try:
```python
from elasticsearch import AsyncElasticsearch, RequestsHttpConnection

es = AsyncElasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
```
Or, if the code above doesn't work, try the non-async version:
```python
from elasticsearch import Elasticsearch, RequestsHttpConnection

es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
```
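Why might the async variant fail? As far as I know, in 7.x the transport behind `AsyncElasticsearch` `await`s `connection.perform_request(...)`, whereas `RequestsHttpConnection.perform_request` is an ordinary synchronous method that returns a plain tuple. The stdlib-only sketch below uses hypothetical stand-in classes (`SyncConnection`, `AsyncConnection`, `async_transport_call`), not the library's own, to show what awaiting such a result does:

```python
import asyncio


class SyncConnection:
    """Hypothetical stand-in for a synchronous connection such as RequestsHttpConnection."""

    def perform_request(self, method, url):
        # returns (status, headers, body) immediately; nothing to await
        return 200, {}, '{"ok": true}'


class AsyncConnection:
    """Hypothetical stand-in for an async connection such as AIOHttpConnection."""

    async def perform_request(self, method, url):
        await asyncio.sleep(0)  # pretend to do network I/O
        return 200, {}, '{"ok": true}'


async def async_transport_call(connection, method, url):
    # an async transport awaits whatever perform_request returns
    return await connection.perform_request(method, url)


async def main():
    print(await async_transport_call(AsyncConnection(), "GET", "/"))
    try:
        await async_transport_call(SyncConnection(), "GET", "/")
    except TypeError as exc:
        print("TypeError:", exc)  # a tuple is not awaitable


asyncio.run(main())
```

If that holds for your version, the synchronous `Elasticsearch` client is the safer of the two suggestions.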
Solution 3:[3]
I took @francojposa's answer above and fixed/adapted it. I tried to submit an edit to his answer, but the suggested-edit queue was full.
requirements.txt:

```text
boto3<2.0
elasticsearch[async]<7.14  # 7.14 added a product check that refuses non-Elastic servers such as AWS-hosted clusters
```

And here's the main definition:
```python
from urllib.parse import urlencode

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from elasticsearch import AsyncElasticsearch, AIOHttpConnection


class AWSAuthAIOHttpConnection(AIOHttpConnection):
    """Enable AWS Auth with AIOHttpConnection for AsyncElasticsearch

    The AIOHttpConnection class built into elasticsearch-py is not currently
    compatible with passing AWS4Auth as the `http_auth` parameter, as suggested
    in the docs when using AWS4Auth for the non-async RequestsHttpConnection class:
    https://docs.aws.amazon.com/opensearch-service/latest/developerguide/request-signing.html#request-signing-python

    To work around this we patch `AIOHttpConnection.perform_request` method to add in
    AWS Auth headers before making each request.

    This approach was synthesized from
    * https://stackoverflow.com/questions/38144273/making-a-signed-http-request-to-aws-elasticsearch-in-python
    * https://github.com/DavidMuller/aws-requests-auth
    * https://github.com/jmenga/requests-aws-sign
    * https://github.com/byrro/aws-lambda-signed-aiohttp-requests
    """

    def __init__(self, *args, aws_region=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.aws_region = aws_region
        self._credentials = boto3.Session().get_credentials()
        self.auther = SigV4Auth(self._credentials, "es", self.aws_region)

    def _make_full_url(self, url: str, params=None) -> str:
        # These steps are copied from the parent class' `perform_request` implementation.
        # The Elasticsearch client only passes in the request path as the url,
        # and that partial url format is rejected by the `SigV4Auth` implementation
        query_string = urlencode(params) if params else None
        # same order as the parent class: scheme://host:port + url_prefix + path
        full_url = self.host + self.url_prefix + url
        if query_string:
            full_url = "%s?%s" % (full_url, query_string)
        return full_url

    async def perform_request(
        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
    ):
        full_url = self._make_full_url(url, params)
        if headers is None:
            headers = {}
        # this request object won't be used, we just want to copy its auth headers
        # after `SigV4Auth` processes it and adds the headers
        _request = AWSRequest(
            method=method, url=full_url, headers=headers, params=params, data=body
        )
        self.auther.add_auth(_request)
        headers.update(_request.headers.items())
        # pass the original path-only `url`: the parent class builds the full URL itself
        return await super().perform_request(
            method, url, params, body, timeout, ignore, headers
        )
```
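The core of this workaround is `self.auther.add_auth(_request)`: botocore's `SigV4Auth` adds an `X-Amz-Date` header and an `Authorization` header (plus `X-Amz-Security-Token` when the credentials carry a session token) to the throwaway request, and those headers are then copied onto the real one. To make that step less of a black box, here is a simplified, stdlib-only sketch of the AWS Signature Version 4 computation. It is illustrative only: `sigv4_headers` is a hypothetical name, it signs just the `host` and `x-amz-date` headers, ignores session tokens, assumes the path and query string are already in canonical (sorted, URI-encoded) form, and skips edge cases botocore handles. In real code, keep using `SigV4Auth`.

```python
import hashlib
import hmac
from datetime import datetime, timezone
from urllib.parse import urlsplit


def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def sigv4_headers(method, url, body, access_key, secret_key, region,
                  service="es", now=None):
    """Hypothetical, simplified SigV4 signer returning the headers to add."""
    now = now or datetime.now(timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")
    parts = urlsplit(url)
    host = parts.hostname  # drops the port: only correct for the default port (443 here)
    # 1. canonical request: method, path, query, headers, signed header names, payload hash
    payload_hash = hashlib.sha256(body or b"").hexdigest()  # body must be bytes or None
    canonical_headers = f"host:{host}\nx-amz-date:{amz_date}\n"
    signed_headers = "host;x-amz-date"
    canonical_request = "\n".join([
        method, parts.path or "/", parts.query,
        canonical_headers, signed_headers, payload_hash,
    ])
    # 2. string to sign: algorithm, timestamp, credential scope, hash of canonical request
    scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256", amz_date, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])
    # 3. signing key: chained HMACs over date, region, service, "aws4_request"
    key = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    for part in (region, service, "aws4_request"):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
    return {
        "X-Amz-Date": amz_date,
        "Authorization": (
            f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
            f"SignedHeaders={signed_headers}, Signature={signature}"
        ),
    }
```

Because the signature covers the host, path, query string and body hash, signing a path-only URL (or a different URL from the one actually sent) produces a signature AWS will reject, which is why both answers rebuild the full URL first.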
Usage:

```python
import asyncio

es_client = AsyncElasticsearch(
    ['https://aws-es-or-opensearch-url-goes-here'],
    use_ssl=True, verify_certs=True,
    connection_class=AWSAuthAIOHttpConnection, aws_region='us-east-1'
)

async def test():
    body = {...}
    results = await es_client.search(body=body, index='test', doc_type='test')  # I use ES 5/6

asyncio.run(test())
```
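Passing `aws_region='us-east-1'` to `AsyncElasticsearch` works because, as far as I can tell in 7.x, the client forwards keyword arguments it doesn't consume to its transport, which stores them and calls `connection_class(**kwargs)` for each host, merged with that host's own settings. That is why `AWSAuthAIOHttpConnection.__init__` can take `aws_region` before delegating the rest to `AIOHttpConnection`. The sketch below mimics that plumbing with hypothetical stand-in classes (`MiniTransport`, `BaseConnection`, `RegionAwareConnection`); it is not the library's code:

```python
class BaseConnection:
    """Hypothetical stand-in for AIOHttpConnection: keeps the options it knows."""

    def __init__(self, host="localhost", port=None, use_ssl=False, **kwargs):
        self.host, self.port, self.use_ssl = host, port, use_ssl


class RegionAwareConnection(BaseConnection):
    """Mirrors AWSAuthAIOHttpConnection: consumes aws_region, passes the rest on."""

    def __init__(self, *args, aws_region=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.aws_region = aws_region


class MiniTransport:
    """Hypothetical stand-in for the client's transport."""

    def __init__(self, hosts, connection_class=BaseConnection, **kwargs):
        self.kwargs = kwargs  # shared options, e.g. use_ssl and aws_region
        self.connections = []
        for host in hosts:
            # per-host settings (host, port) are merged over the shared kwargs
            options = dict(self.kwargs)
            options.update(host)
            self.connections.append(connection_class(**options))


transport = MiniTransport(
    [{"host": "aws-es-or-opensearch-url-goes-here", "port": 443}],
    connection_class=RegionAwareConnection,
    use_ssl=True,
    aws_region="us-east-1",
)
conn = transport.connections[0]
print(conn.host, conn.port, conn.use_ssl, conn.aws_region)
# aws-es-or-opensearch-url-goes-here 443 True us-east-1
```

A side effect of this design is that every connection in the pool gets its own signer for the same region, so clusters reached through several host URLs are still signed consistently.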
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | francojposa |
| Solution 2 | mateush92 |
| Solution 3 | Kyle |