Loading...
Loading...
Expert in Kanidm modern identity management system specializing in user/group management, OAuth2/OIDC, LDAP, RADIUS, SSH key management, WebAuthn, and MFA. Deep expertise in secure authentication flows, credential policies, access control, and platform integrations. Use when implementing identity management, SSO, authentication systems, or securing access to infrastructure.
npx skill4agent add martinholovsky/claude-skills-generator kanidm-expert# tests/test_kanidm_oauth2.py
import pytest
import httpx
class TestOAuth2Integration:
"""Test OAuth2/OIDC integration with Kanidm."""
@pytest.fixture
def kanidm_client(self):
"""Create authenticated Kanidm API client."""
return httpx.Client(
base_url="https://idm.example.com",
verify=True,
timeout=30.0
)
def test_oauth2_client_registration(self, kanidm_client):
"""Test OAuth2 client is properly registered."""
# This test will fail until implementation
response = kanidm_client.get(
"/oauth2/openid/myapp/.well-known/openid-configuration"
)
assert response.status_code == 200
config = response.json()
assert "authorization_endpoint" in config
assert "token_endpoint" in config
assert "userinfo_endpoint" in config
def test_oauth2_scopes_configured(self, kanidm_client):
"""Test required scopes are enabled."""
response = kanidm_client.get(
"/oauth2/openid/myapp/.well-known/openid-configuration"
)
config = response.json()
scopes = config.get("scopes_supported", [])
required_scopes = ["openid", "email", "profile", "groups"]
for scope in required_scopes:
assert scope in scopes, f"Missing scope: {scope}"
def test_token_exchange_flow(self, kanidm_client):
"""Test token exchange with authorization code."""
# Test PKCE flow
token_data = {
"grant_type": "authorization_code",
"code": "test_auth_code",
"redirect_uri": "https://app.example.com/callback",
"code_verifier": "test_verifier"
}
response = kanidm_client.post(
"/oauth2/token",
data=token_data,
auth=("client_id", "client_secret")
)
# Will fail until OAuth2 client is configured
assert response.status_code in [200, 400] # 400 for invalid code is OK# tests/test_kanidm_ldap.py
import ldap3
class TestLDAPIntegration:
"""Test LDAP integration with Kanidm."""
def test_ldap_connection(self):
"""Test LDAPS connection to Kanidm."""
server = ldap3.Server(
"ldaps://idm.example.com:3636",
use_ssl=True,
get_info=ldap3.ALL
)
conn = ldap3.Connection(
server,
user="name=ldap_bind,dc=idm,dc=example,dc=com",
password="test_password",
auto_bind=True
)
assert conn.bound, "LDAP bind failed"
conn.unbind()
def test_user_search(self):
"""Test LDAP user search."""
# Setup connection...
conn.search(
"dc=idm,dc=example,dc=com",
"(uid=jsmith)",
attributes=["uid", "mail", "displayName", "memberOf"]
)
assert len(conn.entries) == 1
user = conn.entries[0]
assert user.uid.value == "jsmith"
assert user.mail.value is not None
def test_group_membership(self):
"""Test user group memberships via LDAP."""
# Verify user is in expected groups
conn.search(
"dc=idm,dc=example,dc=com",
"(uid=jsmith)",
attributes=["memberOf"]
)
groups = conn.entries[0].memberOf.values
assert "developers" in str(groups)# tests/test_kanidm_config.sh
#!/bin/bash
# Test Kanidm configuration
set -e
echo "Testing Kanidm server connectivity..."
curl -sf https://idm.example.com/status || exit 1
echo "Testing OAuth2 endpoint..."
curl -sf https://idm.example.com/oauth2/openid/myapp/.well-known/openid-configuration || exit 1
echo "Testing LDAPS connectivity..."
ldapsearch -H ldaps://idm.example.com:3636 \
-D "name=ldap_bind,dc=idm,dc=example,dc=com" \
-w "$LDAP_BIND_PASSWORD" \
-b "dc=idm,dc=example,dc=com" \
"(objectClass=*)" -LLL | head -1 || exit 1
echo "Testing user existence..."
kanidm person get jsmith || exit 1
echo "Testing group membership..."
kanidm group list-members developers | grep -q jsmith || exit 1
echo "All tests passed!"# Implement OAuth2 client registration
kanidm oauth2 create myapp "My Application" \
--origin https://app.example.com
kanidm oauth2 add-redirect-url myapp \
https://app.example.com/callback
kanidm oauth2 enable-scope myapp openid email profile groups
# Implement LDAP bind account
kanidm service-account create ldap_bind "LDAP Bind Account"
kanidm service-account credential set-password ldap_bind
kanidm group add-members idm_account_read_priv ldap_bind
# Implement user and group
kanidm person create jsmith "John Smith" --mail john.smith@example.com
kanidm group add-members developers jsmith# Add security hardening
kanidm oauth2 enable-pkce myapp
kanidm oauth2 set-token-lifetime myapp --access 3600 --refresh 86400
# Add scope mapping for authorization
kanidm oauth2 create-scope-map myapp groups developers admins# Run all tests
pytest tests/test_kanidm_*.py -v
# Run integration tests
bash tests/test_kanidm_config.sh
# Verify security configuration
kanidm oauth2 get myapp | grep -q "pkce_enabled: true"
kanidm audit-log export --since "1 hour ago" --format json | jq .# Good: Connection pool for LDAP
import ldap3
from ldap3 import ServerPool, ROUND_ROBIN
# Create server pool for load balancing and failover
servers = [
ldap3.Server("ldaps://idm1.example.com:3636", use_ssl=True),
ldap3.Server("ldaps://idm2.example.com:3636", use_ssl=True),
]
server_pool = ServerPool(servers, ROUND_ROBIN, active=True)
# Connection pool with keep-alive
connection_pool = ldap3.Connection(
server_pool,
user="name=ldap_bind,dc=idm,dc=example,dc=com",
password=LDAP_PASSWORD,
client_strategy=ldap3.REUSABLE, # Connection pooling
pool_size=10,
pool_lifetime=300 # Recycle connections every 5 minutes
)
# Bad: New connection per request
def bad_search(username):
conn = ldap3.Connection(server, user=bind_dn, password=pwd)
conn.bind()
conn.search(...)
conn.unbind() # Connection overhead for every request!# Good: HTTP connection pooling for Kanidm API
import httpx
# Reusable client with connection pooling
kanidm_client = httpx.Client(
base_url="https://idm.example.com",
limits=httpx.Limits(
max_connections=20,
max_keepalive_connections=10,
keepalive_expiry=300
),
timeout=httpx.Timeout(30.0, connect=10.0)
)
# Bad: New client per request
def bad_api_call():
with httpx.Client() as client: # New connection every time!
return client.get("https://idm.example.com/api/...")# Good: Cache OAuth2 tokens to reduce auth requests
from functools import lru_cache
import time
class TokenCache:
def __init__(self):
self._cache = {}
def get_token(self, client_id: str) -> str | None:
"""Get cached token if still valid."""
if client_id in self._cache:
token, expiry = self._cache[client_id]
if time.time() < expiry - 60: # 1 minute buffer
return token
return None
def set_token(self, client_id: str, token: str, expires_in: int):
"""Cache token with expiry."""
self._cache[client_id] = (token, time.time() + expires_in)
token_cache = TokenCache()
async def get_access_token(client_id: str, client_secret: str) -> str:
# Check cache first
cached = token_cache.get_token(client_id)
if cached:
return cached
# Fetch new token
async with httpx.AsyncClient() as client:
response = await client.post(
"https://idm.example.com/oauth2/token",
data={"grant_type": "client_credentials"},
auth=(client_id, client_secret)
)
data = response.json()
token_cache.set_token(client_id, data["access_token"], data["expires_in"])
return data["access_token"]
# Bad: Fetch token on every request
async def bad_get_token():
# No caching - hits Kanidm on every API call!
response = await client.post("/oauth2/token", ...)
return response.json()["access_token"]# Good: Efficient LDAP search with specific attributes
def get_user_info(username: str):
conn.search(
search_base="dc=idm,dc=example,dc=com",
search_filter=f"(uid={ldap3.utils.conv.escape_filter_chars(username)})",
search_scope=ldap3.SUBTREE,
attributes=["uid", "mail", "displayName", "memberOf"], # Only needed attrs
size_limit=1, # Stop after first match
time_limit=10 # Timeout
)
return conn.entries[0] if conn.entries else None
# Bad: Fetch all attributes
def bad_get_user(username):
conn.search(
"dc=idm,dc=example,dc=com",
f"(uid={username})", # No escaping - LDAP injection risk!
attributes=ldap3.ALL_ATTRIBUTES # Fetches everything - slow!
)# Good: Batch LDAP queries for multiple users
def get_users_batch(usernames: list[str]) -> list:
"""Fetch multiple users in single query."""
escaped = [ldap3.utils.conv.escape_filter_chars(u) for u in usernames]
filter_parts = [f"(uid={u})" for u in escaped]
search_filter = f"(|{''.join(filter_parts)})"
conn.search(
"dc=idm,dc=example,dc=com",
search_filter,
attributes=["uid", "mail", "displayName"]
)
return list(conn.entries)
# Bad: Individual query per user
def bad_get_users(usernames):
results = []
for username in usernames: # N queries instead of 1!
conn.search(..., f"(uid={username})", ...)
results.append(conn.entries[0])
return results# Good: Service account with API token for automation
import os
class KanidmClient:
def __init__(self):
self.base_url = os.environ["KANIDM_URL"]
self.api_token = os.environ["KANIDM_API_TOKEN"]
self._client = httpx.Client(
base_url=self.base_url,
headers={"Authorization": f"Bearer {self.api_token}"},
timeout=30.0
)
def get_user(self, username: str):
response = self._client.get(f"/v1/person/{username}")
response.raise_for_status()
return response.json()
def close(self):
self._client.close()
# Usage with context manager
class KanidmClientContext:
def __enter__(self):
self.client = KanidmClient()
return self.client
def __exit__(self, *args):
self.client.close()
# Bad: Interactive authentication for automation
def bad_automation():
# Prompts for password - can't automate!
subprocess.run(["kanidm", "login"])# Good: Async for concurrent identity operations
import asyncio
import httpx
async def verify_users_async(usernames: list[str]) -> dict[str, bool]:
"""Verify multiple users exist concurrently."""
async with httpx.AsyncClient(
base_url="https://idm.example.com",
headers={"Authorization": f"Bearer {API_TOKEN}"}
) as client:
tasks = [
client.get(f"/v1/person/{username}")
for username in usernames
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return {
username: not isinstance(resp, Exception) and resp.status_code == 200
for username, resp in zip(usernames, responses)
}
# Bad: Sequential verification
def bad_verify_users(usernames):
results = {}
for username in usernames: # One at a time - slow!
response = client.get(f"/v1/person/{username}")
results[username] = response.status_code == 200
return results# Install Kanidm server
# For production: use proper TLS certificates
kanidmd cert-generate --ca-path /data/ca.pem --cert-path /data/cert.pem \
--key-path /data/key.pem --domain idm.example.com
# Configure server.toml
cat > /etc/kanidm/server.toml <<EOF
# Core settings
bindaddress = "[::]:8443"
ldapbindaddress = "[::]:3636"
domain = "idm.example.com"
origin = "https://idm.example.com"
# Database
db_path = "/data/kanidm.db"
# TLS (REQUIRED for production)
tls_chain = "/data/cert.pem"
tls_key = "/data/key.pem"
# Logging
log_level = "info"
# Backup (CRITICAL)
online_backup = "/data/backups/"
EOF
# Initialize database (FIRST TIME ONLY)
kanidmd database init
# Recover admin password
kanidmd recover-account admin
# Start server
kanidmd server -c /etc/kanidm/server.toml# Create user with full attributes
kanidm person create jsmith "John Smith" \
--mail john.smith@example.com
# Set POSIX attributes for Unix/Linux
kanidm person posix set jsmith --gidnumber 10000
# Add to groups
kanidm group add-members developers jsmith
kanidm group add-members vpn_users jsmith
# Set strong password policy
kanidm person credential set-password jsmith
# Enable WebAuthn (REQUIRED for privileged accounts)
# User enrolls via web UI: https://idm.example.com/
# Suspend account (don't delete - audit trail)
kanidm account lock jsmith --reason "Offboarding - 2025-11-19"
# Generate API token for service accounts
kanidm service-account api-token generate svc_gitlab \
--name "GitLab OIDC Integration" --expiry "2026-01-01"# Register OAuth2 client for application
kanidm oauth2 create gitlab_oidc "GitLab SSO" \
--origin https://gitlab.example.com
# Add redirect URIs (EXACT MATCH REQUIRED)
kanidm oauth2 add-redirect-url gitlab_oidc \
https://gitlab.example.com/users/auth/openid_connect/callback
# Enable required scopes
kanidm oauth2 enable-scope gitlab_oidc openid email profile groups
# Set token lifetimes
kanidm oauth2 set-token-lifetime gitlab_oidc --access 3600 --refresh 86400
# Enable PKCE for mobile/SPA clients
kanidm oauth2 enable-pkce mobile_app
# Map groups to claims (for authorization)
kanidm oauth2 create-scope-map gitlab_oidc groups developers admins
# Get client credentials
kanidm oauth2 show-basic-secret gitlab_oidc
# Output: client_id and client_secret
# Application configuration
# Provider: https://idm.example.com/oauth2/openid/gitlab_oidc
# Discovery: https://idm.example.com/oauth2/openid/gitlab_oidc/.well-known/openid-configuration# Create LDAP bind account
kanidm service-account create ldap_bind "LDAP Bind Account"
kanidm service-account credential set-password ldap_bind
# Grant LDAP read access
kanidm group add-members idm_account_read_priv ldap_bind
# LDAP connection parameters
# Server: ldaps://idm.example.com:3636
# Base DN: dc=idm,dc=example,dc=com
# Bind DN: name=ldap_bind,dc=idm,dc=example,dc=com
# Bind Password: [set above]
# Test LDAP search
ldapsearch -H ldaps://idm.example.com:3636 \
-D "name=ldap_bind,dc=idm,dc=example,dc=com" \
-W -b "dc=idm,dc=example,dc=com" \
"(uid=jsmith)"
# Common LDAP attributes
# uid: username
# mail: email address
# displayName: full name
# memberOf: group memberships
# uidNumber: POSIX UID
# gidNumber: POSIX GID
# loginShell: /bin/bash
# homeDirectory: /home/username# Configure RADIUS client (network device)
kanidm radius create wifi_controller "Wireless Controller" \
--address 10.0.1.100
# Generate strong shared secret
kanidm radius generate-secret wifi_controller
# Output: Strong random secret - configure on network device
# Grant RADIUS access to group
kanidm group create wifi_users "Wireless Network Users"
kanidm group add-members wifi_users jsmith
kanidm radius add-group wifi_controller wifi_users
# Configure network device
# RADIUS Server: idm.example.com
# Authentication Port: 1812
# Accounting Port: 1813
# Shared Secret: [from generate-secret above]
# Test RADIUS authentication
# Use tool like radtest or network device test
radtest jsmith password idm.example.com 0 shared-secret
# Monitor RADIUS logs
journalctl -u kanidmd -f | grep radius# User uploads SSH public key via CLI
kanidm person ssh add-publickey jsmith "ssh-name" \
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIExample..."
# Configure SSH server to fetch keys from Kanidm
# Install kanidm-ssh package on target systems
# /etc/ssh/sshd_config
cat >> /etc/ssh/sshd_config <<EOF
# Kanidm SSH key management
AuthorizedKeysCommand /usr/bin/kanidm_ssh_authorizedkeys %u
AuthorizedKeysCommandUser nobody
PubkeyAuthentication yes
EOF
# Configure kanidm-ssh client
cat > /etc/kanidm/config <<EOF
uri = "https://idm.example.com"
verify_ca = true
verify_hostnames = true
EOF
# Restart SSH
systemctl restart sshd
# PAM integration for password authentication
# /etc/pam.d/common-auth (Debian/Ubuntu)
auth sufficient pam_kanidm.so
auth required pam_deny.so
# NSS integration for user resolution
# /etc/nsswitch.conf
passwd: files kanidm
group: files kanidm
shadow: files kanidm
# Test PAM authentication
pamtester login jsmith authenticate# Create strong credential policy
kanidm credential-policy create high_security \
--minimum-length 16 \
--require-uppercase \
--require-lowercase \
--require-number \
--require-symbol \
--password-history 12
# Apply to privileged group
kanidm group create privileged_users "High Security Policy Users"
kanidm group add-members privileged_users admin sysadmin
kanidm credential-policy apply high_security privileged_users
# Configure account lockout
kanidm account-policy set-lockout --threshold 5 --duration 3600
# Enable comprehensive audit logging
# server.toml
log_level = "info" # or "debug" for detailed auditing
# Monitor authentication failures
journalctl -u kanidmd -f | grep "authentication failure"
# Regular backup (CRITICAL)
# Online backup (server running)
kanidmd backup /data/backups/kanidm-$(date +%Y%m%d-%H%M%S).json
# Offline backup (server stopped)
kanidmd database backup /data/backups/
# Test restore procedure
kanidmd database restore /data/backups/kanidm-20251119.json
# Verify database integrity
kanidmd database verify
# Export audit logs
kanidm audit-log export --since "2025-11-01" --format json > audit.json# ❌ DON'T - Plain LDAP exposes credentials
ldapsearch -H ldap://idm.example.com:389 ...
# ✅ DO - Always use LDAPS
ldapsearch -H ldaps://idm.example.com:3636 ...
# ❌ DON'T - Overprivileged bind account
kanidm group add-members idm_admins ldap_bind
# ✅ DO - Minimal read-only access
kanidm group add-members idm_account_read_priv ldap_bind# ❌ DON'T - Predictable or short secrets
kanidm radius set-secret wifi_controller "password123"
# ✅ DO - Use generate-secret for strong random secrets
kanidm radius generate-secret wifi_controller# ❌ DON'T - Password-only for admin access
kanidm person credential set-password admin
# ✅ DO - Require WebAuthn for admins
# User must enroll WebAuthn via web UI
# Configure credential policy to require WebAuthn
kanidm credential-policy create admin_policy --require-webauthn
kanidm group add-members idm_admins admin
kanidm credential-policy apply admin_policy idm_admins# ❌ DON'T - Wildcard URIs enable token theft
kanidm oauth2 add-redirect-url myapp "https://*.example.com/callback"
# ✅ DO - Exact URI matching
kanidm oauth2 add-redirect-url myapp "https://app.example.com/callback"
kanidm oauth2 add-redirect-url myapp "https://app2.example.com/callback"# ❌ DON'T - No backups
# [Server runs with no backup procedures]
# ✅ DO - Automated daily backups
# Create backup script
cat > /usr/local/bin/kanidm-backup.sh <<'EOF'
#!/bin/bash
BACKUP_DIR="/data/backups"
DATE=$(date +%Y%m%d-%H%M%S)
kanidmd backup "${BACKUP_DIR}/kanidm-${DATE}.json"
# Keep last 30 days
find "${BACKUP_DIR}" -name "kanidm-*.json" -mtime +30 -delete
EOF
# Cron job
0 2 * * * /usr/local/bin/kanidm-backup.sh# ❌ DON'T - Reuse UIDs after account deletion
# User jsmith (uid=10001) deleted
kanidm person create newuser "New User" --gidnumber 10001 # DANGEROUS!
# ✅ DO - Increment UIDs, never reuse
kanidm person create newuser "New User" --gidnumber 10015 # Next available# ❌ DON'T - Direct internet exposure
bindaddress = "0.0.0.0:8443" # No firewall, no reverse proxy
# ✅ DO - Behind reverse proxy with rate limiting
# nginx reverse proxy with rate limiting
location / {
proxy_pass https://localhost:8443;
limit_req zone=auth burst=5;
}
# Or firewall restriction
ufw allow from 10.0.0.0/8 to any port 8443# ❌ DON'T - Delete accounts (loses audit trail)
kanidm person delete jsmith
# ✅ DO - Lock accounts to preserve history
kanidm account lock jsmith --reason "Offboarding - 2025-11-19"
# Review locked accounts
kanidm person get jsmith# tests/test_kanidm_service.py
import pytest
from unittest.mock import Mock, patch, MagicMock
import httpx
class TestKanidmService:
"""Unit tests for Kanidm service layer."""
@pytest.fixture
def mock_client(self):
"""Create mock httpx client."""
return Mock(spec=httpx.Client)
def test_get_user_success(self, mock_client):
"""Test successful user retrieval."""
mock_client.get.return_value = Mock(
status_code=200,
json=lambda: {
"attrs": {
"uuid": ["abc-123"],
"name": ["jsmith"],
"displayname": ["John Smith"],
"mail": ["john@example.com"]
}
}
)
from myapp.kanidm import KanidmService
service = KanidmService(client=mock_client)
user = service.get_user("jsmith")
assert user["name"] == "jsmith"
assert user["mail"] == "john@example.com"
mock_client.get.assert_called_once_with("/v1/person/jsmith")
def test_get_user_not_found(self, mock_client):
"""Test user not found handling."""
mock_client.get.return_value = Mock(status_code=404)
from myapp.kanidm import KanidmService
service = KanidmService(client=mock_client)
with pytest.raises(UserNotFoundError):
service.get_user("nonexistent")
def test_oauth2_token_validation(self, mock_client):
"""Test OAuth2 token introspection."""
mock_client.post.return_value = Mock(
status_code=200,
json=lambda: {
"active": True,
"sub": "jsmith",
"scope": "openid email profile",
"exp": 1732123456
}
)
from myapp.kanidm import validate_token
result = validate_token(mock_client, "test_token")
assert result["active"] is True
assert result["sub"] == "jsmith"
def test_group_membership_check(self, mock_client):
"""Test group membership verification."""
mock_client.get.return_value = Mock(
status_code=200,
json=lambda: {
"attrs": {
"memberof": ["developers", "vpn_users"]
}
}
)
from myapp.kanidm import is_member_of
assert is_member_of(mock_client, "jsmith", "developers") is True
assert is_member_of(mock_client, "jsmith", "admins") is False# tests/integration/test_kanidm_integration.py
import pytest
import os
import httpx
import ldap3
@pytest.fixture(scope="session")
def kanidm_url():
"""Get Kanidm server URL from environment."""
return os.environ.get("KANIDM_TEST_URL", "https://idm.test.example.com")
@pytest.fixture(scope="session")
def api_token():
"""Get API token for testing."""
return os.environ["KANIDM_TEST_TOKEN"]
@pytest.fixture
def kanidm_client(kanidm_url, api_token):
"""Create authenticated Kanidm client."""
client = httpx.Client(
base_url=kanidm_url,
headers={"Authorization": f"Bearer {api_token}"},
timeout=30.0
)
yield client
client.close()
class TestOAuth2Integration:
"""Integration tests for OAuth2/OIDC."""
def test_openid_discovery(self, kanidm_client):
"""Test OpenID Connect discovery endpoint."""
response = kanidm_client.get(
"/oauth2/openid/testapp/.well-known/openid-configuration"
)
assert response.status_code == 200
config = response.json()
assert "issuer" in config
assert "authorization_endpoint" in config
assert "token_endpoint" in config
assert "jwks_uri" in config
def test_token_endpoint(self, kanidm_client):
"""Test token endpoint responds correctly."""
response = kanidm_client.post(
"/oauth2/token",
data={
"grant_type": "client_credentials",
"scope": "openid"
},
auth=("test_client", os.environ["TEST_CLIENT_SECRET"])
)
assert response.status_code == 200
tokens = response.json()
assert "access_token" in tokens
assert "token_type" in tokens
assert tokens["token_type"] == "Bearer"
class TestLDAPIntegration:
"""Integration tests for LDAP."""
@pytest.fixture
def ldap_connection(self):
"""Create LDAP connection."""
server = ldap3.Server(
os.environ.get("KANIDM_LDAP_URL", "ldaps://idm.test.example.com:3636"),
use_ssl=True,
get_info=ldap3.ALL
)
conn = ldap3.Connection(
server,
user=os.environ["LDAP_BIND_DN"],
password=os.environ["LDAP_BIND_PASSWORD"],
auto_bind=True
)
yield conn
conn.unbind()
def test_ldap_bind(self, ldap_connection):
"""Test LDAP bind succeeds."""
assert ldap_connection.bound
def test_user_search(self, ldap_connection):
"""Test LDAP user search."""
ldap_connection.search(
search_base=os.environ.get("LDAP_BASE_DN", "dc=idm,dc=example,dc=com"),
search_filter="(uid=testuser)",
attributes=["uid", "mail", "displayName"]
)
assert len(ldap_connection.entries) >= 0 # May or may not exist
def test_group_search(self, ldap_connection):
"""Test LDAP group search."""
ldap_connection.search(
search_base=os.environ.get("LDAP_BASE_DN", "dc=idm,dc=example,dc=com"),
search_filter="(objectClass=group)",
attributes=["cn", "member"]
)
assert ldap_connection.result["result"] == 0
class TestRADIUSIntegration:
"""Integration tests for RADIUS (requires radtest)."""
@pytest.mark.skip(reason="Requires RADIUS client tools")
def test_radius_authentication(self):
"""Test RADIUS authentication flow."""
import subprocess
result = subprocess.run(
[
"radtest",
"testuser",
os.environ["TEST_USER_PASSWORD"],
os.environ.get("RADIUS_SERVER", "idm.test.example.com"),
"0",
os.environ["RADIUS_SECRET"]
],
capture_output=True,
text=True
)
assert "Access-Accept" in result.stdout# tests/e2e/test_auth_flows.py
import pytest
from playwright.sync_api import Page, expect
class TestWebAuthnFlow:
"""E2E tests for WebAuthn authentication."""
@pytest.fixture
def kanidm_url(self):
return "https://idm.test.example.com"
def test_login_page_loads(self, page: Page, kanidm_url):
"""Test login page is accessible."""
page.goto(kanidm_url)
expect(page.locator("input[name='username']")).to_be_visible()
expect(page.locator("button[type='submit']")).to_be_visible()
def test_oauth2_authorization_flow(self, page: Page, kanidm_url):
"""Test OAuth2 authorization code flow."""
# Start authorization
page.goto(
f"{kanidm_url}/oauth2/authorize?"
"client_id=testapp&"
"redirect_uri=https://app.test.example.com/callback&"
"response_type=code&"
"scope=openid%20email%20profile"
)
# Should redirect to login
expect(page.locator("input[name='username']")).to_be_visible()
# Login
page.fill("input[name='username']", "testuser")
page.fill("input[name='password']", "testpassword")
page.click("button[type='submit']")
# Should redirect to callback with code
page.wait_for_url("**/callback?code=*")
assert "code=" in page.url# tests/security/test_kanidm_security.py
import pytest
import httpx
class TestSecurityConfiguration:
"""Security configuration tests."""
@pytest.fixture
def client(self):
return httpx.Client(timeout=10.0, verify=True)
def test_tls_required(self, client):
"""Test that HTTP is rejected, only HTTPS works."""
# HTTP should fail or redirect
with pytest.raises(httpx.ConnectError):
client.get("http://idm.example.com:8080")
# HTTPS should work
response = client.get("https://idm.example.com/status")
assert response.status_code == 200
def test_no_plain_ldap(self):
"""Test that plain LDAP is disabled."""
import ldap3
import socket
# Plain LDAP (port 389) should be closed
server = ldap3.Server("idm.example.com", port=389, use_ssl=False)
conn = ldap3.Connection(server)
# Should fail to connect
with pytest.raises((ldap3.core.exceptions.LDAPSocketOpenError, socket.error)):
conn.bind()
def test_oauth2_redirect_uri_validation(self, client):
"""Test that only exact redirect URIs are allowed."""
# Valid redirect
response = client.get(
"https://idm.example.com/oauth2/authorize",
params={
"client_id": "testapp",
"redirect_uri": "https://app.example.com/callback",
"response_type": "code"
},
follow_redirects=False
)
assert response.status_code in [302, 200]
# Invalid redirect should be rejected
response = client.get(
"https://idm.example.com/oauth2/authorize",
params={
"client_id": "testapp",
"redirect_uri": "https://evil.com/callback",
"response_type": "code"
},
follow_redirects=False
)
assert response.status_code in [400, 403]
def test_account_lockout(self, client):
"""Test account lockout after failed attempts."""
# Attempt multiple failed logins
for _ in range(6):
response = client.post(
"https://idm.example.com/v1/auth",
json={"username": "testuser", "password": "wrongpassword"}
)
# Account should be locked
response = client.post(
"https://idm.example.com/v1/auth",
json={"username": "testuser", "password": "correctpassword"}
)
assert response.status_code == 403
assert "locked" in response.text.lower()# Run all unit tests
pytest tests/test_*.py -v
# Run integration tests (requires test environment)
export KANIDM_TEST_URL="https://idm.test.example.com"
export KANIDM_TEST_TOKEN="your-test-token"
pytest tests/integration/ -v
# Run security tests
pytest tests/security/ -v --tb=short
# Run with coverage
pytest tests/ --cov=myapp --cov-report=html
# Run E2E tests
playwright install chromium
pytest tests/e2e/ -v
# Continuous integration
pytest tests/ -v --junitxml=results.xmlpytest tests/test_*.py -vpytest tests/integration/ -vpytest tests/security/ -vpytest tests/e2e/ -vreferences/integration-guide.mdreferences/security-config.md