secrets-rotation

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Secrets Rotation

密钥轮换

Overview

概述

Implement automated secrets rotation strategy for credentials, API keys, certificates, and encryption keys with zero-downtime deployment and comprehensive audit logging.
为凭证、API密钥、证书和加密密钥实现自动化密钥轮换策略,支持零停机部署和全面审计日志。

When to Use

适用场景

  • API key management
  • Database credentials
  • TLS/SSL certificates
  • Encryption key rotation
  • Compliance requirements
  • Security incident response
  • Service account management
  • API密钥管理
  • 数据库凭证
  • TLS/SSL证书
  • 加密密钥轮换
  • 合规要求
  • 安全事件响应
  • 服务账号管理

Implementation Examples

实现示例

1. Node.js Secrets Manager with Rotation

1. 带轮换功能的Node.js密钥管理器

javascript
// secrets-manager.js
const AWS = require('aws-sdk');
const crypto = require('crypto');

class SecretsManager {
  constructor() {
    this.secretsManager = new AWS.SecretsManager({
      region: process.env.AWS_REGION
    });

    this.rotationSchedule = new Map();
  }

  /**
   * Generate new secret value
   */
  generateSecret(type = 'api_key', length = 32) {
    switch (type) {
      case 'api_key':
        return crypto.randomBytes(length).toString('hex');

      case 'password':
        // Generate strong password
        const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
        let password = '';
        for (let i = 0; i < length; i++) {
          password += chars.charAt(crypto.randomInt(chars.length));
        }
        return password;

      case 'jwt_secret':
        return crypto.randomBytes(64).toString('base64');

      default:
        return crypto.randomBytes(length).toString('base64');
    }
  }

  /**
   * Store secret in AWS Secrets Manager
   */
  async createSecret(name, value, description = '') {
    const params = {
      Name: name,
      SecretString: JSON.stringify(value),
      Description: description
    };

    try {
      const result = await this.secretsManager.createSecret(params).promise();
      return result;
    } catch (error) {
      if (error.code === 'ResourceExistsException') {
        // Update existing secret
        return this.updateSecret(name, value);
      }
      throw error;
    }
  }

  /**
   * Retrieve secret
   */
  async getSecret(name) {
    const params = { SecretId: name };

    try {
      const data = await this.secretsManager.getSecretValue(params).promise();

      if ('SecretString' in data) {
        return JSON.parse(data.SecretString);
      }

      // Binary secret
      const buff = Buffer.from(data.SecretBinary, 'base64');
      return buff.toString('ascii');
    } catch (error) {
      console.error(`Error retrieving secret ${name}:`, error);
      throw error;
    }
  }

  /**
   * Update secret value
   */
  async updateSecret(name, value) {
    const params = {
      SecretId: name,
      SecretString: JSON.stringify(value)
    };

    return this.secretsManager.updateSecret(params).promise();
  }

  /**
   * Rotate secret with zero downtime
   */
  async rotateSecret(name, type = 'api_key') {
    console.log(`Starting rotation for secret: ${name}`);

    try {
      // Step 1: Generate new secret
      const newValue = this.generateSecret(type);

      // Step 2: Store new version
      const currentSecret = await this.getSecret(name);

      // Keep old value temporarily for graceful transition
      const secretWithRotation = {
        current: newValue,
        previous: currentSecret.current || currentSecret,
        rotatedAt: new Date().toISOString()
      };

      await this.updateSecret(name, secretWithRotation);

      console.log(`New secret version created for: ${name}`);

      // Step 3: Wait for applications to pick up new secret
      await this.waitForPropagation(5000);

      // Step 4: Verify new secret works
      const verificationPassed = await this.verifySecret(name, newValue);

      if (!verificationPassed) {
        throw new Error('Secret verification failed');
      }

      // Step 5: Remove previous version after grace period
      setTimeout(async () => {
        await this.updateSecret(name, {
          current: newValue,
          rotatedAt: new Date().toISOString()
        });
        console.log(`Rotation completed for: ${name}`);
      }, 300000); // 5 minutes grace period

      return {
        success: true,
        secretName: name,
        rotatedAt: new Date().toISOString()
      };
    } catch (error) {
      console.error(`Rotation failed for ${name}:`, error);

      // Rollback on failure
      await this.rollbackRotation(name);

      throw error;
    }
  }

