Loading...
Loading...
Guidance for recovering data from corrupted or truncated SQLite database files through binary analysis and manual parsing. This skill applies when working with damaged SQLite databases that cannot be opened with standard tools, particularly when corruption is due to binary truncation, incomplete writes, or filesystem errors.
npx skill4agent add letta-ai/skills sqlite-db-truncate

## sqlite3 .recover

# Check file size and basic properties
ls -lh database.db
file database.db
# Create hex dump for analysis
hexdump -C database.db | head -100

# Check if sqlite3 can read the file
sqlite3 database.db ".schema" 2>&1
sqlite3 database.db "SELECT * FROM sqlite_master" 2>&1
# Try built-in recovery
sqlite3 database.db ".recover" > recovered.sql 2>&1
# Try integrity check
sqlite3 database.db "PRAGMA integrity_check;"

Offset  Size   Description
------ ---- -----------
0 1 Page type (0x0d for table leaf)
1 2 First freeblock offset (big-endian)
3 2 Number of cells on page (big-endian)
5 2 Cell content area start offset (big-endian)
7 1 Fragmented free bytes count
8+ varies Cell pointer array (2 bytes per cell, big-endian)
... [Gap/free space]
End     Cell data (grows backward from page end)

Table-leaf cell layout:

[Payload size: varint]
[Row ID: varint]
[Header size: varint]
[Serial type 1: varint]
[Serial type 2: varint]
...
[Column 1 value]
[Column 2 value]
...

Serial type codes:

| Type | Size | Meaning |
|---|---|---|
| 0 | 0 | NULL |
| 1 | 1 | 8-bit signed integer |
| 2 | 2 | 16-bit big-endian signed integer |
| 3 | 3 | 24-bit big-endian signed integer |
| 4 | 4 | 32-bit big-endian signed integer |
| 5 | 6 | 48-bit big-endian signed integer |
| 6 | 8 | 64-bit big-endian signed integer |
| 7 | 8 | IEEE 754 64-bit float (big-endian) |
| 8 | 0 | Integer constant 0 |
| 9 | 0 | Integer constant 1 |
| N >= 12, even | (N-12)/2 | BLOB |
| N >= 13, odd | (N-13)/2 | Text string (UTF-8) |
import struct
import json
# Module-wide default for verbose parse tracing; `parse_cell`'s `debug`
# parameter defaults to this value.
DEBUG = True
def read_varint(data, offset):
    """Decode a SQLite variable-length integer starting at *offset*.

    SQLite varints use 7 payload bits per byte (high bit = continuation)
    for the first 8 bytes; a 9th byte, if reached, contributes all 8 bits.

    Returns:
        (value, next_offset) on success, or (None, offset) — the ORIGINAL
        offset — if the data is truncated mid-varint.
    """
    result = 0
    pos = offset
    for nbytes in range(9):
        if pos >= len(data):
            # Ran off the end of the buffer before the varint terminated.
            return None, offset
        b = data[pos]
        pos += 1
        if nbytes == 8:
            # Ninth byte: all 8 bits are significant, and it always ends the varint.
            return (result << 8) | b, pos
        result = (result << 7) | (b & 0x7F)
        if not (b & 0x80):
            # Continuation bit clear — varint complete.
            return result, pos
def decode_value(data, offset, serial_type):
    """Decode one record value according to its SQLite serial-type code.

    Args:
        data: the raw page/file bytes.
        offset: position of the value's first byte.
        serial_type: serial-type code from the record header.

    Returns:
        (value, next_offset). For zero-length types (NULL, constants 0/1)
        the offset is returned unchanged.

    Fix: the original omitted serial types 3 (24-bit), 5 (48-bit) and
    6 (64-bit); those fell through to `return None, offset` WITHOUT
    advancing the offset, desynchronizing every later column in the row.
    """
    if serial_type == 0:
        return None, offset
    if serial_type == 1:
        return struct.unpack('>b', data[offset:offset+1])[0], offset + 1
    if serial_type == 2:
        return struct.unpack('>h', data[offset:offset+2])[0], offset + 2
    if serial_type == 3:
        # 24-bit big-endian signed int: no struct code, so use int.from_bytes.
        return int.from_bytes(data[offset:offset+3], 'big', signed=True), offset + 3
    if serial_type == 4:
        return struct.unpack('>i', data[offset:offset+4])[0], offset + 4
    if serial_type == 5:
        # 48-bit big-endian signed int.
        return int.from_bytes(data[offset:offset+6], 'big', signed=True), offset + 6
    if serial_type == 6:
        # 64-bit big-endian signed int.
        return struct.unpack('>q', data[offset:offset+8])[0], offset + 8
    if serial_type == 7:
        return struct.unpack('>d', data[offset:offset+8])[0], offset + 8
    if serial_type == 8:
        return 0, offset  # integer constant 0, zero bytes of payload
    if serial_type == 9:
        return 1, offset  # integer constant 1, zero bytes of payload
    if serial_type >= 12:
        if serial_type % 2 == 0:
            length = (serial_type - 12) // 2
            return data[offset:offset+length], offset + length
        length = (serial_type - 13) // 2
        return data[offset:offset+length].decode('utf-8', errors='replace'), offset + length
    # Serial types 10 and 11 are reserved for internal use; nothing to decode.
    return None, offset
"""Parse a single cell with detailed debug output."""
if debug:
print(f"\nParsing cell at offset {cell_offset} (0x{cell_offset:04x})")
# Read payload size
payload_size, offset = read_varint(data, cell_offset)
if debug:
print(f" Payload size: {payload_size}")
# Read row ID
row_id, offset = read_varint(data, offset)
if debug:
print(f" Row ID: {row_id}")
# Read header size
header_size, header_start = read_varint(data, offset)
if debug:
print(f" Header size: {header_size}")
# Parse serial types
serial_types = []
current = header_start
header_end = offset + header_size
while current < header_end:
st, current = read_varint(data, current)
serial_types.append(st)
if debug:
print(f" Serial types: {serial_types}")
# Parse values
values = []
for st in serial_types:
val, current = decode_value(data, current, st)
values.append(val)
if debug:
print(f" Values: {values}")
return {'row_id': row_id, 'values': values}length = (serial_type - 13) // 2if48if 4812and12 andpython3 -m py_compile recovery_script.pystruct.unpack('>...', data)>def safe_read(data, offset, length):
def safe_read(data, offset, length):
    """Return the slice data[offset:offset+length], or None if it would
    run past the end of the buffer."""
    end = offset + length
    return data[offset:end] if end <= len(data) else None
# Validate output structure before exporting: every recovered record must
# carry a string 'word' and a numeric 'value'.
# NOTE(review): `assert` is stripped under `python -O`; raise explicitly if
# this check must survive optimized runs.
for record in recovered_data:
    assert 'word' in record and 'value' in record
    assert isinstance(record['word'], str)
    assert isinstance(record['value'], (int, float))
"""Format and save recovered data."""
results = []
for row in recovered_rows:
if len(row['values']) >= 2:
results.append({
'word': row['values'][0],
'value': row['values'][1]
})
with open(output_path, 'w') as f:
json.dump(results, f, indent=2)
print(f"Recovered {len(results)} records to {output_path}")
return results