I thought I would share a Python script that imports Adobe Commerce Cloud / Magento users from a CSV into the Auth0 bulk import API. It splits the CSV into multiple JSON payloads and waits for the previous payload to finish importing before POSTing the next one.
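For context, the script expects a semicolon-delimited CSV with (at least) email, email_verified and custom_password_hash columns, where the hash column holds the value stored by Magento and empty hashes are exported as \N. A made-up row might look like this (the exact hash layout is illustrative):

email;email_verified;custom_password_hash
jane@example.com;1;<64-char hex digest>:<salt>:<version>

Here is the script: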
import csv
import json
import time
import chardet
import base64
import binascii
import os
import requests
from datetime import datetime
# Define constants as equivalent to the PHP SODIUM_CRYPTO_PWHASH_* constants
#eg php -r 'echo SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE."\n";'
SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE = 67108864
SODIUM_CRYPTO_PWHASH_OPSLIMIT_INTERACTIVE = 2
SODIUM_CRYPTO_PWHASH_SALTBYTES = 16
# Define files path and names
PATH = ''
CSV_FILENAME = 'users.csv'
JSON_FILENAME_PREFIX = 'users'
# Auth0 limits import files to 500KB; 1000 users per file is about right (~300KB)
MAX_USERS_PER_FILE = 1000
# Set your Auth0 domain and Management API credentials
AUTH0_DOMAIN = 'XXXXXXX.auth0.com'
CLIENT_ID = ''
CLIENT_SECRET = ''
AUDIENCE = f'https://{AUTH0_DOMAIN}/api/v2/'
TOKEN_URL = f'https://{AUTH0_DOMAIN}/oauth/token'
API_URL = f'https://{AUTH0_DOMAIN}/api/v2/jobs/users-imports'
JOB_STATUS_URL = f'https://{AUTH0_DOMAIN}/api/v2/jobs/'
# Directory containing JSON files
JSON_DIR = os.getcwd()
# Log file to record responses
LOG_FILE = 'response_log.txt'
# Your valid connection ID
CONNECTION_ID = 'con_XXXXXXXXXXXXXXXXXXX'
def argon2Phc(hash_string: str) -> str:
"""
Converts argon2 has to PHC string for Auth0.
"""
hash_parts = hash_string.split(':')
hash_hex = hash_parts[0]
salt = hash_parts[1]
memlimit_kib = SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE // 1024
opslimit = SODIUM_CRYPTO_PWHASH_OPSLIMIT_INTERACTIVE
salt_encoded = base64.b64encode(salt[:SODIUM_CRYPTO_PWHASH_SALTBYTES].encode()).decode().rstrip('=')
hash_encoded = base64.b64encode(binascii.unhexlify(hash_hex)).decode().rstrip('=')
return f'$argon2id$v=19$m={memlimit_kib},t={opslimit},p=1${salt_encoded}${hash_encoded}'
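# Illustrative example (values are made up): a stored hash of the form
#   '<64-char hex digest>:<salt>:<version>'
# becomes a PHC string like
#   '$argon2id$v=19$m=65536,t=2,p=1$<base64 salt>$<base64 digest>'
# where m=65536 is SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE expressed in KiB.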
def detect_encoding(file_path):
    """Detects the character encoding of a file using chardet."""
    with open(file_path, 'rb') as file:
        detector = chardet.universaldetector.UniversalDetector()
        for line in file:
            detector.feed(line)
            if detector.done:
                break
        detector.close()
    return detector.result['encoding']
def convert_csv_to_auth0_json():
"""
Converts a CSV file to JSON format for Auth0 User Import/Export Extension.
"""
# Read CSV data\
csv_file = PATH + CSV_FILENAME
csv_file_encoding = detect_encoding(csv_file)
with open(csv_file, newline='', encoding=csv_file_encoding) as f:
csv_reader = csv.DictReader(f,delimiter=';')
headers = csv_reader.fieldnames
# Set initial file splitter
current_splitter = None
splitter = 0
# Create data array
data = []
for row in csv_reader:
#Set initial json payload
user_data = {}
custom_password_hash = {}
hash = {}
# Create new array in data when splitter increase
if splitter != current_splitter:
current_splitter = splitter
data.append([])
# Map CSV headers to Auth0 user object properties
for header in headers:
if header.lower() == 'email':
user_data[header] = row[header]
elif header.lower() == 'email_verified':
user_data[header] = True
elif header.lower() == 'custom_password_hash' and row[header] != '\\N' :
custom_password_hash['algorithm'] = 'argon2'
hash['value'] = argon2Phc(row[header])
custom_password_hash['hash'] = hash
user_data[header] = custom_password_hash
data[splitter].append(user_data)
# Set file splitter increment based on max user per file
user_pointer = csv_reader.line_num - 1
if (user_pointer) % MAX_USERS_PER_FILE == 0:
splitter += 1
# Write data to JSON file
for idx, datarow in enumerate(data):
json_file = PATH + JSON_FILENAME_PREFIX + '_' + str(idx + 1) + '.json'
with open(json_file, 'w') as f:
json.dump(datarow, f, indent=2)
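# Each generated users_N.json file is a JSON array of entries shaped roughly like
# this (values are illustrative):
# [
#   {
#     "email": "jane@example.com",
#     "email_verified": true,
#     "custom_password_hash": {
#       "algorithm": "argon2",
#       "hash": {"value": "$argon2id$v=19$m=65536,t=2,p=1$...$..."}
#     }
#   }
# ]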
def get_api_token():
"""Obtains an API token from Auth0 using OAuth2."""
headers = {'content-type': 'application/json'}
payload = {
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'audience': AUDIENCE,
'grant_type': 'client_credentials'
}
response = requests.post(TOKEN_URL, headers=headers, json=payload)
response.raise_for_status()
return response.json()['access_token']
def log_response(response, filename):
"""Logs the response to a log file."""
with open(LOG_FILE, 'a') as log_file:
log_file.write(f'Time: {datetime.now()}\n')
log_file.write(f'Filename: {filename}\n')
log_file.write(f'Status Code: {response.status_code}\n')
log_file.write(f'Response: {response.json()}\n')
log_file.write('\n' + '-'*50 + '\n\n')
def check_job_status(job_id, api_token):
"""Checks the status of a given job."""
headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json'
}
response = requests.get(f'{JOB_STATUS_URL}{job_id}', headers=headers)
response.raise_for_status()
job_status = response.json().get('status')
return job_status
def post_json_files(api_token):
"""Reads JSON files and posts them to the Auth0 API."""
headers = {
'Authorization': f'Bearer {api_token}'
}
previous_job_id = None
# Sort the filenames in JSON_DIR
filenames = sorted([f for f in os.listdir(JSON_DIR) if f.endswith('.json')])
for filename in filenames:
# If there is a previous job, wait until it is completed
if previous_job_id:
while True:
job_status = check_job_status(previous_job_id, api_token)
if job_status == 'completed':
break
elif job_status == 'failed':
print(f'Previous job {previous_job_id} failed.')
return
else:
print(f'Waiting for job {previous_job_id} to complete...')
time.sleep(10) # Wait for 10 seconds before checking again
file_path = os.path.join(JSON_DIR, filename)
with open(file_path, 'r') as json_file:
json_data = json.load(json_file)
# Convert JSON data to a file-like object
json_bytes = json.dumps(json_data).encode('utf-8')
files = {
'users': ('users.json', json_bytes, 'application/json'),
'connection_id': (None, CONNECTION_ID),
'send_completion_email': (None, 'false')
}
# Post the JSON data to the API
response = requests.post(API_URL, headers=headers, files=files)
# Log the response with filename
log_response(response, filename)
print(f'Posted {filename}: Status Code {response.status_code}')
# Get the job ID from the response
response_data = response.json()
previous_job_id = response_data.get('id')
# Generate JSON files
convert_csv_to_auth0_json()
# Get Auth0 token
api_token = get_api_token()
# Post JSON files
post_json_files(api_token)
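If a job comes back as failed, or you want to see which individual entries were rejected, the Management API also exposes per-job error details at /api/v2/jobs/{id}/errors. A minimal sketch of a helper reusing the constants above (the function name is just illustrative, and it assumes your token has the appropriate scopes):

def get_job_errors(job_id, api_token):
    """Fetches per-entry error details for an import job."""
    headers = {'Authorization': f'Bearer {api_token}'}
    # GET /api/v2/jobs/{id}/errors lists the entries that could not be imported
    response = requests.get(f'{JOB_STATUS_URL}{job_id}/errors', headers=headers)
    response.raise_for_status()
    return response.json()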