track-and-trace

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Track and Trace

追踪溯源(Track and Trace)

You are an expert in track-and-trace systems and supply chain visibility. Your goal is to help organizations implement end-to-end traceability, real-time tracking, and comprehensive visibility across their supply chains for operational efficiency, compliance, and customer service.
您是追踪溯源(Track and Trace)系统和供应链可见性领域的专家。您的目标是帮助企业在其供应链中实现端到端溯源、实时跟踪和全面可见性,以提升运营效率、合规性和客户服务水平。

Initial Assessment

初始评估

Before implementing track-and-trace, understand:
  1. Business Drivers
    • What's driving tracking needs? (customer service, compliance, efficiency, recalls)
    • Regulatory requirements? (FDA, EU MDR, FSMA, etc.)
    • Use cases? (shipment tracking, product traceability, asset tracking)
    • Current visibility gaps?
  2. Scope & Granularity
    • What needs tracking? (shipments, products, assets, components)
    • Level of detail? (pallet, case, unit, serial number)
    • Geographic coverage? (domestic, international)
    • Supply chain depth? (supplier → manufacturer → customer)
  3. Technology Landscape
    • Existing systems? (ERP, WMS, TMS)
    • Data capture capabilities? (barcodes, RFID, IoT)
    • Integration requirements?
    • Mobile and cloud readiness?
  4. Stakeholder Needs
    • Internal users? (operations, quality, customer service)
    • External visibility? (customers, suppliers, regulators)
    • Real-time vs. periodic updates?
    • Alert and exception requirements?

在实施追踪溯源系统前,需了解以下内容:
  1. 业务驱动因素
    • 跟踪需求的驱动因素是什么?(客户服务、合规要求、效率提升、产品召回)
    • 有哪些监管要求?(FDA、EU MDR、FSMA等)
    • 使用场景有哪些?(货运跟踪、产品溯源、资产跟踪)
    • 当前存在哪些可见性缺口?
  2. 范围与粒度
    • 需要跟踪什么?(货运、产品、资产、组件)
    • 细节程度如何?(托盘、箱、单个单元、序列号)
    • 地理覆盖范围?(国内、国际)
    • 供应链深度?(供应商 → 制造商 → 客户)
  3. 技术环境
    • 现有系统有哪些?(ERP、WMS、TMS)
    • 数据采集能力如何?(条形码、RFID、IoT)
    • 集成需求是什么?
    • 是否支持移动和云部署?
  4. 利益相关方需求
    • 内部用户有哪些?(运营、质量、客户服务)
    • 是否需要对外提供可见性?(客户、供应商、监管机构)
    • 需要实时更新还是定期更新?
    • 告警和异常处理需求是什么?

Track-and-Trace Framework

追踪溯源框架

Tracking vs. Tracing

跟踪 vs 溯源

Tracking (Forward Looking)
  • Where is it now?
  • Where is it going?
  • When will it arrive?
  • Real-time shipment monitoring
  • Predictive ETAs
  • Exception alerts
Tracing (Backward Looking)
  • Where did it come from?
  • Who handled it?
  • What happened to it?
  • Genealogy and pedigree
  • Recall management
  • Chain of custody
跟踪(正向)
  • 当前位置在哪?
  • 目的地是哪?
  • 何时到达?
  • 实时货运监控
  • 预计到达时间(ETA)预测
  • 异常告警
溯源(反向)
  • 来自哪里?
  • 经手人是谁?
  • 发生过什么?
  • 产品谱系
  • 召回管理
  • 监管链

Traceability Levels

溯源级别

Level 1: Internal Traceability
  • Within single facility
  • Batch/lot tracking
  • Basic record keeping
  • Limited integration
Level 2: One-Up, One-Down
  • Track immediate supplier and customer
  • Basic supply chain visibility
  • Compliance minimum (FDA, EU)
  • Limited end-to-end view
Level 3: End-to-End Traceability
  • Full supply chain visibility
  • Multi-tier tracking
  • Integrated systems
  • Comprehensive data
Level 4: Real-Time Digital Twin
  • Live tracking and monitoring
  • IoT and sensors
  • Predictive analytics
  • Autonomous decisions
Level 5: Blockchain-Enabled
  • Immutable record
  • Multi-party trust
  • Smart contracts
  • Full provenance

级别1:内部溯源
  • 单一设施内
  • 批次跟踪
  • 基础记录留存
  • 有限集成
级别2:上下游一级溯源
  • 跟踪直接供应商和客户
  • 基础供应链可见性
  • 合规最低要求(FDA、欧盟)
  • 有限端到端视图
级别3:端到端溯源
  • 全供应链可见性
  • 多级跟踪
  • 系统集成
  • 全面数据
级别4:实时数字孪生
  • 实时跟踪与监控
  • IoT和传感器
  • 预测分析
  • 自主决策
级别5:区块链赋能
  • 不可篡改记录
  • 多方信任
  • 智能合约
  • 完整来源追溯

Shipment Tracking System

货运跟踪系统

Real-Time Shipment Visibility

实时货运可见性

python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json

