Python script to import users from Adobe Commerce Cloud / Magento to Auth0

I thought I would share a Python script that imports Adobe Commerce Cloud / Magento users from a CSV into the Auth0 bulk-import API. It splits the CSV into multiple JSON payloads and waits for the previous payload to finish importing before doing a subsequent POST.

import csv
import json
import time
import chardet 
import base64
import binascii
import os
import requests
from datetime import datetime

# Define constants as equivalent to the PHP SODIUM_CRYPTO_PWHASH_* constants
#eg php -r 'echo SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE."\n";'
# Magento hashes passwords with libsodium's crypto_pwhash (argon2id) at the
# INTERACTIVE limits; these mirror the PHP constant values.
SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE = 67108864  # memory limit in bytes (64 MiB)
SODIUM_CRYPTO_PWHASH_OPSLIMIT_INTERACTIVE = 2  # argon2 time cost (t= in the PHC string)
SODIUM_CRYPTO_PWHASH_SALTBYTES = 16  # salt length used by libsodium

# Define files path and names
PATH = ''  # directory prefix for input/output files ('' = current directory)
CSV_FILENAME = 'users.csv'  # Magento user export, ';'-delimited
JSON_FILENAME_PREFIX = 'users'  # output files: users_1.json, users_2.json, ...

#Auth0 limits to 500kb per file, 1000 users per file is about right (300kb)
MAX_USERS_PER_FILE = 1000

# Set your Auth0 domain and Management API credentials
AUTH0_DOMAIN = 'XXXXXXX.auth0.com'  # your Auth0 tenant domain
CLIENT_ID = ''  # machine-to-machine application credentials
CLIENT_SECRET = ''
AUDIENCE = f'https://{AUTH0_DOMAIN}/api/v2/'
TOKEN_URL = f'https://{AUTH0_DOMAIN}/oauth/token'
API_URL = f'https://{AUTH0_DOMAIN}/api/v2/jobs/users-imports'
JOB_STATUS_URL = f'https://{AUTH0_DOMAIN}/api/v2/jobs/'

# Directory containing JSON files
JSON_DIR = os.getcwd()

# Log file to record responses
LOG_FILE = 'response_log.txt'

# Your valid connection ID
CONNECTION_ID = 'con_XXXXXXXXXXXXXXXXXXX'  # database connection to import users into

def argon2Phc(hash_string: str) -> str:
  """
  Converts argon2 has to PHC string for Auth0.
  """
  hash_parts = hash_string.split(':')
  hash_hex = hash_parts[0]
  salt = hash_parts[1]
  
  memlimit_kib = SODIUM_CRYPTO_PWHASH_MEMLIMIT_INTERACTIVE // 1024
  opslimit = SODIUM_CRYPTO_PWHASH_OPSLIMIT_INTERACTIVE
  
  salt_encoded = base64.b64encode(salt[:SODIUM_CRYPTO_PWHASH_SALTBYTES].encode()).decode().rstrip('=')
  hash_encoded = base64.b64encode(binascii.unhexlify(hash_hex)).decode().rstrip('=')
  
  return f'$argon2id$v=19$m={memlimit_kib},t={opslimit},p=1${salt_encoded}${hash_encoded}'

def detect_encoding(file_path):
    """Return the character encoding of *file_path* as guessed by chardet.

    Feeds the raw bytes to chardet line by line and stops as soon as the
    detector has seen enough data to be confident, so large files are not
    read in full.
    """
    detector = chardet.universaldetector.UniversalDetector()
    with open(file_path, 'rb') as raw:
        for chunk in raw:
            detector.feed(chunk)
            if detector.done:
                break
    detector.close()
    return detector.result['encoding']

def convert_csv_to_auth0_json():
  """
  Convert the Magento CSV export into chunked JSON files for the Auth0
  bulk user-import API.

  Reads PATH + CSV_FILENAME (';'-delimited, encoding auto-detected via
  detect_encoding) and writes PATH + JSON_FILENAME_PREFIX + '_<n>.json'
  files holding at most MAX_USERS_PER_FILE users each.

  Recognized CSV headers (matched case-insensitively, emitted with the
  CSV's original header spelling):
    - email: copied as-is.
    - email_verified: forced to True.
    - custom_password_hash: converted to an Auth0 argon2 hash object;
      skipped when the column holds MySQL's NULL marker '\\N'.
  """
  csv_file = PATH + CSV_FILENAME
  csv_file_encoding = detect_encoding(csv_file)

  chunks = []  # list of user-dict lists, one inner list per output file
  with open(csv_file, newline='', encoding=csv_file_encoding) as f:
    csv_reader = csv.DictReader(f, delimiter=';')
    headers = csv_reader.fieldnames or []

    # Count data rows ourselves instead of using csv_reader.line_num:
    # line_num counts physical lines, so a quoted field containing an
    # embedded newline would make the chunking drift.
    for row_count, row in enumerate(csv_reader):
      # Start a new chunk every MAX_USERS_PER_FILE rows.
      if row_count % MAX_USERS_PER_FILE == 0:
        chunks.append([])

      user_data = {}
      for header in headers:
        key = header.lower()
        if key == 'email':
          user_data[header] = row[header]
        elif key == 'email_verified':
          user_data[header] = True
        elif key == 'custom_password_hash' and row[header] != '\\N':
          user_data[header] = {
            'algorithm': 'argon2',
            'hash': {'value': argon2Phc(row[header])},
          }

      chunks[-1].append(user_data)

  # Write each chunk to its own JSON file (users_1.json, users_2.json, ...).
  for idx, chunk in enumerate(chunks, start=1):
    json_file = f'{PATH}{JSON_FILENAME_PREFIX}_{idx}.json'
    with open(json_file, 'w') as out:
      json.dump(chunk, out, indent=2)