  /**
   * Schedule automatic rotation
   */
  async scheduleRotation(name, intervalDays = 90) {
    const intervalMs = intervalDays * 24 * 60 * 60 * 1000;

    const rotationJob = setInterval(async () => {
      try {
        await this.rotateSecret(name);
        console.log(`Scheduled rotation completed for: ${name}`);
      } catch (error) {
        console.error(`Scheduled rotation failed for ${name}:`, error);
        // Alert operations team
        this.sendAlert(name, error);
      }
    }, intervalMs);

    this.rotationSchedule.set(name, rotationJob);

    // AWS Secrets Manager automatic rotation
    const params = {
      SecretId: name,
      RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
      RotationRules: {
        AutomaticallyAfterDays: intervalDays
      }
    };

    await this.secretsManager.rotateSecret(params).promise();
  }

  /**
   * Rotate database credentials
   */
  async rotateDatabaseCredentials(secretName) {
    const credentials = await this.getSecret(secretName);

    // Generate new password
    const newPassword = this.generateSecret('password', 20);

    // Update database user password
    const connection = await this.connectToDatabase(credentials);

    await connection.query(
      'ALTER USER ? IDENTIFIED BY ?',
      [credentials.username, newPassword]
    );

    // Update secret
    await this.updateSecret(secretName, {
      username: credentials.username,
      password: newPassword,
      host: credentials.host,
      database: credentials.database,
      rotatedAt: new Date().toISOString()
    });

    await connection.end();

    return { success: true };
  }

  /**
   * Rotate TLS certificate
   */
  async rotateTLSCertificate(domain) {
    // Use Let's Encrypt or internal CA
    const certbot = require('certbot');

    try {
      // Request new certificate
      const newCert = await certbot.certonly({
        domains: [domain],
        email: process.env.ADMIN_EMAIL,
        agreeTos: true,
        renewByDefault: true
      });

      // Store in secrets manager
      await this.createSecret(`tls-cert-${domain}`, {
        certificate: newCert.certificate,
        privateKey: newCert.privateKey,
        chain: newCert.chain,
        issuedAt: new Date().toISOString(),
        expiresAt: newCert.expiresAt
      });

      // Update load balancer/web server
      await this.updateServerCertificate(domain, newCert);

      console.log(`TLS certificate rotated for: ${domain}`);

      return { success: true };
    } catch (error) {
      console.error('Certificate rotation failed:', error);
      throw error;
    }
  }

  async waitForPropagation(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  async verifySecret(name, value) {
    // Implement verification logic
    // Test API call, database connection, etc.
    return true;
  }

  async rollbackRotation(name) {
    // Restore previous version
    console.log(`Rolling back rotation for: ${name}`);
  }

  async sendAlert(secretName, error) {
    // Send to monitoring system
    console.error(`ALERT: Rotation failed for ${secretName}`, error);
  }

  async connectToDatabase(credentials) {
    // Database connection logic
    return null;
  }

  async updateServerCertificate(domain, cert) {
    // Update server configuration
    return null;
  }
}

// Usage
const secretsManager = new SecretsManager();

// Rotate API key
async function rotateAPIKey() {
  await secretsManager.rotateSecret('api-key-external-service', 'api_key');
}

// Schedule automatic rotation
async function setupRotationSchedule() {
  await secretsManager.scheduleRotation('database-credentials', 90);
  await secretsManager.scheduleRotation('api-keys', 30);
}

// Rotate database credentials
async function rotateDatabaseCreds() {
  await secretsManager.rotateDatabaseCredentials('rds-production');
}

module.exports = SecretsManager;
javascript
// secrets-manager.js
const AWS = require('aws-sdk');
const crypto = require('crypto');

class SecretsManager {
  constructor() {
    this.secretsManager = new AWS.SecretsManager({
      region: process.env.AWS_REGION
    });

    this.rotationSchedule = new Map();
  }

  /**
   * Generate new secret value
   */
  generateSecret(type = 'api_key', length = 32) {
    switch (type) {
      case 'api_key':
        return crypto.randomBytes(length).toString('hex');

      case 'password':
        // Generate strong password
        const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
        let password = '';
        for (let i = 0; i < length; i++) {
          password += chars.charAt(crypto.randomInt(chars.length));
        }
        return password;

      case 'jwt_secret':
        return crypto.randomBytes(64).toString('base64');

      default:
        return crypto.randomBytes(length).toString('base64');
    }
  }