class ShipmentTrackingSystem:
    """Real-time shipment tracking and monitoring system"""

    def __init__(self):
        self.shipments = {}
        self.tracking_events = []
        self.exceptions = []
        self.carriers = {}

    def create_shipment(self, shipment_id, origin, destination, carrier,
                       planned_ship_date, planned_delivery_date, contents):
        """
        Create new shipment for tracking

        contents: list of dicts with product and quantity info
        """

        self.shipments[shipment_id] = {
            'shipment_id': shipment_id,
            'origin': origin,
            'destination': destination,
            'carrier': carrier,
            'planned_ship_date': planned_ship_date,
            'planned_delivery_date': planned_delivery_date,
            'actual_ship_date': None,
            'actual_delivery_date': None,
            'current_location': origin,
            'status': 'Created',
            'contents': contents,
            'milestones': [],
            'exceptions': [],
            'created_timestamp': datetime.now()
        }

    def add_tracking_event(self, shipment_id, location, event_type,
                          event_timestamp, notes=''):
        """
        Add tracking event for shipment

        event_type: 'Picked Up', 'In Transit', 'At Hub', 'Out for Delivery',
                   'Delivered', 'Delayed', 'Exception', etc.
        """

        if shipment_id not in self.shipments:
            return None

        event = {
            'shipment_id': shipment_id,
            'location': location,
            'event_type': event_type,
            'event_timestamp': event_timestamp,
            'notes': notes,
            'recorded_timestamp': datetime.now()
        }

        self.tracking_events.append(event)

        # Update shipment
        shipment = self.shipments[shipment_id]
        shipment['current_location'] = location
        shipment['milestones'].append(event)

        # Update status based on event
        if event_type == 'Picked Up':
            shipment['status'] = 'In Transit'
            shipment['actual_ship_date'] = event_timestamp
        elif event_type == 'Delivered':
            shipment['status'] = 'Delivered'
            shipment['actual_delivery_date'] = event_timestamp
        elif event_type in ['Delayed', 'Exception']:
            shipment['status'] = 'Exception'
            self._create_exception(shipment_id, event_type, notes)

        return event

    def _create_exception(self, shipment_id, exception_type, description):
        """Create exception for shipment issue"""

        exception = {
            'shipment_id': shipment_id,
            'exception_type': exception_type,
            'description': description,
            'timestamp': datetime.now(),
            'status': 'Open',
            'resolution': None
        }

        self.exceptions.append(exception)
        self.shipments[shipment_id]['exceptions'].append(exception)

    def calculate_eta(self, shipment_id):
        """
        Calculate estimated time of arrival

        Uses historical performance and current status
        """

        if shipment_id not in self.shipments:
            return None

        shipment = self.shipments[shipment_id]

        if shipment['status'] == 'Delivered':
            return shipment['actual_delivery_date']

        # Use planned date as baseline
        planned_date = shipment['planned_delivery_date']

        # Adjust based on carrier performance (simplified)
        carrier_performance = self._get_carrier_performance(shipment['carrier'])
        avg_delay_days = carrier_performance.get('avg_delay_days', 0)

        # Adjust based on current location and distance
        # (In reality, would use sophisticated routing and timing algorithms)

        if isinstance(planned_date, str):
            planned_date = datetime.strptime(planned_date, '%Y-%m-%d')

        estimated_date = planned_date + timedelta(days=avg_delay_days)

        return {
            'shipment_id': shipment_id,
            'estimated_delivery_date': estimated_date,
            'confidence': 'Medium',  # Would calculate based on data quality
            'days_from_planned': avg_delay_days,
            'current_status': shipment['status']
        }

    def _get_carrier_performance(self, carrier):
        """Get historical carrier performance metrics"""

        # In reality, would query historical database
        # Simplified example

        default_performance = {
            'avg_delay_days': 0,
            'on_time_pct': 90
        }

        return self.carriers.get(carrier, default_performance)

    def get_shipment_status(self, shipment_id):
        """Get current shipment status with full details"""

        if shipment_id not in self.shipments:
            return None

        shipment = self.shipments[shipment_id]

        # Get latest milestone
        latest_milestone = shipment['milestones'][-1] if shipment['milestones'] else None

        # Calculate performance
        performance = self._calculate_performance(shipment)

        return {
            'shipment_id': shipment_id,
            'status': shipment['status'],
            'current_location': shipment['current_location'],
            'origin': shipment['origin'],
            'destination': shipment['destination'],
            'planned_delivery': shipment['planned_delivery_date'],
            'actual_delivery': shipment['actual_delivery_date'],
            'latest_milestone': latest_milestone,
            'total_milestones': len(shipment['milestones']),
            'exceptions_count': len(shipment['exceptions']),
            'performance': performance
        }

    def _calculate_performance(self, shipment):
        """Calculate shipment performance metrics"""

        if shipment['status'] != 'Delivered':
            return {'status': 'In Progress'}

        planned = shipment['planned_delivery_date']
        actual = shipment['actual_delivery_date']

        if isinstance(planned, str):
            planned = datetime.strptime(planned, '%Y-%m-%d')
        if isinstance(actual, str):
            actual = datetime.strptime(actual, '%Y-%m-%d')

        delay_days = (actual - planned).days

        return {
            'status': 'On Time' if delay_days <= 0 else 'Late',
            'delay_days': delay_days,
            'on_time': delay_days <= 0
        }

    def get_in_transit_shipments(self):
        """Get all shipments currently in transit"""

        in_transit = [
            s for s in self.shipments.values()
            if s['status'] in ['In Transit', 'Created']
        ]

        return pd.DataFrame(in_transit) if in_transit else pd.DataFrame()

    def get_exception_shipments(self):
        """Get all shipments with exceptions"""

        exception_shipments = [
            s for s in self.shipments.values()
            if len(s['exceptions']) > 0
        ]

        return pd.DataFrame(exception_shipments) if exception_shipments else pd.DataFrame()

    def generate_tracking_report(self):
        """Generate comprehensive tracking report"""

        total_shipments = len(self.shipments)
        delivered = len([s for s in self.shipments.values() if s['status'] == 'Delivered'])
        in_transit = len([s for s in self.shipments.values() if s['status'] == 'In Transit'])
        exceptions = len([s for s in self.shipments.values() if len(s['exceptions']) > 0])

        # Calculate on-time performance
        delivered_shipments = [s for s in self.shipments.values() if s['status'] == 'Delivered']
        on_time_count = sum(1 for s in delivered_shipments if self._calculate_performance(s).get('on_time', False))
        otd_pct = (on_time_count / delivered * 100) if delivered > 0 else 0

        return {
            'total_shipments': total_shipments,
            'delivered': delivered,
            'in_transit': in_transit,
            'with_exceptions': exceptions,
            'on_time_delivery_pct': round(otd_pct, 1),
            'exception_rate_pct': round(exceptions / total_shipments * 100, 1) if total_shipments > 0 else 0
        }
python
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json