def get_api_token():
    """Fetch a Management API access token via the OAuth2 client-credentials flow."""
    body = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'audience': AUDIENCE,
        'grant_type': 'client_credentials',
    }
    token_response = requests.post(
        TOKEN_URL,
        headers={'content-type': 'application/json'},
        json=body,
    )
    token_response.raise_for_status()
    return token_response.json()['access_token']

def log_response(response, filename):
    """Append the outcome of an import POST to LOG_FILE.

    :param response: requests.Response from the Auth0 jobs endpoint.
    :param filename: name of the JSON payload file that was posted.
    """
    with open(LOG_FILE, 'a') as log_file:
        log_file.write(f'Time: {datetime.now()}\n')
        # Bug fix: previously wrote a hard-coded placeholder instead of
        # the actual filename, leaving the parameter unused.
        log_file.write(f'Filename: {filename}\n')
        log_file.write(f'Status Code: {response.status_code}\n')
        log_file.write(f'Response: {response.json()}\n')
        log_file.write('\n' + '-'*50 + '\n\n')

def check_job_status(job_id, api_token):
    """Return the Auth0 status string for *job_id* (e.g. 'pending', 'completed', 'failed')."""
    auth_headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json',
    }
    status_response = requests.get(f'{JOB_STATUS_URL}{job_id}', headers=auth_headers)
    status_response.raise_for_status()
    return status_response.json().get('status')

def post_json_files(api_token):
    """Post every JSON payload in JSON_DIR to the Auth0 bulk-import API.

    Files are submitted one at a time: before each POST, the previous
    import job is polled until it completes, since the payloads must be
    imported sequentially. Aborts if a job reports 'failed'.

    :param api_token: Management API bearer token (see get_api_token).
    """
    headers = {
        'Authorization': f'Bearer {api_token}'
    }
    previous_job_id = None

    def _payload_order(name):
        # Natural sort so 'users_10.json' comes after 'users_2.json';
        # plain lexicographic order would post the chunks out of sequence.
        digits = ''.join(ch for ch in name if ch.isdigit())
        return (int(digits) if digits else 0, name)

    filenames = sorted(
        (f for f in os.listdir(JSON_DIR) if f.endswith('.json')),
        key=_payload_order,
    )

    for filename in filenames:
        # If there is a previous job, wait until it is completed.
        if previous_job_id:
            while True:
                job_status = check_job_status(previous_job_id, api_token)
                if job_status == 'completed':
                    break
                elif job_status == 'failed':
                    print(f'Previous job {previous_job_id} failed.')
                    return
                else:
                    print(f'Waiting for job {previous_job_id} to complete...')
                    time.sleep(10)  # Wait for 10 seconds before checking again

        file_path = os.path.join(JSON_DIR, filename)
        with open(file_path, 'r') as json_file:
            json_data = json.load(json_file)

        # The jobs endpoint expects a multipart upload: the users file plus
        # plain form fields for the connection and the email toggle.
        json_bytes = json.dumps(json_data).encode('utf-8')
        files = {
            'users': ('users.json', json_bytes, 'application/json'),
            'connection_id': (None, CONNECTION_ID),
            'send_completion_email': (None, 'false')
        }

        # Post the JSON data to the API.
        response = requests.post(API_URL, headers=headers, files=files)

        # Log the response with the filename. Bug fix: the print previously
        # showed a hard-coded placeholder instead of the file being posted.
        log_response(response, filename)
        print(f'Posted {filename}: Status Code {response.status_code}')

        # Remember the job ID so the next iteration can wait on it.
        previous_job_id = response.json().get('id')

def main():
    """Generate chunked JSON payloads from the CSV, then upload them to Auth0."""
    # Generate JSON files
    convert_csv_to_auth0_json()

    # Get Auth0 token
    api_token = get_api_token()

    # Post JSON files
    post_json_files(api_token)


# Guard so importing this module (e.g. to reuse argon2Phc) does not
# trigger the CSV conversion and network calls.
if __name__ == '__main__':
    main()

Hey there @marcandre.caron welcome to the community, and thanks for sharing! :slight_smile:

I forgot to mention that the CSV file must have a header line with these fields:

"email";"email_verified";"custom_password_hash"

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.