  /**
   * Store secret in AWS Secrets Manager
   */
  async createSecret(name, value, description = '') {
    const params = {
      Name: name,
      SecretString: JSON.stringify(value),
      Description: description
    };

    try {
      const result = await this.secretsManager.createSecret(params).promise();
      return result;
    } catch (error) {
      if (error.code === 'ResourceExistsException') {
        // Update existing secret
        return this.updateSecret(name, value);
      }
      throw error;
    }
  }

  /**
   * Retrieve secret
   */
  async getSecret(name) {
    const params = { SecretId: name };

    try {
      const data = await this.secretsManager.getSecretValue(params).promise();

      if ('SecretString' in data) {
        return JSON.parse(data.SecretString);
      }

      // Binary secret
      const buff = Buffer.from(data.SecretBinary, 'base64');
      return buff.toString('ascii');
    } catch (error) {
      console.error(`Error retrieving secret ${name}:`, error);
      throw error;
    }
  }

  /**
   * Update secret value
   */
  async updateSecret(name, value) {
    const params = {
      SecretId: name,
      SecretString: JSON.stringify(value)
    };

    return this.secretsManager.updateSecret(params).promise();
  }

  /**
   * Rotate secret with zero downtime
   */
  async rotateSecret(name, type = 'api_key') {
    console.log(`Starting rotation for secret: ${name}`);

    try {
      // Step 1: Generate new secret
      const newValue = this.generateSecret(type);

      // Step 2: Store new version
      const currentSecret = await this.getSecret(name);

      // Keep old value temporarily for graceful transition
      const secretWithRotation = {
        current: newValue,
        previous: currentSecret.current || currentSecret,
        rotatedAt: new Date().toISOString()
      };

      await this.updateSecret(name, secretWithRotation);

      console.log(`New secret version created for: ${name}`);

      // Step 3: Wait for applications to pick up new secret
      await this.waitForPropagation(5000);

      // Step 4: Verify new secret works
      const verificationPassed = await this.verifySecret(name, newValue);

      if (!verificationPassed) {
        throw new Error('Secret verification failed');
      }

      // Step 5: Remove previous version after grace period
      setTimeout(async () => {
        await this.updateSecret(name, {
          current: newValue,
          rotatedAt: new Date().toISOString()
        });
        console.log(`Rotation completed for: ${name}`);
      }, 300000); // 5 minutes grace period

      return {
        success: true,
        secretName: name,
        rotatedAt: new Date().toISOString()
      };
    } catch (error) {
      console.error(`Rotation failed for ${name}:`, error);

      // Rollback on failure
      await this.rollbackRotation(name);

      throw error;
    }
  }

  /**
   * Schedule automatic rotation
   */
  async scheduleRotation(name, intervalDays = 90) {
    const intervalMs = intervalDays * 24 * 60 * 60 * 1000;

    const rotationJob = setInterval(async () => {
      try {
        await this.rotateSecret(name);
        console.log(`Scheduled rotation completed for: ${name}`);
      } catch (error) {
        console.error(`Scheduled rotation failed for ${name}:`, error);
        // Alert operations team
        this.sendAlert(name, error);
      }
    }, intervalMs);

    this.rotationSchedule.set(name, rotationJob);

    // AWS Secrets Manager automatic rotation
    const params = {
      SecretId: name,
      RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
      RotationRules: {
        AutomaticallyAfterDays: intervalDays
      }
    };

    await this.secretsManager.rotateSecret(params).promise();
  }

  /**
   * Rotate database credentials
   */
  async rotateDatabaseCredentials(secretName) {
    const credentials = await this.getSecret(secretName);

    // Generate new password
    const newPassword = this.generateSecret('password', 20);

    // Update database user password
    const connection = await this.connectToDatabase(credentials);

    await connection.query(
      'ALTER USER ? IDENTIFIED BY ?',
      [credentials.username, newPassword]
    );

    // Update secret
    await this.updateSecret(secretName, {
      username: credentials.username,
      password: newPassword,
      host: credentials.host,
      database: credentials.database,
      rotatedAt: new Date().toISOString()
    });

    await connection.end();

    return { success: true };
  }