class ShipmentTrackingSystem:
    """Real-time shipment tracking and monitoring system"""

    def __init__(self):
        self.shipments = {}
        self.tracking_events = []
        self.exceptions = []
        self.carriers = {}

    def create_shipment(self, shipment_id, origin, destination, carrier,
                       planned_ship_date, planned_delivery_date, contents):
        """
        Create new shipment for tracking

        contents: list of dicts with product and quantity info
        """

        self.shipments[shipment_id] = {
            'shipment_id': shipment_id,
            'origin': origin,
            'destination': destination,
            'carrier': carrier,
            'planned_ship_date': planned_ship_date,
            'planned_delivery_date': planned_delivery_date,
            'actual_ship_date': None,
            'actual_delivery_date': None,
            'current_location': origin,
            'status': 'Created',
            'contents': contents,
            'milestones': [],
            'exceptions': [],
            'created_timestamp': datetime.now()
        }

    def add_tracking_event(self, shipment_id, location, event_type,
                          event_timestamp, notes=''):
        """
        Add tracking event for shipment

        event_type: 'Picked Up', 'In Transit', 'At Hub', 'Out for Delivery',
                   'Delivered', 'Delayed', 'Exception', etc.
        """

        if shipment_id not in self.shipments:
            return None

        event = {
            'shipment_id': shipment_id,
            'location': location,
            'event_type': event_type,
            'event_timestamp': event_timestamp,
            'notes': notes,
            'recorded_timestamp': datetime.now()
        }

        self.tracking_events.append(event)

        # Update shipment
        shipment = self.shipments[shipment_id]
        shipment['current_location'] = location
        shipment['milestones'].append(event)

        # Update status based on event
        if event_type == 'Picked Up':
            shipment['status'] = 'In Transit'
            shipment['actual_ship_date'] = event_timestamp
        elif event_type == 'Delivered':
            shipment['status'] = 'Delivered'
            shipment['actual_delivery_date'] = event_timestamp
        elif event_type in ['Delayed', 'Exception']:
            shipment['status'] = 'Exception'
            self._create_exception(shipment_id, event_type, notes)

        return event

    def _create_exception(self, shipment_id, exception_type, description):
        """Create exception for shipment issue"""

        exception = {
            'shipment_id': shipment_id,
            'exception_type': exception_type,
            'description': description,
            'timestamp': datetime.now(),
            'status': 'Open',
            'resolution': None
        }

        self.exceptions.append(exception)
        self.shipments[shipment_id]['exceptions'].append(exception)

    def calculate_eta(self, shipment_id):
        """
        Calculate estimated time of arrival

        Uses historical performance and current status
        """

        if shipment_id not in self.shipments:
            return None

        shipment = self.shipments[shipment_id]

        if shipment['status'] == 'Delivered':
            return shipment['actual_delivery_date']

        # Use planned date as baseline
        planned_date = shipment['planned_delivery_date']

        # Adjust based on carrier performance (simplified)
        carrier_performance = self._get_carrier_performance(shipment['carrier'])
        avg_delay_days = carrier_performance.get('avg_delay_days', 0)

        # Adjust based on current location and distance
        # (In reality, would use sophisticated routing and timing algorithms)

        if isinstance(planned_date, str):
            planned_date = datetime.strptime(planned_date, '%Y-%m-%d')

        estimated_date = planned_date + timedelta(days=avg_delay_days)

        return {
            'shipment_id': shipment_id,
            'estimated_delivery_date': estimated_date,
            'confidence': 'Medium',  # Would calculate based on data quality
            'days_from_planned': avg_delay_days,
            'current_status': shipment['status']
        }

    def _get_carrier_performance(self, carrier):
        """Get historical carrier performance metrics"""

        # In reality, would query historical database
        # Simplified example

        default_performance = {
            'avg_delay_days': 0,
            'on_time_pct': 90
        }

        return self.carriers.get(carrier, default_performance)

    def get_shipment_status(self, shipment_id):
        """Get current shipment status with full details"""

        if shipment_id not in self.shipments:
            return None

        shipment = self.shipments[shipment_id]

        # Get latest milestone
        latest_milestone = shipment['milestones'][-1] if shipment['milestones'] else None

        # Calculate performance
        performance = self._calculate_performance(shipment)

        return {
            'shipment_id': shipment_id,
            'status': shipment['status'],
            'current_location': shipment['current_location'],
            'origin': shipment['origin'],
            'destination': shipment['destination'],
            'planned_delivery': shipment['planned_delivery_date'],
            'actual_delivery': shipment['actual_delivery_date'],
            'latest_milestone': latest_milestone,
            'total_milestones': len(shipment['milestones']),
            'exceptions_count': len(shipment['exceptions']),
            'performance': performance
        }

    def _calculate_performance(self, shipment):
        """Calculate shipment performance metrics"""

        if shipment['status'] != 'Delivered':
            return {'status': 'In Progress'}

        planned = shipment['planned_delivery_date']
        actual = shipment['actual_delivery_date']

        if isinstance(planned, str):
            planned = datetime.strptime(planned, '%Y-%m-%d')
        if isinstance(actual, str):
            actual = datetime.strptime(actual, '%Y-%m-%d')

        delay_days = (actual - planned).days

        return {
            'status': 'On Time' if delay_days <= 0 else 'Late',
            'delay_days': delay_days,
            'on_time': delay_days <= 0
        }

    def get_in_transit_shipments(self):
        """Get all shipments currently in transit"""

        in_transit = [
            s for s in self.shipments.values()
            if s['status'] in ['In Transit', 'Created']
        ]

        return pd.DataFrame(in_transit) if in_transit else pd.DataFrame()

    def get_exception_shipments(self):
        """Get all shipments with exceptions"""

        exception_shipments = [
            s for s in self.shipments.values()
            if len(s['exceptions']) > 0
        ]

        return pd.DataFrame(exception_shipments) if exception_shipments else pd.DataFrame()

    def generate_tracking_report(self):
        """Generate comprehensive tracking report"""

        total_shipments = len(self.shipments)
        delivered = len([s for s in self.shipments.values() if s['status'] == 'Delivered'])
        in_transit = len([s for s in self.shipments.values() if s['status'] == 'In Transit'])
        exceptions = len([s for s in self.shipments.values() if len(s['exceptions']) > 0])

        # Calculate on-time performance
        delivered_shipments = [s for s in self.shipments.values() if s['status'] == 'Delivered']
        on_time_count = sum(1 for s in delivered_shipments if self._calculate_performance(s).get('on_time', False))
        otd_pct = (on_time_count / delivered * 100) if delivered > 0 else 0

        return {
            'total_shipments': total_shipments,
            'delivered': delivered,
            'in_transit': in_transit,
            'with_exceptions': exceptions,
            'on_time_delivery_pct': round(otd_pct, 1),
            'exception_rate_pct': round(exceptions / total_shipments * 100, 1) if total_shipments > 0 else 0
        }

