validation-quarantine

Data Validation & Quarantine

Validate incoming data with quality scoring and quarantine suspicious records without blocking the pipeline.

When to Use This Skill

  • Processing external data sources that are unreliable
  • Need quality scoring beyond simple schema validation
  • Want to quarantine suspicious data for manual review
  • Can't afford to block the pipeline for bad data

Core Concepts

External data sources are unreliable. Schema violations crash pipelines, low-quality data pollutes databases, and you can't manually review every record.
The solution:
  1. Validate against schema
  2. Score quality based on domain rules
  3. Pass high-quality data through
  4. Quarantine suspicious data for review
  5. Reject invalid data
┌─────────────┐     ┌───────────────┐     ┌─────────────┐
│  Raw Data   │────▶│   Validator   │────▶│   Valid     │
└─────────────┘     └───────┬───────┘     └─────────────┘
                            │
                            ▼
                     ┌─────────────┐
                     │ Quarantine  │
                     └─────────────┘
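
The five steps above reduce to a three-way routing decision per record. A minimal sketch of that decision follows; `Route` and `routeRecord` are hypothetical names used only for illustration, and the default threshold of 50 is an assumption that mirrors the implementation's default.

```typescript
// Hypothetical helper: the three possible destinations for a record.
type Route = 'valid' | 'quarantined' | 'invalid';

function routeRecord(
  passesSchema: boolean,
  qualityScore: number,
  quarantineThreshold = 50, // assumed default, tune per data source
): Route {
  // Schema failures are rejected outright; they never reach scoring.
  if (!passesSchema) return 'invalid';
  // Structurally valid but low-scoring records are held for review.
  if (qualityScore < quarantineThreshold) return 'quarantined';
  // Everything else continues down the pipeline.
  return 'valid';
}

const routes = [
  routeRecord(false, 0),
  routeRecord(true, 35),
  routeRecord(true, 50),
];
console.log(routes); // [ 'invalid', 'quarantined', 'valid' ]
```

A score exactly equal to the threshold passes, because the comparison is strict.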

Implementation

TypeScript

typescript
import { z, ZodError } from 'zod';

interface ValidationResult<T> {
  success: boolean;
  data?: T;
  errors?: { path: string; message: string; code: string }[];
  qualityScore: number;
  warnings: string[];
}

interface BatchResult<T> {
  valid: T[];
  invalid: { original: unknown; errors: any[] }[];
  quarantined: { original: unknown; score: number; warnings: string[] }[];
  metrics: {
    totalProcessed: number;
    validPercent: number;
    avgQualityScore: number;
    processingTimeMs: number;
  };
}

class DataValidator<T> {
  constructor(
    private schema: z.ZodSchema<T>,
    private qualityScorer: (data: T) => { score: number; warnings: string[] },
    private quarantineThreshold = 50
  ) {}

  validate(raw: unknown): ValidationResult<T> {
    try {
      const parsed = this.schema.parse(raw);
      const { score, warnings } = this.qualityScorer(parsed);
      return { success: true, data: parsed, qualityScore: score, warnings };
    } catch (error) {
      if (error instanceof ZodError) {
        return {
          success: false,
          // `issues` is the documented property; `errors` is an alias for it in Zod 3
          errors: error.issues.map(e => ({
            path: e.path.join('.'),
            message: e.message,
            code: e.code,
          })),
          qualityScore: 0,
          warnings: [],
        };
      }
      throw error;
    }
  }

  validateBatch(items: unknown[]): BatchResult<T> {
    const start = Date.now();
    const valid: T[] = [];
    const invalid: any[] = [];
    const quarantined: any[] = [];
    let totalScore = 0;

    for (const item of items) {
      const result = this.validate(item);
      
      if (!result.success) {
        invalid.push({ original: item, errors: result.errors });
      } else if (result.qualityScore < this.quarantineThreshold) {
        quarantined.push({
          original: item,
          score: result.qualityScore,
          warnings: result.warnings,
        });
      } else {
        valid.push(result.data!);
        totalScore += result.qualityScore;
      }
    }

    return {
      valid,
      invalid,
      quarantined,
      metrics: {
        totalProcessed: items.length,
        validPercent: items.length > 0 ? (valid.length / items.length) * 100 : 0,
        avgQualityScore: valid.length > 0 ? totalScore / valid.length : 0,
        processingTimeMs: Date.now() - start,
      },
    };
  }
}

// ============================================
// Quality Scorer Example
// ============================================

interface Article {
  title: string;
  url: string;
  domain: string;
  publishedAt: string;
}

function scoreArticle(article: Article): { score: number; warnings: string[] } {
  let score = 100;
  const warnings: string[] = [];

  // Title checks
  if (article.title.length < 20) {
    score -= 10;
    warnings.push('Short title');
  }
  if (/\b(click|subscribe|newsletter)\b/i.test(article.title)) {
    score -= 15;
    warnings.push('Promotional language');
  }

  // Source checks
  const ugcPlatforms = ['blogspot', 'wordpress', 'medium'];
  if (ugcPlatforms.some(p => article.domain.includes(p))) {
    score -= 10;
    warnings.push('User-generated content platform');
  }

  // Freshness check
  const ageMs = Date.now() - new Date(article.publishedAt).getTime();
  if (ageMs > 365 * 24 * 60 * 60 * 1000) {
    score -= 20;
    warnings.push('Article over 1 year old');
  }

  return { score: Math.max(0, score), warnings };
}

// ============================================
// Quarantine Store
// ============================================

type QuarantineReason = 
  | 'low_quality_score'
  | 'suspicious_content'
  | 'duplicate_detected'
  | 'source_blacklisted';

interface QuarantinedItem<T> {
  id: string;
  data: T;
  reason: QuarantineReason;
  qualityScore: number;
  warnings: string[];
  quarantinedAt: string;
  reviewStatus: 'pending' | 'approved' | 'rejected';
}

class QuarantineStore<T> {
  private items = new Map<string, QuarantinedItem<T>>();
  private maxItems = 10000;
  private autoRejectDays = 7;

  add(
    data: T, 
    reason: QuarantineReason, 
    score: number, 
    warnings: string[]
  ): string {
    const id = `q_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
    
    this.items.set(id, {
      id,
      data,
      reason,
      qualityScore: score,
      warnings,
      quarantinedAt: new Date().toISOString(),
      reviewStatus: 'pending',
    });

    this.enforceLimit();
    return id;
  }

  approve(id: string): T | null {
    const item = this.items.get(id);
    if (!item || item.reviewStatus !== 'pending') return null;
    
    item.reviewStatus = 'approved';
    return item.data;
  }

  reject(id: string): boolean {
    const item = this.items.get(id);
    if (!item) return false;
    
    item.reviewStatus = 'rejected';
    return true;
  }

  getPending(limit = 100): QuarantinedItem<T>[] {
    return Array.from(this.items.values())
      .filter(i => i.reviewStatus === 'pending')
      .slice(0, limit);
  }

  releaseApproved(): T[] {
    const approved: T[] = [];
    for (const [id, item] of this.items) {
      if (item.reviewStatus === 'approved') {
        approved.push(item.data);
        this.items.delete(id);
      }
    }
    return approved;
  }

  autoRejectStale(): number {
    const cutoff = Date.now() - this.autoRejectDays * 24 * 60 * 60 * 1000;
    let count = 0;
    
    for (const item of this.items.values()) {
      if (item.reviewStatus === 'pending' && 
          new Date(item.quarantinedAt).getTime() < cutoff) {
        item.reviewStatus = 'rejected';
        count++;
      }
    }
    return count;
  }

  private enforceLimit(): void {
    if (this.items.size <= this.maxItems) return;
    
    const sorted = Array.from(this.items.entries())
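      .sort((a, b) => {
        // Evict rejected items first, then the oldest items.
        // The comparator must be symmetric, so rank both sides before comparing.
        const rank = (status: string) => (status === 'rejected' ? 0 : 1);
        const byStatus = rank(a[1].reviewStatus) - rank(b[1].reviewStatus);
        if (byStatus !== 0) return byStatus;
        return new Date(a[1].quarantinedAt).getTime() - new Date(b[1].quarantinedAt).getTime();
      });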

    while (sorted.length > this.maxItems) {
      const [id] = sorted.shift()!;
      this.items.delete(id);
    }
  }
}
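
The size limit in the quarantine store depends on a stable eviction order: rejected items go first, then the oldest remaining items. The sketch below isolates that ordering so it can be checked on its own. `Entry` and `evictionOrder` are hypothetical names, and the timestamps are made-up sample data.

```typescript
// Hypothetical, trimmed-down view of a quarantined item.
interface Entry {
  id: string;
  reviewStatus: 'pending' | 'approved' | 'rejected';
  quarantinedAt: string; // ISO 8601 timestamp
}

// Returns ids in the order they would be evicted.
function evictionOrder(entries: Entry[]): string[] {
  const rank = (e: Entry) => (e.reviewStatus === 'rejected' ? 0 : 1);
  return [...entries] // copy so the caller's array is not reordered
    .sort(
      (a, b) =>
        rank(a) - rank(b) ||
        Date.parse(a.quarantinedAt) - Date.parse(b.quarantinedAt),
    )
    .map(e => e.id);
}

const order = evictionOrder([
  { id: 'a', reviewStatus: 'pending', quarantinedAt: '2024-01-01T00:00:00Z' },
  { id: 'b', reviewStatus: 'rejected', quarantinedAt: '2024-03-01T00:00:00Z' },
  { id: 'c', reviewStatus: 'pending', quarantinedAt: '2023-06-01T00:00:00Z' },
  { id: 'd', reviewStatus: 'rejected', quarantinedAt: '2024-02-01T00:00:00Z' },
]);
console.log(order); // [ 'd', 'b', 'c', 'a' ]
```

Ranking both sides before comparing keeps the comparator symmetric, which `Array.prototype.sort` needs in order to produce a consistent result.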

Usage Examples

Basic Validation Pipeline

typescript
const ArticleSchema = z.object({
  title: z.string().min(1),
  url: z.string().url(),
  domain: z.string(),
  publishedAt: z.string().datetime(),
});

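// Inferred from the schema. If this shares a module with the hand-written
// `interface Article` from the implementation, keep only one of the two declarations.
type Article = z.infer<typeof ArticleSchema>;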

const validator = new DataValidator(ArticleSchema, scoreArticle, 50);
const quarantine = new QuarantineStore<Article>();

// Process batch
const result = validator.validateBatch(rawArticles);

// Handle quarantined items
for (const q of result.quarantined) {
  quarantine.add(q.original as Article, 'low_quality_score', q.score, q.warnings);
}

// Use valid data
await saveToDatabase(result.valid);

console.log(`Processed: ${result.metrics.totalProcessed}`);
console.log(`Valid: ${result.valid.length} (${result.metrics.validPercent.toFixed(1)}%)`);
console.log(`Quarantined: ${result.quarantined.length}`);
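
The pipeline above handles valid and quarantined records but leaves `result.invalid` untouched. One possible way to make rejections visible is to count them by field and error code, which shows whether a source is failing in one consistent way. In this sketch, `FieldError`, `RejectedRecord`, and `summarizeRejections` are hypothetical names that mirror the shape of the `invalid` entries, and the sample errors are invented.

```typescript
// Hypothetical shapes mirroring the `invalid` entries of a batch result.
interface FieldError {
  path: string;
  message: string;
  code: string;
}
interface RejectedRecord {
  original: unknown;
  errors: FieldError[];
}

// Counts rejections per "field:code" pair.
function summarizeRejections(rejected: RejectedRecord[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const record of rejected) {
    for (const err of record.errors) {
      const key = `${err.path}:${err.code}`;
      counts.set(key, (counts.get(key) ?? 0) + 1);
    }
  }
  return counts;
}

// Invented sample data for illustration.
const summary = summarizeRejections([
  { original: {}, errors: [{ path: 'url', message: 'Invalid url', code: 'invalid_string' }] },
  {
    original: {},
    errors: [
      { path: 'url', message: 'Invalid url', code: 'invalid_string' },
      { path: 'title', message: 'Required', code: 'invalid_type' },
    ],
  },
]);
summary.forEach((count, key) => console.log(`${key} -> ${count}`));
```

A sudden spike in one key usually points to an upstream format change rather than to randomly bad records.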

Best Practices

  1. Never block the pipeline for bad data - quarantine instead
  2. Quality scores are domain-specific - tune thresholds based on your data
  3. Auto-reject stale quarantined items - don't let the queue grow forever
  4. Expose quarantine for manual review via admin UI
  5. Track metrics to identify data quality trends
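
To tune a threshold against real data (practice 2), it can help to see how much of a sample each candidate threshold would send to quarantine. The sketch below assumes you have collected quality scores from a representative batch. `quarantineRate` is a hypothetical helper and the scores are made-up sample values.

```typescript
// Percentage of scores that fall strictly below the threshold.
function quarantineRate(scores: number[], threshold: number): number {
  if (scores.length === 0) return 0;
  const below = scores.filter(s => s < threshold).length;
  // Multiply before dividing to keep the arithmetic on integers where possible.
  return (below * 100) / scores.length;
}

// Made-up sample of quality scores from one batch.
const sampleScores = [100, 90, 75, 65, 55, 45, 30, 80, 70, 60];

for (const threshold of [40, 50, 60, 70]) {
  console.log(`${threshold} -> ${quarantineRate(sampleScores, threshold)}`);
}
// 40 -> 10
// 50 -> 20
// 60 -> 30
// 70 -> 50
```

A threshold that quarantines more than reviewers can clear within the auto-reject window effectively turns quarantine into rejection, so review capacity is one practical constraint on how high to set it.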

Common Mistakes

  • Blocking the entire pipeline when validation fails
  • Using only schema validation without quality scoring
  • Not setting limits on quarantine store size
  • Forgetting to auto-reject stale items
  • Not exposing quarantine for manual review

Related Patterns

  • batch-processing - Process validated items efficiently
  • deduplication - Deduplicate before validation
  • error-sanitization - Sanitize validation error messages