OAuth 2.0 Authorization Code Flow with PKCE

Complete OAuth 2.0 authorization code flow with PKCE (Proof Key for Code Exchange) for secure authentication testing

LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.

Sign up now


This locustfile demonstrates how to test a complete OAuth 2.0 authorization code flow with PKCE (Proof Key for Code Exchange). This is commonly used by modern web applications and SPAs for secure authentication.

Use Cases

  • Testing OAuth 2.0 providers (Auth0, Okta, Google, etc.)
  • Load testing authentication flows in SPAs
  • Validating OAuth security implementations
  • Testing token refresh mechanisms

Complete Locustfile

import base64
import hashlib
import secrets
import urllib.parse
from locust import HttpUser, task, between
import json
import re

class OAuth2User(HttpUser):
    wait_time = between(1, 3)
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # OAuth 2.0 Configuration - Update these for your provider
        self.client_id = "your-client-id"
        self.redirect_uri = "https://your-app.com/callback"
        self.auth_server = "https://your-auth-server.com"
        self.scope = "openid profile email"
        
        # PKCE parameters
        self.code_verifier = None
        self.code_challenge = None
        self.state = None
        self.authorization_code = None
        self.access_token = None
        self.refresh_token = None
    
    def generate_pkce_parameters(self):
        """Generate PKCE code verifier and challenge"""
        # Generate code verifier (43-128 characters)
        self.code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')
        
        # Generate code challenge
        challenge_bytes = hashlib.sha256(self.code_verifier.encode('utf-8')).digest()
        self.code_challenge = base64.urlsafe_b64encode(challenge_bytes).decode('utf-8').rstrip('=')
        
        # Generate state parameter
        self.state = secrets.token_urlsafe(32)
    
    def on_start(self):
        """Initialize OAuth flow when user starts"""
        self.generate_pkce_parameters()
        self.initiate_oauth_flow()
    
    def initiate_oauth_flow(self):
        """Step 1: Initiate OAuth 2.0 authorization request"""
        auth_params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': self.scope,
            'state': self.state,
            'code_challenge': self.code_challenge,
            'code_challenge_method': 'S256'
        }
        
        auth_url = f"{self.auth_server}/oauth/authorize?" + urllib.parse.urlencode(auth_params)
        
        with self.client.get(
            auth_url,
            name="OAuth: Authorization Request",
            catch_response=True,
            allow_redirects=False
        ) as response:
            if response.status_code in [200, 302]:
                response.success()
                # In a real scenario, user would login here
                # For testing, we simulate getting the authorization code
                self.simulate_user_login()
            else:
                response.failure(f"Authorization request failed: {response.status_code}")
    
    def simulate_user_login(self):
        """Step 2: Simulate user login and consent"""
        # This simulates the user login form submission
        login_data = {
            'username': 'test@example.com',
            'password': 'testpassword',
            'state': self.state
        }
        
        with self.client.post(
            f"{self.auth_server}/oauth/login",
            data=login_data,
            name="OAuth: User Login",
            catch_response=True,
            allow_redirects=False
        ) as response:
            if response.status_code in [200, 302]:
                response.success()
                # Extract authorization code from redirect
                if 'Location' in response.headers:
                    location = response.headers['Location']
                    self.extract_authorization_code(location)
                else:
                    # Sometimes the code is in the response body
                    self.extract_authorization_code_from_body(response.text)
            else:
                response.failure(f"User login failed: {response.status_code}")
    
    def extract_authorization_code(self, redirect_url):
        """Extract authorization code from redirect URL"""
        parsed_url = urllib.parse.urlparse(redirect_url)
        query_params = urllib.parse.parse_qs(parsed_url.query)
        
        if 'code' in query_params:
            self.authorization_code = query_params['code'][0]
            # Verify state parameter
            if 'state' in query_params and query_params['state'][0] == self.state:
                self.exchange_code_for_tokens()
            else:
                print("State parameter mismatch - potential CSRF attack")
        else:
            print("No authorization code received")
    
    def extract_authorization_code_from_body(self, response_body):
        """Extract authorization code from response body (alternative method)"""
        # Look for authorization code in response body
        code_match = re.search(r'code=([^&\s]+)', response_body)
        if code_match:
            self.authorization_code = code_match.group(1)
            self.exchange_code_for_tokens()
    
    def exchange_code_for_tokens(self):
        """Step 3: Exchange authorization code for access token"""
        if not self.authorization_code:
            return
        
        token_data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'code': self.authorization_code,
            'redirect_uri': self.redirect_uri,
            'code_verifier': self.code_verifier
        }
        
        with self.client.post(
            f"{self.auth_server}/oauth/token",
            data=token_data,
            name="OAuth: Token Exchange",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response.success()
                token_response = response.json()
                self.access_token = token_response.get('access_token')
                self.refresh_token = token_response.get('refresh_token')
                print(f"Successfully obtained access token")
            else:
                response.failure(f"Token exchange failed: {response.status_code}")
    
    @task(3)
    def make_authenticated_request(self):
        """Make authenticated API requests using the access token"""
        if not self.access_token:
            return
        
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        
        with self.client.get(
            "/api/user/profile",
            headers=headers,
            name="API: Authenticated Request",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 401:
                # Token might be expired, try to refresh
                response.failure("Access token expired")
                self.refresh_access_token()
            else:
                response.failure(f"Authenticated request failed: {response.status_code}")
    
    @task(1)
    def refresh_access_token(self):
        """Step 4: Refresh access token using refresh token"""
        if not self.refresh_token:
            return
        
        refresh_data = {
            'grant_type': 'refresh_token',
            'client_id': self.client_id,
            'refresh_token': self.refresh_token
        }
        
        with self.client.post(
            f"{self.auth_server}/oauth/token",
            data=refresh_data,
            name="OAuth: Token Refresh",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                response.success()
                token_response = response.json()
                self.access_token = token_response.get('access_token')
                # Some providers issue new refresh tokens
                if 'refresh_token' in token_response:
                    self.refresh_token = token_response['refresh_token']
                print("Successfully refreshed access token")
            else:
                response.failure(f"Token refresh failed: {response.status_code}")
    
    @task(1)
    def revoke_token(self):
        """Optional: Revoke tokens (logout)"""
        if not self.access_token:
            return
        
        revoke_data = {
            'token': self.access_token,
            'client_id': self.client_id
        }
        
        with self.client.post(
            f"{self.auth_server}/oauth/revoke",
            data=revoke_data,
            name="OAuth: Token Revocation",
            catch_response=True
        ) as response:
            if response.status_code in [200, 204]:
                response.success()
                self.access_token = None
                self.refresh_token = None
                print("Successfully revoked tokens")
            else:
                response.failure(f"Token revocation failed: {response.status_code}")

Configuration

Before running this test in LoadForge, update these variables:

# OAuth 2.0 Provider Configuration
self.client_id = "your-oauth-client-id"
self.redirect_uri = "https://your-app.com/callback"  
self.auth_server = "https://your-auth-provider.com"
self.scope = "openid profile email"  # Adjust scopes as needed

LoadForge Setup

  1. Upload the locustfile to LoadForge
  2. Configure your target host to point to your application's API
  3. Set environment variables for sensitive data like client secrets
  4. Choose appropriate user count - OAuth flows can be resource-intensive
  5. Monitor token refresh patterns in the results

Key Testing Scenarios

  • Authorization Flow Performance: How quickly can users complete OAuth login?
  • Token Exchange Latency: Time taken to exchange codes for tokens
  • Refresh Token Behavior: How well does your app handle token refresh under load?
  • Concurrent Authentication: Multiple users authenticating simultaneously
  • Token Expiration Handling: Graceful handling of expired tokens

Security Considerations

  • PKCE Implementation: Protects against authorization code interception
  • State Parameter: Prevents CSRF attacks
  • Secure Token Storage: Tokens are stored securely in memory
  • Token Revocation: Proper cleanup when sessions end

Common OAuth Providers

This pattern works with:

  • Auth0: https://your-domain.auth0.com
  • Okta: https://your-domain.okta.com
  • Google: https://accounts.google.com
  • Microsoft: https://login.microsoftonline.com
  • GitHub: https://github.com/login/oauth

Troubleshooting

  • Invalid Client: Check your client_id configuration
  • Redirect URI Mismatch: Ensure redirect URI matches OAuth app settings
  • PKCE Errors: Verify code challenge generation is correct
  • Scope Issues: Check if requested scopes are allowed for your client

When you add this test to LoadForge, you'll be able to see detailed metrics on each step of the OAuth flow, helping you identify bottlenecks in your authentication system.

Ready to run your test?
Run your test today with LoadForge.