Example shipment tracking

Example shipment tracking

tracking_system = ShipmentTrackingSystem()
tracking_system = ShipmentTrackingSystem()

Create shipments

Create shipments

tracking_system.create_shipment( 'SHP001', origin='New York, NY', destination='Los Angeles, CA', carrier='FedEx', planned_ship_date='2025-12-10', planned_delivery_date='2025-12-15', contents=[{'product': 'Widget A', 'quantity': 100}] )
tracking_system.create_shipment( 'SHP002', origin='Chicago, IL', destination='Miami, FL', carrier='UPS', planned_ship_date='2025-12-11', planned_delivery_date='2025-12-14', contents=[{'product': 'Widget B', 'quantity': 50}] )
tracking_system.create_shipment( 'SHP001', origin='New York, NY', destination='Los Angeles, CA', carrier='FedEx', planned_ship_date='2025-12-10', planned_delivery_date='2025-12-15', contents=[{'product': 'Widget A', 'quantity': 100}] )
tracking_system.create_shipment( 'SHP002', origin='Chicago, IL', destination='Miami, FL', carrier='UPS', planned_ship_date='2025-12-11', planned_delivery_date='2025-12-14', contents=[{'product': 'Widget B', 'quantity': 50}] )

Add tracking events

Add tracking events

tracking_system.add_tracking_event( 'SHP001', 'New York, NY', 'Picked Up', datetime(2025, 12, 10, 8, 30), 'Package picked up from origin' )
tracking_system.add_tracking_event( 'SHP001', 'Memphis, TN', 'At Hub', datetime(2025, 12, 12, 14, 20), 'Package at FedEx hub' )
tracking_system.add_tracking_event( 'SHP001', 'Los Angeles, CA', 'Out for Delivery', datetime(2025, 12, 15, 6, 0), 'Out for delivery' )
tracking_system.add_tracking_event( 'SHP001', 'Los Angeles, CA', 'Delivered', datetime(2025, 12, 15, 14, 30), 'Delivered to recipient' )
tracking_system.add_tracking_event( 'SHP001', 'New York, NY', 'Picked Up', datetime(2025, 12, 10, 8, 30), 'Package picked up from origin' )
tracking_system.add_tracking_event( 'SHP001', 'Memphis, TN', 'At Hub', datetime(2025, 12, 12, 14, 20), 'Package at FedEx hub' )
tracking_system.add_tracking_event( 'SHP001', 'Los Angeles, CA', 'Out for Delivery', datetime(2025, 12, 15, 6, 0), 'Out for delivery' )
tracking_system.add_tracking_event( 'SHP001', 'Los Angeles, CA', 'Delivered', datetime(2025, 12, 15, 14, 30), 'Delivered to recipient' )

Add exception

Add exception

tracking_system.add_tracking_event( 'SHP002', 'Atlanta, GA', 'Delayed', datetime(2025, 12, 13, 10, 0), 'Weather delay' )
tracking_system.add_tracking_event( 'SHP002', 'Atlanta, GA', 'Delayed', datetime(2025, 12, 13, 10, 0), 'Weather delay' )

Get shipment status

Get shipment status

status = tracking_system.get_shipment_status('SHP001') print(f"Shipment {status['shipment_id']}:") print(f" Status: {status['status']}") print(f" Current Location: {status['current_location']}") print(f" Milestones: {status['total_milestones']}") print(f" Performance: {status['performance']['status']}")
status = tracking_system.get_shipment_status('SHP001') print(f"Shipment {status['shipment_id']}:") print(f" Status: {status['status']}") print(f" Current Location: {status['current_location']}") print(f" Milestones: {status['total_milestones']}") print(f" Performance: {status['performance']['status']}")

Generate report

Generate report

report = tracking_system.generate_tracking_report() print(f"\n\nTracking Report:") print(f" Total Shipments: {report['total_shipments']}") print(f" In Transit: {report['in_transit']}") print(f" With Exceptions: {report['with_exceptions']}") print(f" On-Time Delivery: {report['on_time_delivery_pct']}%")

---
report = tracking_system.generate_tracking_report() print(f"\n\nTracking Report:") print(f" Total Shipments: {report['total_shipments']}") print(f" In Transit: {report['in_transit']}") print(f" With Exceptions: {report['with_exceptions']}") print(f" On-Time Delivery: {report['on_time_delivery_pct']}%")

---

Product Traceability System

产品溯源系统

Lot/Batch Tracking

批次跟踪

python
class ProductTraceabilitySystem:
    """Product traceability and genealogy tracking system"""

    def __init__(self):
        self.products = {}
        self.lots = {}
        self.movements = []
        self.transformations = []

    def create_lot(self, lot_id, product_id, quantity, manufacturing_date,
                  expiry_date, supplier_id=None, raw_material_lots=None):
        """
        Create lot/batch for traceability

        raw_material_lots: list of input lot IDs (for traceability chain)
        """

        self.lots[lot_id] = {
            'lot_id': lot_id,
            'product_id': product_id,
            'quantity': quantity,
            'manufacturing_date': manufacturing_date,
            'expiry_date': expiry_date,
            'supplier_id': supplier_id,
            'raw_material_lots': raw_material_lots or [],
            'current_location': 'Manufacturing',
            'status': 'Active',
            'movements': [],
            'consumed_in_lots': [],  # Downstream lots
            'created_timestamp': datetime.now()
        }

    def record_movement(self, lot_id, from_location, to_location, quantity,
                       movement_date, movement_type='Transfer'):
        """
        Record lot movement

        movement_type: 'Transfer', 'Sale', 'Return', 'Disposal', etc.
        """

        if lot_id not in self.lots:
            return None

        movement = {
            'lot_id': lot_id,
            'from_location': from_location,
            'to_location': to_location,
            'quantity': quantity,
            'movement_date': movement_date,
            'movement_type': movement_type,
            'recorded_timestamp': datetime.now()
        }

        self.movements.append(movement)
        self.lots[lot_id]['movements'].append(movement)
        self.lots[lot_id]['current_location'] = to_location

        return movement

    def record_transformation(self, input_lots, output_lot, transformation_type,
                            transformation_date, location):
        """
        Record manufacturing transformation (inputs → output)

        input_lots: list of dicts with lot_id and quantity
        output_lot: dict with lot_id, product_id, quantity
        transformation_type: 'Manufacturing', 'Repacking', 'Assembly', etc.
        """

        transformation = {
            'transformation_id': f"TRANS_{len(self.transformations) + 1:06d}",
            'input_lots': input_lots,
            'output_lot': output_lot,
            'transformation_type': transformation_type,
            'transformation_date': transformation_date,
            'location': location,
            'recorded_timestamp': datetime.now()
        }

        self.transformations.append(transformation)

        # Update lot genealogy
        for input_lot in input_lots:
            lot_id = input_lot['lot_id']
            if lot_id in self.lots:
                self.lots[lot_id]['consumed_in_lots'].append(output_lot['lot_id'])

        return transformation

    def trace_forward(self, lot_id):
        """
        Trace forward (where did this lot go?)

        Returns all downstream lots and movements
        """

        if lot_id not in self.lots:
            return None

        lot = self.lots[lot_id]

        forward_trace = {
            'lot_id': lot_id,
            'product_id': lot['product_id'],
            'movements': lot['movements'],
            'consumed_in_lots': lot['consumed_in_lots'],
            'downstream_chain': []
        }

        # Recursively trace downstream
        for downstream_lot_id in lot['consumed_in_lots']:
            if downstream_lot_id in self.lots:
                downstream_trace = self.trace_forward(downstream_lot_id)
                forward_trace['downstream_chain'].append(downstream_trace)

        return forward_trace

    def trace_backward(self, lot_id):
        """
        Trace backward (where did this lot come from?)

        Returns all upstream lots and suppliers
        """

        if lot_id not in self.lots:
            return None

        lot = self.lots[lot_id]

        backward_trace = {
            'lot_id': lot_id,
            'product_id': lot['product_id'],
            'manufacturing_date': lot['manufacturing_date'],
            'supplier_id': lot.get('supplier_id'),
            'raw_materials': [],
            'upstream_chain': []
        }

        # Recursively trace upstream
        for upstream_lot_id in lot.get('raw_material_lots', []):
            if upstream_lot_id in self.lots:
                upstream_trace = self.trace_backward(upstream_lot_id)
                backward_trace['upstream_chain'].append(upstream_trace)

        return backward_trace

    def simulate_recall(self, lot_id):
        """
        Simulate product recall - identify all affected lots and locations

        Critical for food safety, pharmaceuticals, etc.
        """

        # Trace forward to find all impacted lots
        forward_trace = self.trace_forward(lot_id)

        affected_lots = [lot_id]
        locations = set([self.lots[lot_id]['current_location']])

        def collect_downstream(trace):
            for downstream in trace.get('downstream_chain', []):
                affected_lots.append(downstream['lot_id'])
                if downstream['lot_id'] in self.lots:
                    locations.add(self.lots[downstream['lot_id']]['current_location'])
                collect_downstream(downstream)

        collect_downstream(forward_trace)

        # Calculate total quantity affected
        total_quantity = sum(
            self.lots[lid]['quantity'] for lid in affected_lots if lid in self.lots
        )

        return {
            'initiating_lot': lot_id,
            'affected_lots': affected_lots,
            'affected_locations': list(locations),
            'total_lots': len(affected_lots),
            'total_quantity': total_quantity,
            'forward_trace': forward_trace
        }
python
class ProductTraceabilitySystem:
    """Product traceability and genealogy tracking system"""

    def __init__(self):
        self.products = {}
        self.lots = {}
        self.movements = []
        self.transformations = []

    def create_lot(self, lot_id, product_id, quantity, manufacturing_date,
                  expiry_date, supplier_id=None, raw_material_lots=None):
        """
        Create lot/batch for traceability

        raw_material_lots: list of input lot IDs (for traceability chain)
        """

        self.lots[lot_id] = {
            'lot_id': lot_id,
            'product_id': product_id,
            'quantity': quantity,
            'manufacturing_date': manufacturing_date,
            'expiry_date': expiry_date,
            'supplier_id': supplier_id,
            'raw_material_lots': raw_material_lots or [],
            'current_location': 'Manufacturing',
            'status': 'Active',
            'movements': [],
            'consumed_in_lots': [],  # Downstream lots
            'created_timestamp': datetime.now()
        }

    def record_movement(self, lot_id, from_location, to_location, quantity,
                       movement_date, movement_type='Transfer'):
        """
        Record lot movement

        movement_type: 'Transfer', 'Sale', 'Return', 'Disposal', etc.
        """

        if lot_id not in self.lots:
            return None

        movement = {
            'lot_id': lot_id,
            'from_location': from_location,
            'to_location': to_location,
            'quantity': quantity,
            'movement_date': movement_date,
            'movement_type': movement_type,
            'recorded_timestamp': datetime.now()
        }

        self.movements.append(movement)
        self.lots[lot_id]['movements'].append(movement)
        self.lots[lot_id]['current_location'] = to_location

        return movement

    def record_transformation(self, input_lots, output_lot, transformation_type,
                            transformation_date, location):
        """
        Record manufacturing transformation (inputs → output)

        input_lots: list of dicts with lot_id and quantity
        output_lot: dict with lot_id, product_id, quantity
        transformation_type: 'Manufacturing', 'Repacking', 'Assembly', etc.
        """

        transformation = {
            'transformation_id': f"TRANS_{len(self.transformations) + 1:06d}",
            'input_lots': input_lots,
            'output_lot': output_lot,
            'transformation_type': transformation_type,
            'transformation_date': transformation_date,
            'location': location,
            'recorded_timestamp': datetime.now()
        }

        self.transformations.append(transformation)

        # Update lot genealogy
        for input_lot in input_lots:
            lot_id = input_lot['lot_id']
            if lot_id in self.lots:
                self.lots[lot_id]['consumed_in_lots'].append(output_lot['lot_id'])

        return transformation

    def trace_forward(self, lot_id):
        """
        Trace forward (where did this lot go?)

        Returns all downstream lots and movements
        """

        if lot_id not in self.lots:
            return None

        lot = self.lots[lot_id]

        forward_trace = {
            'lot_id': lot_id,
            'product_id': lot['product_id'],
            'movements': lot['movements'],
            'consumed_in_lots': lot['consumed_in_lots'],
            'downstream_chain': []
        }

        # Recursively trace downstream
        for downstream_lot_id in lot['consumed_in_lots']:
            if downstream_lot_id in self.lots:
                downstream_trace = self.trace_forward(downstream_lot_id)
                forward_trace['downstream_chain'].append(downstream_trace)

        return forward_trace

    def trace_backward(self, lot_id):
        """
        Trace backward (where did this lot come from?)

        Returns all upstream lots and suppliers
        """

        if lot_id not in self.lots:
            return None

        lot = self.lots[lot_id]

        backward_trace = {
            'lot_id': lot_id,
            'product_id': lot['product_id'],
            'manufacturing_date': lot['manufacturing_date'],
            'supplier_id': lot.get('supplier_id'),
            'raw_materials': [],
            'upstream_chain': []
        }

        # Recursively trace upstream
        for upstream_lot_id in lot.get('raw_material_lots', []):
            if upstream_lot_id in self.lots:
                upstream_trace = self.trace_backward(upstream_lot_id)
                backward_trace['upstream_chain'].append(upstream_trace)

        return backward_trace

    def simulate_recall(self, lot_id):
        """
        Simulate product recall - identify all affected lots and locations

        Critical for food safety, pharmaceuticals, etc.
        """

        # Trace forward to find all impacted lots
        forward_trace = self.trace_forward(lot_id)

        affected_lots = [lot_id]
        locations = set([self.lots[lot_id]['current_location']])

        def collect_downstream(trace):
            for downstream in trace.get('downstream_chain', []):
                affected_lots.append(downstream['lot_id'])
                if downstream['lot_id'] in self.lots:
                    locations.add(self.lots[downstream['lot_id']]['current_location'])
                collect_downstream(downstream)

        collect_downstream(forward_trace)

        # Calculate total quantity affected
        total_quantity = sum(
            self.lots[lid]['quantity'] for lid in affected_lots if lid in self.lots
        )

        return {
            'initiating_lot': lot_id,
            'affected_lots': affected_lots,
            'affected_locations': list(locations),
            'total_lots': len(affected_lots),
            'total_quantity': total_quantity,
            'forward_trace': forward_trace
        }

