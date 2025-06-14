This locustfile demonstrates how to test a complete OAuth 2.0 authorization code flow with PKCE (Proof Key for Code Exchange). This is commonly used by modern web applications and SPAs for secure authentication.

Use Cases

Testing OAuth 2.0 providers (Auth0, Okta, Google, etc.)

Load testing authentication flows in SPAs

Validating OAuth security implementations

Testing token refresh mechanisms

Complete Locustfile

import base64
import hashlib
import json
import logging
import re
import secrets
import urllib.parse

from locust import HttpUser, task, between

logger = logging.getLogger(__name__)

# Patterns used when the authorization code is embedded in a response body
# (e.g. an HTML page or a JavaScript redirect) rather than a Location header.
# ``\b`` keeps names such as ``error_code=`` from matching, and the character
# class stops at quotes / angle brackets so surrounding markup is not captured.
_BODY_CODE_RE = re.compile(r'\bcode=([^&\s"\'<>#]+)')
_BODY_STATE_RE = re.compile(r'\bstate=([^&\s"\'<>#]+)')


class OAuth2User(HttpUser):
    """Locust user that walks the OAuth 2.0 authorization code flow with PKCE.

    On start the user generates PKCE parameters, issues the authorization
    request, submits a simulated login, and exchanges the returned
    authorization code for tokens. The tasks then call a protected API,
    refresh the access token, and revoke tokens. After a successful
    revocation, or a 401 with no refresh token available, the user starts a
    fresh authorization flow so it keeps generating load.
    """

    wait_time = between(1, 3)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # OAuth 2.0 Configuration - Update these for your provider
        self.client_id = "your-client-id"
        self.redirect_uri = "https://your-app.com/callback"
        self.auth_server = "https://your-auth-server.com"
        self.scope = "openid profile email"

        # Credentials submitted by the simulated login form
        self.username = "test@example.com"
        self.password = "testpassword"

        # PKCE parameters and per-session state
        self.code_verifier = None
        self.code_challenge = None
        self.state = None
        self.authorization_code = None
        self.access_token = None
        self.refresh_token = None

    def generate_pkce_parameters(self) -> None:
        """Generate the PKCE code verifier, S256 code challenge, and state."""
        # 32 random bytes -> 43 URL-safe characters once padding is stripped,
        # which is the minimum verifier length allowed by PKCE (43-128).
        self.code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')

        # S256 challenge: base64url(sha256(verifier)) without padding
        challenge_bytes = hashlib.sha256(
            self.code_verifier.encode('utf-8')
        ).digest()
        self.code_challenge = base64.urlsafe_b64encode(
            challenge_bytes
        ).decode('utf-8').rstrip('=')

        # Opaque value echoed back by the server; checked on the redirect
        self.state = secrets.token_urlsafe(32)

    def on_start(self) -> None:
        """Initialize OAuth flow when user starts."""
        self._start_new_session()

    def _start_new_session(self) -> None:
        """Drop any previous session data and run the full login flow again."""
        self.authorization_code = None
        self.access_token = None
        self.refresh_token = None
        self.generate_pkce_parameters()
        self.initiate_oauth_flow()

    def initiate_oauth_flow(self) -> None:
        """Step 1: Initiate OAuth 2.0 authorization request."""
        auth_params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': self.scope,
            'state': self.state,
            'code_challenge': self.code_challenge,
            'code_challenge_method': 'S256',
        }
        auth_url = (
            f"{self.auth_server}/oauth/authorize?"
            + urllib.parse.urlencode(auth_params)
        )

        authorized = False
        with self.client.get(
            auth_url,
            name="OAuth: Authorization Request",
            catch_response=True,
            allow_redirects=False,
        ) as response:
            if response.status_code in (200, 302):
                response.success()
                authorized = True
            else:
                response.failure(
                    f"Authorization request failed: {response.status_code}"
                )

        if authorized:
            # In a real scenario, user would login here.
            # For testing, we simulate getting the authorization code.
            self.simulate_user_login()

    def simulate_user_login(self) -> None:
        """Step 2: Simulate user login and consent.

        The request is reported as failed when the server answers 200/302
        but no usable authorization code can be extracted, because the user
        cannot continue the flow in that case.
        """
        # This simulates the user login form submission
        login_data = {
            'username': self.username,
            'password': self.password,
            'state': self.state,
        }

        with self.client.post(
            f"{self.auth_server}/oauth/login",
            data=login_data,
            name="OAuth: User Login",
            catch_response=True,
            allow_redirects=False,
        ) as response:
            if response.status_code not in (200, 302):
                response.failure(f"User login failed: {response.status_code}")
                return

            location = response.headers.get('Location')
            if location:
                # Extract authorization code from redirect
                got_code = self.extract_authorization_code(location)
            else:
                # Sometimes the code is in the response body
                got_code = self.extract_authorization_code_from_body(
                    response.text
                )

            if got_code:
                response.success()
            else:
                response.failure(
                    "User login did not yield a valid authorization code"
                )

    def extract_authorization_code(self, redirect_url) -> bool:
        """Extract authorization code from redirect URL.

        The code is only stored and exchanged when the ``state`` parameter
        in the redirect matches the value sent in the authorization request.

        Returns:
            True when a code was accepted and the token exchange was
            attempted, False otherwise.
        """
        parsed_url = urllib.parse.urlparse(redirect_url)
        query_params = urllib.parse.parse_qs(parsed_url.query)

        code = query_params.get('code', [None])[0]
        if not code:
            logger.warning("No authorization code received")
            return False

        # Verify state parameter before trusting the code
        returned_state = query_params.get('state', [None])[0]
        if returned_state is None or returned_state != self.state:
            logger.warning("State parameter mismatch - potential CSRF attack")
            return False

        self.authorization_code = code
        self.exchange_code_for_tokens()
        return True

    def extract_authorization_code_from_body(self, response_body) -> bool:
        """Extract authorization code from response body (alternative method).

        Looks for a ``code=...`` query fragment in the body. When a
        ``state=...`` fragment is also present it must match the state sent
        in the authorization request.

        Returns:
            True when a code was accepted and the token exchange was
            attempted, False otherwise.
        """
        body = response_body or ""

        code_match = _BODY_CODE_RE.search(body)
        if not code_match:
            return False

        state_match = _BODY_STATE_RE.search(body)
        if state_match is not None:
            returned_state = urllib.parse.unquote(state_match.group(1))
            if returned_state != self.state:
                logger.warning(
                    "State parameter mismatch - potential CSRF attack"
                )
                return False

        # The value is taken from a URL inside the body, so it may be
        # percent-encoded; the token endpoint expects the decoded code.
        self.authorization_code = urllib.parse.unquote(code_match.group(1))
        self.exchange_code_for_tokens()
        return True

    def _parse_token_response(self, response, failure_prefix):
        """Return the token payload, or mark ``response`` failed and return None.

        A payload is accepted only when it is a JSON object containing a
        non-empty ``access_token``.
        """
        try:
            payload = response.json()
        except ValueError:
            response.failure(f"{failure_prefix}: response is not valid JSON")
            return None

        if not isinstance(payload, dict) or not payload.get('access_token'):
            response.failure(f"{failure_prefix}: no access_token in response")
            return None

        return payload

    def exchange_code_for_tokens(self) -> None:
        """Step 3: Exchange authorization code for access token."""
        if not self.authorization_code:
            return

        token_data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'code': self.authorization_code,
            'redirect_uri': self.redirect_uri,
            'code_verifier': self.code_verifier,
        }
        # Authorization codes are single-use; never send the same one twice.
        self.authorization_code = None

        with self.client.post(
            f"{self.auth_server}/oauth/token",
            data=token_data,
            name="OAuth: Token Exchange",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Token exchange failed: {response.status_code}"
                )
                return

            payload = self._parse_token_response(
                response, "Token exchange failed"
            )
            if payload is None:
                return

            self.access_token = payload['access_token']
            self.refresh_token = payload.get('refresh_token')
            response.success()
            logger.info("Successfully obtained access token")

    @task(3)
    def make_authenticated_request(self) -> None:
        """Make authenticated API requests using the access token.

        On a 401 the access token is refreshed when a refresh token is
        available; otherwise a new authorization flow is started.
        """
        if not self.access_token:
            return

        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json',
        }

        token_rejected = False
        with self.client.get(
            "/api/user/profile",
            headers=headers,
            name="API: Authenticated Request",
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 401:
                # Token might be expired, try to refresh
                response.failure("Access token expired")
                token_rejected = True
            else:
                response.failure(
                    f"Authenticated request failed: {response.status_code}"
                )

        if token_rejected:
            if self.refresh_token:
                self.refresh_access_token()
            else:
                # Nothing to refresh with; log in again instead of sending
                # the rejected token on every subsequent task.
                self._start_new_session()

    @task(1)
    def refresh_access_token(self) -> None:
        """Step 4: Refresh access token using refresh token."""
        if not self.refresh_token:
            return

        refresh_data = {
            'grant_type': 'refresh_token',
            'client_id': self.client_id,
            'refresh_token': self.refresh_token,
        }

        with self.client.post(
            f"{self.auth_server}/oauth/token",
            data=refresh_data,
            name="OAuth: Token Refresh",
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(
                    f"Token refresh failed: {response.status_code}"
                )
                return

            payload = self._parse_token_response(
                response, "Token refresh failed"
            )
            if payload is None:
                # Keep the current tokens rather than overwriting with None
                return

            self.access_token = payload['access_token']
            # Some providers issue new refresh tokens
            if payload.get('refresh_token'):
                self.refresh_token = payload['refresh_token']
            response.success()
            logger.info("Successfully refreshed access token")

    @task(1)
    def revoke_token(self) -> None:
        """Optional: Revoke tokens (logout), then log in again.

        After a successful revocation a new authorization flow is started;
        without it the user would hold no tokens and every task would return
        immediately for the rest of the run.
        """
        if not self.access_token:
            return

        revoke_data = {
            'token': self.access_token,
            'client_id': self.client_id,
        }

        revoked = False
        with self.client.post(
            f"{self.auth_server}/oauth/revoke",
            data=revoke_data,
            name="OAuth: Token Revocation",
            catch_response=True,
        ) as response:
            if response.status_code in (200, 204):
                response.success()
                self.access_token = None
                self.refresh_token = None
                revoked = True
                logger.info("Successfully revoked tokens")
            else:
                response.failure(
                    f"Token revocation failed: {response.status_code}"
                )

        if revoked:
            self._start_new_session()

Configuration

Before running this test in LoadForge, update these variables:

# OAuth 2.0 Provider Configuration
self.client_id = "your-oauth-client-id"
self.redirect_uri = "https://your-app.com/callback"
self.auth_server = "https://your-auth-provider.com"
self.scope = "openid profile email"  # Adjust scopes as needed

LoadForge Setup

1. Upload the locustfile to LoadForge.

2. Configure your target host to point to your application's API.

3. Set environment variables for sensitive data like client secrets.

4. Choose an appropriate user count - OAuth flows can be resource-intensive.

5. Monitor token refresh patterns in the results.

Key Testing Scenarios

Authorization Flow Performance: How quickly can users complete OAuth login?

Token Exchange Latency: Time taken to exchange codes for tokens

Refresh Token Behavior: How well does your app handle token refresh under load?

Concurrent Authentication: Multiple users authenticating simultaneously

Token Expiration Handling: Graceful handling of expired tokens

Security Considerations

✅ PKCE Implementation: Protects against authorization code interception

✅ State Parameter: Prevents CSRF attacks

✅ Secure Token Storage: Tokens are stored securely in memory

✅ Token Revocation: Proper cleanup when sessions end

Common OAuth Providers

This pattern works with:

Auth0: https://your-domain.auth0.com

Okta: https://your-domain.okta.com

Google: https://accounts.google.com

Microsoft: https://login.microsoftonline.com

GitHub: https://github.com/login/oauth

Troubleshooting

Invalid Client: Check your client_id configuration

Redirect URI Mismatch: Ensure redirect URI matches OAuth app settings

PKCE Errors: Verify code challenge generation is correct

Scope Issues: Check if requested scopes are allowed for your client

When you add this test to LoadForge, you'll be able to see detailed metrics on each step of the OAuth flow, helping you identify bottlenecks in your authentication system.