  /**
   * Rotate TLS certificate
   */
  async rotateTLSCertificate(domain) {
    // Use Let's Encrypt or internal CA
    const certbot = require('certbot');

    try {
      // Request new certificate
      const newCert = await certbot.certonly({
        domains: [domain],
        email: process.env.ADMIN_EMAIL,
        agreeTos: true,
        renewByDefault: true
      });

      // Store in secrets manager
      await this.createSecret(`tls-cert-${domain}`, {
        certificate: newCert.certificate,
        privateKey: newCert.privateKey,
        chain: newCert.chain,
        issuedAt: new Date().toISOString(),
        expiresAt: newCert.expiresAt
      });

      // Update load balancer/web server
      await this.updateServerCertificate(domain, newCert);

      console.log(`TLS certificate rotated for: ${domain}`);

      return { success: true };
    } catch (error) {
      console.error('Certificate rotation failed:', error);
      throw error;
    }
  }

  async waitForPropagation(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  async verifySecret(name, value) {
    // Implement verification logic
    // Test API call, database connection, etc.
    return true;
  }

  async rollbackRotation(name) {
    // Restore previous version
    console.log(`Rolling back rotation for: ${name}`);
  }

  async sendAlert(secretName, error) {
    // Send to monitoring system
    console.error(`ALERT: Rotation failed for ${secretName}`, error);
  }

  async connectToDatabase(credentials) {
    // Database connection logic
    return null;
  }

  async updateServerCertificate(domain, cert) {
    // Update server configuration
    return null;
  }
}

// Usage
const secretsManager = new SecretsManager();

// Rotate API key
async function rotateAPIKey() {
  await secretsManager.rotateSecret('api-key-external-service', 'api_key');
}

// Schedule automatic rotation
async function setupRotationSchedule() {
  await secretsManager.scheduleRotation('database-credentials', 90);
  await secretsManager.scheduleRotation('api-keys', 30);
}

// Rotate database credentials
async function rotateDatabaseCreds() {
  await secretsManager.rotateDatabaseCredentials('rds-production');
}

module.exports = SecretsManager;

2. Python Secrets Rotation with Vault

2. 基于Vault的Python密钥轮换

python
undefined
python
undefined

secrets_rotation.py

secrets_rotation.py

import hvac import secrets import string from datetime import datetime, timedelta from typing import Dict, Any import psycopg2 import boto3
class SecretsRotation: def init(self, vault_url: str, vault_token: str): self.vault_client = hvac.Client(url=vault_url, token=vault_token) self.ssm = boto3.client('ssm')
def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
    """Generate new secret value"""
    if secret_type == 'api_key':
        return secrets.token_urlsafe(length)

    elif secret_type == 'password':
        # Strong password with all character types
        chars = string.ascii_letters + string.digits + string.punctuation
        return ''.join(secrets.choice(chars) for _ in range(length))

    elif secret_type == 'jwt_secret':
        return secrets.token_urlsafe(64)

    else:
        return secrets.token_bytes(length).hex()

def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
    """Rotate secret with zero downtime"""
    print(f"Starting rotation for: {path}")

    try:
        # Read current secret
        current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
        current_data = current_secret['data']['data']

        # Generate new value
        new_value = self.generate_secret(secret_type)

        # Store with both old and new values
        rotation_data = {
            'current': new_value,
            'previous': current_data.get('current', current_data.get('value')),
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=rotation_data
        )

        print(f"Secret rotated successfully: {path}")

        return {
            'success': True,
            'path': path,
            'rotated_at': rotation_data['rotated_at']
        }

    except Exception as e:
        print(f"Rotation failed for {path}: {e}")
        raise

def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
    """Rotate database credentials"""
    # Get current credentials
    secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
    creds = secret['data']['data']

    # Generate new password
    new_password = self.generate_secret('password', 20)

    # Connect to database
    conn = psycopg2.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['username'],
        password=creds['password']
    )

    cursor = conn.cursor()

    try:
        # Update password in database
        cursor.execute(
            f"ALTER USER {creds['username']} WITH PASSWORD %s",
            (new_password,)
        )
        conn.commit()

        # Update secret in Vault
        updated_creds = {
            **creds,
            'password': new_password,
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=secret_path,
            secret=updated_creds
        )

        print(f"Database credentials rotated: {secret_path}")

        return {'success': True}

    finally:
        cursor.close()
        conn.close()

def schedule_rotation(self, path: str, interval_days: int = 90):
    """Schedule automatic rotation using AWS Lambda"""
    # Create rotation schedule in AWS Secrets Manager
    # or use cron job

    schedule_expression = f"rate({interval_days} days)"

    # This would trigger a Lambda function
    print(f"Rotation scheduled for {path}: every {interval_days} days")

def rotate_encryption_keys(self, key_id: str):
    """Rotate encryption keys"""
    kms = boto3.client('kms')

    # Enable automatic key rotation
    kms.enable_key_rotation(KeyId=key_id)

    print(f"Automatic rotation enabled for KMS key: {key_id}")

def audit_rotation_history(self, path: str) -> list:
    """Get rotation history"""
    versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)

    history = []
    for version, metadata in versions['data']['versions'].items():
        history.append({
            'version': version,
            'created_time': metadata['created_time'],
            'deleted': metadata.get('deletion_time') is not None
        })

    return sorted(history, key=lambda x: x['created_time'], reverse=True)
import hvac import secrets import string from datetime import datetime, timedelta from typing import Dict, Any import psycopg2 import boto3
class SecretsRotation: def init(self, vault_url: str, vault_token: str): self.vault_client = hvac.Client(url=vault_url, token=vault_token) self.ssm = boto3.client('ssm')
def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
    """Generate new secret value"""
    if secret_type == 'api_key':
        return secrets.token_urlsafe(length)

    elif secret_type == 'password':
        # Strong password with all character types
        chars = string.ascii_letters + string.digits + string.punctuation
        return ''.join(secrets.choice(chars) for _ in range(length))

    elif secret_type == 'jwt_secret':
        return secrets.token_urlsafe(64)

    else:
        return secrets.token_bytes(length).hex()

def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
    """Rotate secret with zero downtime"""
    print(f"Starting rotation for: {path}")

    try:
        # Read current secret
        current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
        current_data = current_secret['data']['data']

        # Generate new value
        new_value = self.generate_secret(secret_type)

        # Store with both old and new values
        rotation_data = {
            'current': new_value,
            'previous': current_data.get('current', current_data.get('value')),
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=rotation_data
        )

        print(f"Secret rotated successfully: {path}")

        return {
            'success': True,
            'path': path,
            'rotated_at': rotation_data['rotated_at']
        }

    except Exception as e:
        print(f"Rotation failed for {path}: {e}")
        raise

def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
    """Rotate database credentials"""
    # Get current credentials
    secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
    creds = secret['data']['data']

    # Generate new password
    new_password = self.generate_secret('password', 20)

    # Connect to database
    conn = psycopg2.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['username'],
        password=creds['password']
    )

    cursor = conn.cursor()

    try:
        # Update password in database
        cursor.execute(
            f"ALTER USER {creds['username']} WITH PASSWORD %s",
            (new_password,)
        )
        conn.commit()

        # Update secret in Vault
        updated_creds = {
            **creds,
            'password': new_password,
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=secret_path,
            secret=updated_creds
        )

        print(f"Database credentials rotated: {secret_path}")

        return {'success': True}

    finally:
        cursor.close()
        conn.close()

def schedule_rotation(self, path: str, interval_days: int = 90):
    """Schedule automatic rotation using AWS Lambda"""
    # Create rotation schedule in AWS Secrets Manager
    # or use cron job

    schedule_expression = f"rate({interval_days} days)"

    # This would trigger a Lambda function
    print(f"Rotation scheduled for {path}: every {interval_days} days")

def rotate_encryption_keys(self, key_id: str):
    """Rotate encryption keys"""
    kms = boto3.client('kms')

    # Enable automatic key rotation
    kms.enable_key_rotation(KeyId=key_id)

    print(f"Automatic rotation enabled for KMS key: {key_id}")

def audit_rotation_history(self, path: str) -> list:
    """Get rotation history"""
    versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)

    history = []
    for version, metadata in versions['data']['versions'].items():
        history.append({
            'version': version,
            'created_time': metadata['created_time'],
            'deleted': metadata.get('deletion_time') is not None
        })

    return sorted(history, key=lambda x: x['created_time'], reverse=True)

Usage

Usage

if name == 'main': rotation = SecretsRotation( vault_url='http://localhost:8200', vault_token='your-token' )
# Rotate API key
rotation.rotate_secret('api-keys/external-service', 'api_key')

# Rotate database credentials
rotation.rotate_database_password('database/production')

# Schedule rotations
rotation.schedule_rotation('api-keys/external-service', 30)
rotation.schedule_rotation('database/production', 90)

# View history
history = rotation.audit_rotation_history('api-keys/external-service')
print(f"Rotation history: {history}")
undefined
if name == 'main': rotation = SecretsRotation( vault_url='http://localhost:8200', vault_token='your-token' )
# Rotate API key
rotation.rotate_secret('api-keys/external-service', 'api_key')

# Rotate database credentials
rotation.rotate_database_password('database/production')

# Schedule rotations
rotation.schedule_rotation('api-keys/external-service', 30)
rotation.schedule_rotation('database/production', 90)

# View history
history = rotation.audit_rotation_history('api-keys/external-service')
print(f"Rotation history: {history}")
undefined

3. Kubernetes Secrets Rotation

3. Kubernetes密钥轮换

yaml
undefined
yaml
undefined

secrets-rotation-cronjob.yaml

secrets-rotation-cronjob.yaml

apiVersion: batch/v1 kind: CronJob metadata: name: secrets-rotation namespace: production spec: schedule: "0 2 * * 0" # Weekly at 2 AM Sunday jobTemplate: spec: template: spec: serviceAccountName: secrets-rotator containers: - name: rotate image: secrets-rotator:latest env: - name: VAULT_ADDR value: "http://vault:8200" - name: VAULT_TOKEN valueFrom: secretKeyRef: name: vault-token key: token command: - /bin/sh - -c - | # Rotate secrets python /app/rotate_secrets.py
--secret database-password
--secret api-keys
--secret tls-certificates
      restartPolicy: OnFailure

apiVersion: v1 kind: ServiceAccount metadata: name: secrets-rotator namespace: production

apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: secrets-rotator namespace: production rules:
  • apiGroups: [""] resources: ["secrets"] verbs: ["get", "list", "update", "patch"]

apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: secrets-rotator namespace: production subjects:
  • kind: ServiceAccount name: secrets-rotator roleRef: kind: Role name: secrets-rotator apiGroup: rbac.authorization.k8s.io
undefined
apiVersion: batch/v1 kind: CronJob metadata: name: secrets-rotation namespace: production spec: schedule: "0 2 * * 0" # Weekly at 2 AM Sunday jobTemplate: spec: template: spec: serviceAccountName: secrets-rotator containers: - name: rotate image: secrets-rotator:latest env: - name: VAULT_ADDR value: "http://vault:8200" - name: VAULT_TOKEN valueFrom: secretKeyRef: name: vault-token key: token command: - /bin/sh - -c - | # Rotate secrets python /app/rotate_secrets.py
--secret database-password
--secret api-keys
--secret tls-certificates
      restartPolicy: OnFailure

apiVersion: v1 kind: ServiceAccount metadata: name: secrets-rotator namespace: production

apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: secrets-rotator namespace: production rules:
  • apiGroups: [""] resources: ["secrets"] verbs: ["get", "list", "update", "patch"]

apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: secrets-rotator namespace: production subjects:
  • kind: ServiceAccount name: secrets-rotator roleRef: kind: Role name: secrets-rotator apiGroup: rbac.authorization.k8s.io
undefined

Best Practices

最佳实践

✅ DO

✅ 建议

  • Automate rotation
  • Use grace periods
  • Verify new secrets
  • Maintain rotation audit trail
  • Implement rollback procedures
  • Monitor rotation failures
  • Use managed services (AWS Secrets Manager)
  • Test rotation procedures
  • 自动化轮换流程
  • 设置过渡期
  • 验证新密钥有效性
  • 保留轮换审计记录
  • 实现回滚流程
  • 监控轮换失败事件
  • 使用托管服务(如AWS Secrets Manager)
  • 测试轮换流程

❌ DON'T

❌ 避免

  • Hardcode secrets
  • Share secrets
  • Skip verification
  • Rotate without grace period
  • Ignore rotation failures
  • Store secrets in version control
  • 硬编码密钥
  • 共享密钥
  • 跳过验证步骤
  • 无过渡期直接轮换
  • 忽略轮换失败
  • 在版本控制系统中存储密钥

Rotation Schedule

轮换周期

  • API Keys: 30-90 days
  • Database Passwords: 90 days
  • TLS Certificates: Before expiry
  • Encryption Keys: 1 year
  • Service Account Tokens: 90 days
  • API密钥:30-90天
  • 数据库密码:90天
  • TLS证书:过期前轮换
  • 加密密钥:1年
  • 服务账号令牌:90天

Zero-Downtime Strategy

零停机策略

  1. Generate new secret
  2. Store with versioning
  3. Grace period (both versions valid)
  4. Verification
  5. Deprecate old version
  6. Remove after grace period
  1. 生成新密钥
  2. 版本化存储
  3. 过渡期(新旧版本均有效)
  4. 验证
  5. 弃用旧版本
  6. 过渡期后删除旧版本

Resources

参考资源