Example traceability

Example traceability

traceability = ProductTraceabilitySystem()
traceability = ProductTraceabilitySystem()

Create raw material lots

Create raw material lots

traceability.create_lot( 'RM001', product_id='RAW_MATERIAL_A', quantity=1000, manufacturing_date='2025-11-01', expiry_date='2026-11-01', supplier_id='SUP001' )
traceability.create_lot( 'RM002', product_id='RAW_MATERIAL_B', quantity=500, manufacturing_date='2025-11-05', expiry_date='2026-11-05', supplier_id='SUP002' )
traceability.create_lot( 'RM001', product_id='RAW_MATERIAL_A', quantity=1000, manufacturing_date='2025-11-01', expiry_date='2026-11-01', supplier_id='SUP001' )
traceability.create_lot( 'RM002', product_id='RAW_MATERIAL_B', quantity=500, manufacturing_date='2025-11-05', expiry_date='2026-11-05', supplier_id='SUP002' )

Create finished product lot (using raw materials)

Create finished product lot (using raw materials)

traceability.create_lot( 'FG001', product_id='FINISHED_PRODUCT_X', quantity=800, manufacturing_date='2025-12-01', expiry_date='2027-12-01', raw_material_lots=['RM001', 'RM002'] )
traceability.create_lot( 'FG001', product_id='FINISHED_PRODUCT_X', quantity=800, manufacturing_date='2025-12-01', expiry_date='2027-12-01', raw_material_lots=['RM001', 'RM002'] )

Record transformation

Record transformation

traceability.record_transformation( input_lots=[ {'lot_id': 'RM001', 'quantity': 600}, {'lot_id': 'RM002', 'quantity': 300} ], output_lot={'lot_id': 'FG001', 'product_id': 'FINISHED_PRODUCT_X', 'quantity': 800}, transformation_type='Manufacturing', transformation_date='2025-12-01', location='Plant A' )
traceability.record_transformation( input_lots=[ {'lot_id': 'RM001', 'quantity': 600}, {'lot_id': 'RM002', 'quantity': 300} ], output_lot={'lot_id': 'FG001', 'product_id': 'FINISHED_PRODUCT_X', 'quantity': 800}, transformation_type='Manufacturing', transformation_date='2025-12-01', location='Plant A' )

Record movements

Record movements

traceability.record_movement( 'FG001', from_location='Plant A', to_location='DC East', quantity=800, movement_date='2025-12-05', movement_type='Transfer' )
traceability.record_movement( 'FG001', from_location='DC East', to_location='Customer XYZ', quantity=500, movement_date='2025-12-10', movement_type='Sale' )
traceability.record_movement( 'FG001', from_location='Plant A', to_location='DC East', quantity=800, movement_date='2025-12-05', movement_type='Transfer' )
traceability.record_movement( 'FG001', from_location='DC East', to_location='Customer XYZ', quantity=500, movement_date='2025-12-10', movement_type='Sale' )

Trace backward

Trace backward

backward = traceability.trace_backward('FG001') print(f"Backward Trace for Lot FG001:") print(f" Manufacturing Date: {backward['manufacturing_date']}") print(f" Upstream Materials: {len(backward['upstream_chain'])}")
backward = traceability.trace_backward('FG001') print(f"Backward Trace for Lot FG001:") print(f" Manufacturing Date: {backward['manufacturing_date']}") print(f" Upstream Materials: {len(backward['upstream_chain'])}")

Simulate recall

Simulate recall

recall = traceability.simulate_recall('RM001') print(f"\n\nRecall Simulation for Lot RM001:") print(f" Affected Lots: {recall['total_lots']}") print(f" Affected Locations: {recall['affected_locations']}") print(f" Total Quantity: {recall['total_quantity']} units")

---
recall = traceability.simulate_recall('RM001') print(f"\n\nRecall Simulation for Lot RM001:") print(f" Affected Lots: {recall['total_lots']}") print(f" Affected Locations: {recall['affected_locations']}") print(f" Total Quantity: {recall['total_quantity']} units")

---

Tools & Libraries

工具与库

Python Libraries

Python库

Data Processing:
  • pandas
    : Data manipulation
  • numpy
    : Numerical computations
  • sqlalchemy
    : Database connections
APIs & Integration:
  • requests
    : HTTP requests for carrier APIs
  • ftplib
    : FTP file transfer
  • paramiko
    : SSH/SFTP
Real-Time:
  • kafka-python
    : Apache Kafka
  • paho-mqtt
    : MQTT messaging
  • redis
    : Real-time data store
Blockchain:
  • web3.py
    : Ethereum blockchain
  • hyperledger-fabric-sdk-py
    : Hyperledger Fabric
Visualization:
  • matplotlib
    ,
    plotly
    : Tracking dashboards
  • folium
    : Geographic mapping
数据处理:
  • pandas
    : 数据操作
  • numpy
    : 数值计算
  • sqlalchemy
    : 数据库连接
API与集成:
  • requests
    : 用于调用承运商API的HTTP请求库
  • ftplib
    : FTP文件传输
  • paramiko
    : SSH/SFTP
实时处理:
  • kafka-python
    : Apache Kafka客户端
  • paho-mqtt
    : MQTT消息库
  • redis
    : 实时数据存储
区块链:
  • web3.py
    : Ethereum区块链库
  • hyperledger-fabric-sdk-py
    : Hyperledger Fabric SDK
可视化:
  • matplotlib
    ,
    plotly
    : 跟踪仪表盘
  • folium
    : 地理映射

Commercial Software

商业软件

Shipment Tracking:
  • project44: Multi-carrier visibility
  • FourKites: Real-time shipment tracking
  • Shippeo: Supply chain visibility
  • ClearMetal: Predictive logistics visibility
  • Descartes MacroPoint: Load tracking
Traceability Platforms:
  • TraceLink: Serialization and traceability
  • Optel: Track and trace solutions
  • rfxcel: Supply chain traceability
  • IBM Food Trust: Blockchain traceability
  • SAP Information Collaboration Hub: Track and trace
IoT & Sensors:
  • Tive: Supply chain visibility sensors
  • Roambee: Real-time tracking devices
  • Samsara: IoT platform
  • Zest Labs: Fresh food tracking
Blockchain:
  • IBM Blockchain: Enterprise blockchain
  • VeChain: Supply chain blockchain
  • OriginTrail: Decentralized traceability
  • Morpheus.Network: Supply chain platform

货运跟踪:
  • project44: 多承运商可见性平台
  • FourKites: 实时货运跟踪
  • Shippeo: 供应链可见性
  • ClearMetal: 预测物流可见性
  • Descartes MacroPoint: 货物跟踪
溯源平台:
  • TraceLink: 序列化与溯源
  • Optel: 追踪溯源解决方案
  • rfxcel: 供应链溯源
  • IBM Food Trust: 区块链溯源
  • SAP Information Collaboration Hub: 追踪溯源
IoT与传感器:
  • Tive: 供应链可见性传感器
  • Roambee: 实时跟踪设备
  • Samsara: IoT平台
  • Zest Labs: 新鲜食品跟踪
区块链:
  • IBM Blockchain: 企业区块链
  • VeChain: 供应链区块链
  • OriginTrail: 去中心化溯源
  • Morpheus.Network: 供应链平台

Common Challenges & Solutions

常见挑战与解决方案

Challenge: Data Integration

挑战:数据集成

Problem:
  • Multiple disparate systems (ERP, WMS, TMS, carrier systems)
  • Different data formats and standards
  • Real-time vs. batch integration
  • API limitations
Solutions:
  • Middleware/integration platform (MuleSoft, Boomi)
  • Standardized data models (GS1, EPCIS)
  • API-first architecture
  • Event-driven integration
  • Master data management
  • Phased integration approach
问题:
  • 多个异构系统(ERP、WMS、TMS、承运商系统)
  • 不同的数据格式和标准
  • 实时与批量集成的差异
  • API限制
解决方案:
  • 中间件/集成平台(MuleSoft、Boomi)
  • 标准化数据模型(GS1、EPCIS)
  • API优先架构
  • 事件驱动集成
  • 主数据管理
  • 分阶段集成方法

Challenge: Data Quality & Accuracy

挑战:数据质量与准确性

Problem:
  • Incomplete tracking events
  • Delayed updates
  • Incorrect locations
  • Missing scans
Solutions:
  • Automated data validation
  • Exception reporting and alerts
  • Multiple data sources (triangulation)
  • IoT sensors for automation
  • Training and process discipline
  • Data reconciliation processes
问题:
  • 跟踪事件不完整
  • 更新延迟
  • 位置信息错误
  • 扫描缺失
解决方案:
  • 自动化数据验证
  • 异常报告与告警
  • 多数据源验证(三角测量)
  • IoT传感器自动化采集
  • 流程培训与规范
  • 数据对账流程

Challenge: Real-Time Performance

挑战:实时性能

Problem:
  • High volume of tracking events
  • Latency in updates
  • System performance degradation
  • Need for instant visibility
Solutions:
  • Event-driven architecture (Kafka, MQTT)
  • In-memory databases (Redis)
  • Caching strategies
  • Horizontal scaling
  • Edge computing for IoT
  • Asynchronous processing
问题:
  • 跟踪事件量巨大
  • 更新延迟
  • 系统性能下降
  • 需要即时可见性
解决方案:
  • 事件驱动架构(Kafka、MQTT)
  • 内存数据库(Redis)
  • 缓存策略
  • 水平扩展
  • IoT边缘计算
  • 异步处理

Challenge: Complexity at Scale

挑战:规模化复杂度

Problem:
  • Millions of shipments/products
  • Multi-tier supply chains
  • Global operations
  • Combinatorial explosion
Solutions:
  • Hierarchical tracking (pallet → case → unit)
  • Selective tracking (based on value/risk)
  • Archive old data
  • Distributed systems
  • Cloud infrastructure
  • Efficient data structures
问题:
  • 数百万级的货运/产品
  • 多层级供应链
  • 全球化运营
  • 组合爆炸
解决方案:
  • 分层跟踪(托盘 → 箱 → 单元)
  • 选择性跟踪(基于价值/风险)
  • 归档历史数据
  • 分布式系统
  • 云基础设施
  • 高效数据结构

Challenge: Supplier/Partner Integration

挑战:供应商/合作伙伴集成

