I’m trying to implement AuthO on my Kafka Python application but i get this error:
Response status: 403
Response text: {“error”:“unauthorized_client”,“error_description”:“Grant type ‘client_crendetials’ not allowed for the client.”,“error_uri”:“Application Grant Types”}
Traceback (most recent call last):
File “/data/work/kafka-docker/python/main.py”, line 12, in
token = auth0.get_token()
^^^^^^^^^^^^^^^^^
File “/data/work/kafka-docker/python/auth0.py”, line 31, in get_token
response.raise_for_status()
File “/usr/lib/python3/dist-packages/requests/models.py”, line 1021, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://dev-2wlje8cth5bo7kri.us.auth0.com/oauth/token
I tested getting the token in a test class before, using the same application an API, and it worked.
I also checked the settings and everything seems to be configured correctly.
Here is my Python code:
main.py
from auth0 import Auth0
from kafka_client import KafkaClient
AUTH0_DOMAIN = ‘dev-2wlje8cth5bo7kri.us.auth0.com’
CLIENT_ID = ‘CLIENT_ID’
CLIENT_SECRET = ‘CLIENT_SECRET’
AUDIENCE = ‘https://myapi/’
BOOTSTRAP_SERVERS = ‘localhost:9092’
GROUP_ID = ‘my-group’
auth0 = Auth0(AUTH0_DOMAIN, CLIENT_ID, CLIENT_SECRET, AUDIENCE)
token = auth0.get_token()
print(f"Token: {token}")
kafka_client = KafkaClient(BOOTSTRAP_SERVERS,token)
producer = kafka_client.create_producer()
consumer = kafka_client.create_consumer(GROUP_ID)
auth0.py
import requests
import json
class Auth0:
def init(self, domain, client_id, client_secret, audience):
self.domain =domain
self.client_id = client_id
self.client_secret = client_secret
self.audience = audience
def get_token(self):
url =f"https://{self.domain}/oauth/token"
headers = {
‘content-type’: ‘application/json’
}
data = {
‘client_id’ : self.client_id,
‘client_secret’: self.client_secret,
‘audience’ : self.audience,
‘grant_type’ : ‘client_crendetials’
}
response = requests.post(url, headers=headers, data=json.dumps(data))
print(f"Response status: {response.status_code}")
print(f"Response text: {response.text}")
response.raise_for_status()
token = response.json()[‘acess_token’]
return token
kafka_client.py
from confluent_kafka import Producer, Consumer, KafkaError
class KafkaClient:
def init(self, bootstrap_servers, token):
self.bootstrap_servers = bootstrap_servers
self.token = token
def create_producer(self):
conf = {
‘bootstrap.servers’: self.bootstrap_servers,
‘security.protocol’: ‘SASL_SSL’,
‘sasl.mechanisms’: ‘OAUTHBEARER’,
‘sasl.oauthbearer.config’: f"token={self.token}"
}
producer = Producer(**conf)
return producer
def create_consumer(self, group_id):
conf = {
‘bootstrap.servers’: self.bootstrap_servers,
‘security.protocol’:‘SASL_SSL’,
‘sasl_mechanisms’: ‘OAUTHBEARER’,
‘sasl.oauthbearer.config’: f"token={self.token}",
“group.id”:group_id,
‘auto.offset.reset’:‘earliest’
}
consumer = Consumer(**conf)
return consumer
And here is the test class that get the token without the error:
get_token.py
import requests
import json
def get_auth0_token():
AUTH0_DOMAIN = ‘dev-2wlje8cth5bo7kri.us.auth0.com’
CLIENT_ID = ‘CLIENT_ID’
CLIENT_SECRET = ‘CLIENT_SECRET’
AUDIENCE = ‘https://myapi/’
url = f"https://{AUTH0_DOMAIN}/oauth/token"
headers = {
‘content-type’: ‘application/json’
}
data = {
‘client_id’: CLIENT_ID,
‘client_secret’: CLIENT_SECRET,
‘audience’: AUDIENCE,
‘grant_type’: ‘client_credentials’
}
response = requests.post(url, headers=headers, data=json.dumps(data))
response.raise_for_status()
token = response.json()[‘access_token’]
return token
token = get_auth0_token()
print(f"Token: {token}")
And here is my settings, to check if i missed anything:
Client:
API: