Explorer reports addition
We have added a new Explorer feature to reports, with a timeline scrubber and easy anomaly detection.
Basic database API testing for CRUD operations, queries, and database performance
LoadForge can record your browser, graphically build tests, scan your site with a wizard and more. Sign up now to run your first test.
This guide shows how to load test a database API, covering the common CRUD operations (create, read, update, delete) as well as list, search, transaction, and query-performance endpoints.
from locust import task, HttpUser
import random
import json
class DatabaseAPITestUser(HttpUser):
    """Simulated user that exercises a REST-style database API.

    Each task issues a single HTTP request through ``self.client`` and
    validates the response. Every request passes ``catch_response=True``:
    that is what makes Locust return a response context manager exposing
    ``failure()``. Without it the ``response.failure(...)`` calls below
    would raise ``AttributeError`` instead of recording a failed request.

    NOTE(review): the 404 branches only log and never call
    ``response.success()``, so Locust presumably still records those
    requests as failures based on the status code - confirm this is the
    intended reporting.
    """

    def on_start(self) -> None:
        """Set up endpoints, sample payloads and per-user ID bookkeeping."""
        # Database API endpoints
        self.db_endpoints = {
            "users": "/api/users",
            "products": "/api/products",
            "orders": "/api/orders",
            "query": "/api/query"
        }
        # Test data for database operations
        self.test_users = [
            {"name": "John Doe", "email": "john@example.com", "age": 30},
            {"name": "Jane Smith", "email": "jane@example.com", "age": 25},
            {"name": "Bob Johnson", "email": "bob@example.com", "age": 35}
        ]
        self.test_products = [
            {"name": "Laptop", "price": 999.99, "category": "Electronics"},
            {"name": "Book", "price": 29.99, "category": "Books"},
            {"name": "Chair", "price": 149.99, "category": "Furniture"}
        ]
        # Store created IDs for later operations
        self.created_users = []
        self.created_products = []

    @task(4)
    def test_create_user(self) -> None:
        """Test creating database records.

        POSTs a sample user with a randomised e-mail address and remembers
        the returned ID so the read/update/delete tasks can target it.
        """
        # Copy so the shared template in self.test_users is not mutated.
        user_data = random.choice(self.test_users).copy()
        user_data["email"] = f"user{random.randint(1000, 9999)}@example.com"
        with self.client.post(
            self.db_endpoints["users"],
            json=user_data,
            name="Create User",
            catch_response=True
        ) as response:
            if response.status_code in [200, 201]:
                try:
                    result = response.json()
                    user_id = result.get("id") or result.get("user_id")
                    if user_id:
                        self.created_users.append(user_id)
                        print(f"Created user: {user_id}")
                # ValueError is the base of json.JSONDecodeError and of the
                # decode errors response.json() can raise.
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Create user failed: {response.status_code}")

    @task(5)
    def test_read_user(self) -> None:
        """Test reading database records.

        Reads a user created by this simulated user when one exists,
        otherwise falls back to a random ID in 1..100.
        """
        if self.created_users:
            user_id = random.choice(self.created_users)
        else:
            user_id = random.randint(1, 100)  # Assume some users exist
        with self.client.get(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Read User",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    user = response.json()
                    if user.get("id") or user.get("user_id"):
                        print(f"Read user: {user_id}")
                    else:
                        response.failure("User data missing ID")
                except ValueError:
                    response.failure("Invalid JSON response")
            elif response.status_code == 404:
                print(f"User not found: {user_id}")
            else:
                response.failure(f"Read user failed: {response.status_code}")

    @task(3)
    def test_update_user(self) -> None:
        """Test updating database records.

        Does nothing until this user has created at least one record.
        """
        if not self.created_users:
            return
        user_id = random.choice(self.created_users)
        update_data = {
            "name": f"Updated User {random.randint(100, 999)}",
            "age": random.randint(18, 65)
        }
        with self.client.put(
            f"{self.db_endpoints['users']}/{user_id}",
            json=update_data,
            name="Update User",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    # Only checks that the body decodes; a non-JSON body is
                    # tolerated for updates and merely logged.
                    response.json()
                    print(f"Updated user: {user_id}")
                except ValueError:
                    print(f"Updated user: {user_id} (no JSON response)")
            elif response.status_code == 404:
                print(f"User not found for update: {user_id}")
                # Forget the stale ID so later tasks stop targeting it.
                self.created_users.remove(user_id)
            else:
                response.failure(f"Update user failed: {response.status_code}")

    @task(2)
    def test_delete_user(self) -> None:
        """Test deleting database records.

        Does nothing until this user has created at least one record. The ID
        is popped before the request, so it is never reused regardless of
        the outcome.
        """
        if not self.created_users:
            return
        user_id = self.created_users.pop()
        with self.client.delete(
            f"{self.db_endpoints['users']}/{user_id}",
            name="Delete User",
            catch_response=True
        ) as response:
            if response.status_code in [200, 204]:
                print(f"Deleted user: {user_id}")
            elif response.status_code == 404:
                print(f"User already deleted: {user_id}")
            else:
                response.failure(f"Delete user failed: {response.status_code}")

    @task(3)
    def test_list_users(self) -> None:
        """Test listing database records with pagination."""
        params = {
            "page": random.randint(1, 5),
            "limit": random.choice([10, 20, 50]),
            "sort": random.choice(["name", "email", "created_at"])
        }
        with self.client.get(
            self.db_endpoints["users"],
            params=params,
            name="List Users",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    result = response.json()
                    # Accept either a "users" or a "data" envelope key.
                    users = result.get("users", result.get("data", []))
                    total = result.get("total", len(users))
                    page = result.get("page", params["page"])
                    print(f"Listed users: {len(users)} items, page {page}, total {total}")
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"List users failed: {response.status_code}")

    @task(2)
    def test_database_query(self) -> None:
        """Test custom database queries.

        Sends one of a fixed set of parameterised SQL statements to the query
        endpoint and logs the row count and reported execution time.
        """
        queries = [
            {"query": "SELECT COUNT(*) FROM users WHERE age > ?", "params": [25]},
            {"query": "SELECT * FROM products WHERE price < ? LIMIT 10", "params": [100.0]},
            {"query": "SELECT category, COUNT(*) FROM products GROUP BY category", "params": []},
            {"query": "SELECT * FROM users ORDER BY created_at DESC LIMIT 5", "params": []}
        ]
        query_data = random.choice(queries)
        with self.client.post(
            self.db_endpoints["query"],
            json=query_data,
            name="Database Query",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    result = response.json()
                    rows = result.get("rows", result.get("data", []))
                    execution_time = result.get("execution_time_ms", 0)
                    print(f"Query executed: {len(rows)} rows, {execution_time}ms")
                    # Validate query performance
                    if execution_time > 1000:  # 1 second threshold
                        print(f"Slow query detected: {execution_time}ms")
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Database query failed: {response.status_code}")

    @task(2)
    def test_database_search(self) -> None:
        """Test database search functionality."""
        search_terms = ["john", "electronics", "chair", "book", "user"]
        search_params = {
            "q": random.choice(search_terms),
            "table": random.choice(["users", "products"]),
            "limit": 20
        }
        with self.client.get(
            f"{self.db_endpoints['query']}/search",
            params=search_params,
            name="Database Search",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    result = response.json()
                    results = result.get("results", [])
                    total_matches = result.get("total_matches", len(results))
                    print(f"Search results: {len(results)} items, {total_matches} total matches")
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Database search failed: {response.status_code}")

    @task(1)
    def test_database_transaction(self) -> None:
        """Test database transaction handling.

        Submits two create operations (one user, one product) as a single
        transaction request and logs whether the API reported success.
        """
        transaction_data = {
            "operations": [
                {
                    "type": "create",
                    "table": "users",
                    "data": {"name": "Transaction User", "email": f"trans{random.randint(1000, 9999)}@example.com"}
                },
                {
                    "type": "create",
                    "table": "products",
                    "data": {"name": "Transaction Product", "price": 99.99}
                }
            ]
        }
        with self.client.post(
            f"{self.db_endpoints['query']}/transaction",
            json=transaction_data,
            name="Database Transaction",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    result = response.json()
                    success = result.get("success", False)
                    operations_completed = result.get("operations_completed", 0)
                    if success:
                        print(f"Transaction successful: {operations_completed} operations")
                    else:
                        # Only logged: a 200 with success=false is not marked
                        # as a failed request.
                        print(f"Transaction failed: {operations_completed} operations completed")
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Database transaction failed: {response.status_code}")

    @task(1)
    def test_database_performance(self) -> None:
        """Test database performance monitoring.

        Fetches the stats endpoint and logs connection count, average query
        time and slow-query count, warning when the average exceeds 500 ms.
        """
        with self.client.get(
            f"{self.db_endpoints['query']}/stats",
            name="Database Performance",
            catch_response=True
        ) as response:
            if response.status_code == 200:
                try:
                    stats = response.json()
                    connection_count = stats.get("active_connections", 0)
                    avg_query_time = stats.get("avg_query_time_ms", 0)
                    slow_queries = stats.get("slow_queries_count", 0)
                    print(f"DB Performance: {connection_count} connections, {avg_query_time}ms avg query time, {slow_queries} slow queries")
                    # Performance validation
                    if avg_query_time > 500:
                        print(f"Warning: High average query time: {avg_query_time}ms")
                except ValueError:
                    response.failure("Invalid JSON response")
            else:
                response.failure(f"Database performance check failed: {response.status_code}")