Loading...
Loading...
Compare original and translation side by side
Source_Rows == Success_Rows + Quarantine_RowsSource_Rows == Success_Rows + Quarantine_Rowslambdaimportexecevaloslambdaimportexecevalos"John Doe ID:101""Jon Doe ID:102""John Doe ID:101""Jon Doe ID:102"[Row_ID, Old_Value, New_Value, Lambda_Applied, Confidence_Score, Model_Version, Timestamp][Row_ID, Old_Value, New_Value, Lambda_Applied, Confidence_Score, Model_Version, Timestamp]NEEDS_AINEEDS_AIfrom sentence_transformers import SentenceTransformer
import chromadb
def cluster_anomalies(suspect_rows: list[str]) -> chromadb.Collection:
"""
Compress N anomalous rows into semantic clusters.
50,000 date format errors → ~12 pattern groups.
SLM gets 12 calls, not 50,000.
"""
model = SentenceTransformer('all-MiniLM-L6-v2') # local, no API
embeddings = model.encode(suspect_rows).tolist()
collection = chromadb.Client().create_collection("anomaly_clusters")
collection.add(
embeddings=embeddings,
documents=suspect_rows,
ids=[str(i) for i in range(len(suspect_rows))]
)
return collectionfrom sentence_transformers import SentenceTransformer
import chromadb
def cluster_anomalies(suspect_rows: list[str]) -> chromadb.Collection:
"""
Compress N anomalous rows into semantic clusters.
50,000 date format errors → ~12 pattern groups.
SLM gets 12 calls, not 50,000.
"""
model = SentenceTransformer('all-MiniLM-L6-v2') # local, no API
embeddings = model.encode(suspect_rows).tolist()
collection = chromadb.Client().create_collection("anomaly_clusters")
collection.add(
embeddings=embeddings,
documents=suspect_rows,
ids=[str(i) for i in range(len(suspect_rows))]
)
return collectionimport ollama, json
SYSTEM_PROMPT = """You are a data transformation assistant.
Respond ONLY with this exact JSON structure:
{
"transformation": "lambda x: <valid python expression>",
"confidence_score": <float 0.0-1.0>,
"reasoning": "<one sentence>",
"pattern_type": "<date_format|encoding|type_cast|string_clean|null_handling>"
}
No markdown. No explanation. No preamble. JSON only."""
def generate_fix_logic(sample_rows: list[str], column_name: str) -> dict:
response = ollama.chat(
model='phi3', # local, air-gapped — zero external calls
messages=[
{'role': 'system', 'content': SYSTEM_PROMPT},
{'role': 'user', 'content': f"Column: '{column_name}'\nSamples:\n" + "\n".join(sample_rows)}
]
)
result = json.loads(response['message']['content'])
# Safety gate — reject anything that isn't a simple lambda
forbidden = ['import', 'exec', 'eval', 'os.', 'subprocess']
if not result['transformation'].startswith('lambda'):
raise ValueError("Rejected: output must be a lambda function")
if any(term in result['transformation'] for term in forbidden):
raise ValueError("Rejected: forbidden term in lambda")
return resultimport ollama, json
SYSTEM_PROMPT = """You are a data transformation assistant.
Respond ONLY with this exact JSON structure:
{
"transformation": "lambda x: <valid python expression>",
"confidence_score": <float 0.0-1.0>,
"reasoning": "<one sentence>",
"pattern_type": "<date_format|encoding|type_cast|string_clean|null_handling>"
}
No markdown. No explanation. No preamble. JSON only."""
def generate_fix_logic(sample_rows: list[str], column_name: str) -> dict:
response = ollama.chat(
model='phi3', # local, air-gapped — zero external calls
messages=[
{'role': 'system', 'content': SYSTEM_PROMPT},
{'role': 'user', 'content': f"Column: '{column_name}'\nSamples:\n" + "\n".join(sample_rows)}
]
)
result = json.loads(response['message']['content'])
# Safety gate — reject anything that isn't a simple lambda
forbidden = ['import', 'exec', 'eval', 'os.', 'subprocess']
if not result['transformation'].startswith('lambda'):
raise ValueError("Rejected: output must be a lambda function")
if any(term in result['transformation'] for term in forbidden):
raise ValueError("Rejected: forbidden term in lambda")
return resultimport pandas as pd
def apply_fix_to_cluster(df: pd.DataFrame, column: str, fix: dict) -> pd.DataFrame:
"""Apply AI-generated lambda across entire cluster — vectorized, not looped."""
if fix['confidence_score'] < 0.75:
# Low confidence → quarantine, don't auto-fix
df['validation_status'] = 'HUMAN_REVIEW'
df['quarantine_reason'] = f"Low confidence: {fix['confidence_score']}"
return df
transform_fn = eval(fix['transformation']) # safe — evaluated only after strict validation gate (lambda-only, no imports/exec/os)
df[column] = df[column].map(transform_fn)
df['validation_status'] = 'AI_FIXED'
df['ai_reasoning'] = fix['reasoning']
df['confidence_score'] = fix['confidence_score']
return dfimport pandas as pd
def apply_fix_to_cluster(df: pd.DataFrame, column: str, fix: dict) -> pd.DataFrame:
"""Apply AI-generated lambda across entire cluster — vectorized, not looped."""
if fix['confidence_score'] < 0.75:
# Low confidence → quarantine, don't auto-fix
df['validation_status'] = 'HUMAN_REVIEW'
df['quarantine_reason'] = f"Low confidence: {fix['confidence_score']}"
return df
transform_fn = eval(fix['transformation']) # safe — evaluated only after strict validation gate (lambda-only, no imports/exec/os)
df[column] = df[column].map(transform_fn)
df['validation_status'] = 'AI_FIXED'
df['ai_reasoning'] = fix['reasoning']
df['confidence_score'] = fix['confidence_score']
return dfdef reconciliation_check(source: int, success: int, quarantine: int):
"""
Mathematical zero-data-loss guarantee.
Any mismatch > 0 is an immediate Sev-1.
"""
if source != success + quarantine:
missing = source - (success + quarantine)
trigger_alert( # PagerDuty / Slack / webhook — configure per environment
severity="SEV1",
message=f"DATA LOSS DETECTED: {missing} rows unaccounted for"
)
raise DataLossException(f"Reconciliation failed: {missing} missing rows")
return Truedef reconciliation_check(source: int, success: int, quarantine: int):
"""
Mathematical zero-data-loss guarantee.
Any mismatch > 0 is an immediate Sev-1.
"""
if source != success + quarantine:
missing = source - (success + quarantine)
trigger_alert( # PagerDuty / Slack / webhook — configure per environment
severity="SEV1",
message=f"DATA LOSS DETECTED: {missing} rows unaccounted for"
)
raise DataLossException(f"Reconciliation failed: {missing} missing rows")
return TrueSource == Success + QuarantineSource == Success + Quarantine