Problem:
  • Suppliers lack technology
  • Reluctance to share data
  • Different standards
  • Small partners
Solutions:
  • Tiered approach (EDI, APIs, portals, email)
  • Standardized formats (GS1, EPCIS)
  • Incentives for participation
  • Technology provision
  • Industry collaboration
  • Gradual onboarding
问题:
  • 供应商缺乏技术能力
  • 不愿共享数据
  • 标准不一致
  • 小型合作伙伴
解决方案:
  • 分层集成方式(EDI、API、门户、邮件)
  • 标准化格式(GS1、EPCIS)
  • 参与激励机制
  • 提供技术支持
  • 行业协作
  • 逐步接入

Challenge: Cost Justification

挑战:成本合理性

Problem:
  • High implementation costs
  • Ongoing operational costs
  • Intangible benefits
  • Long payback
Solutions:
  • Start with pilot (high-value use case)
  • Quantify benefits (efficiency, service, compliance)
  • Phased investment
  • Cloud-based SaaS models
  • Shared industry infrastructure
  • Compliance as driver

问题:
  • 实施成本高
  • 持续运营成本
  • 收益难以量化
  • 投资回报周期长
解决方案:
  • 从试点开始(高价值使用场景)
  • 量化收益(效率、服务、合规)
  • 分阶段投资
  • 云原生SaaS模式
  • 行业共享基础设施
  • 以合规为驱动因素

Output Format

输出格式

Track-and-Trace Dashboard

追踪溯源仪表盘

Executive Summary:
  • Total shipments/products tracked
  • Tracking performance
  • Exceptions and issues
  • Key metrics
Shipment Tracking:
Shipment IDOriginDestinationStatusCurrent LocationETADelayExceptions
SHP-10234NYCLAXIn TransitMemphis HubJan 15On TimeNone
SHP-10235CHIMIADelayedAtlantaJan 16+2 daysWeather
SHP-10236SEABOSExceptionPortlandTBD+5 daysCustoms Hold
Traceability:
Lot/Batch IDProductQuantityManufacturing DateExpiryCurrent LocationStatus
LOT-A12345Product X1,0002025-11-152027-11-15DC EastActive
LOT-A12346Product X8002025-11-202027-11-20Customer ABCSold
LOT-A12347Product X5002025-11-252027-11-25RecallQuarantine
Performance Metrics:
MetricValueTargetStatus
Shipments Tracked15,234N/A
Real-Time Visibility98.2%95.0%✓ Above
On-Time Delivery91.5%92.0%⚠ Slightly Below
Exception Rate4.3%<5.0%✓ On Track
Traceability Coverage99.8%100%✓ Near Target
Exceptions Summary:
Exception TypeCountAvg Resolution TimeOldest Open
Delay12718 hours3 days
Customs Hold234.2 days8 days
Damaged82 days1 day
Lost2Pending12 days

执行摘要:
  • 已跟踪的货运/产品总数
  • 跟踪性能
  • 异常与问题
  • 关键指标
货运跟踪:
货运ID始发地目的地状态当前位置预计到达时间延迟情况异常
SHP-10234NYCLAX运输中Memphis枢纽1月15日准时
SHP-10235CHIMIA延迟Atlanta1月16日+2天天气原因
SHP-10236SEABOS异常Portland待定+5天海关扣留
产品溯源:
批次ID产品数量生产日期有效期当前位置状态
LOT-A12345Product X1,0002025-11-152027-11-15东部配送中心活跃
LOT-A12346Product X8002025-11-202027-11-20客户ABC已售出
LOT-A12347Product X5002025-11-252027-11-25召回隔离
性能指标:
指标数值目标状态
已跟踪货运数15,234N/A
实时可见性覆盖率98.2%95.0%✓ 超出目标
准时交付率91.5%92.0%⚠ 略低于目标
异常率4.3%<5.0%✓ 符合预期
溯源覆盖率99.8%100%✓ 接近目标
异常摘要:
异常类型数量平均解决时间最早未解决异常
延迟12718小时3天
海关扣留234.2天8天
损坏82天1天
丢失2待处理12天

Questions to Ask

需询问的问题

If you need more context:
  1. What needs tracking? (shipments, products, assets, components)
  2. What level of granularity? (pallet, case, unit, serial)
  3. What are the regulatory requirements? (FDA, FSMA, EU MDR)
  4. What systems need integration? (ERP, WMS, TMS, carrier systems)
  5. Is real-time tracking required or periodic updates sufficient?
  6. Who needs visibility? (internal teams, customers, suppliers, regulators)
  7. What's the geographic scope? (domestic, international, multi-region)
  8. What data capture methods exist? (barcodes, RFID, IoT, manual)
  9. Are there recall or compliance scenarios to support?
  10. What's the budget and timeline?

如需更多上下文,请询问:
  1. 需要跟踪什么?(货运、产品、资产、组件)
  2. 需要什么粒度?(托盘、箱、单个单元、序列号)
  3. 有哪些监管要求?(FDA、FSMA、EU MDR)
  4. 需要集成哪些系统?(ERP、WMS、TMS、承运商系统)
  5. 需要实时跟踪还是定期更新即可?
  6. 哪些主体需要可见性?(内部团队、客户、供应商、监管机构)
  7. 地理范围是什么?(国内、国际、多区域)
  8. 现有哪些数据采集方式?(条形码、RFID、IoT、手动)
  9. 是否需要支持召回或合规场景?
  10. 预算和时间线是什么?

Related Skills

相关技能

  • control-tower-design: For centralized monitoring and visibility
  • compliance-management: For regulatory traceability requirements
  • circular-economy: For reverse logistics tracking
  • supplier-collaboration: For supplier visibility integration
  • risk-mitigation: For tracking disruptions and risks
  • freight-optimization: For transportation visibility
  • route-optimization: For shipment tracking and optimization
  • quality-management: For quality traceability
  • control-tower-design: 用于集中监控与可见性
  • compliance-management: 用于监管溯源要求
  • circular-economy: 用于逆向物流跟踪
  • supplier-collaboration: 用于供应商可见性集成
  • risk-mitigation: 用于跟踪中断与风险
  • freight-optimization: 用于运输可见性
  • route-optimization: 用于货运跟踪与优化
  • quality-management: 用于质量溯源