tensorflow-data-pipelines


TensorFlow Data Pipelines


Build efficient, scalable data pipelines using the tf.data API for optimal training performance. This skill covers dataset creation, transformations, batching, shuffling, prefetching, and advanced optimization techniques to maximize GPU/TPU utilization.

Dataset Creation


From Tensor Slices

```python
import tensorflow as tf
import numpy as np

# Create dataset from numpy arrays
x_train = np.random.rand(1000, 28, 28, 1)
y_train = np.random.randint(0, 10, 1000)

# Method 1: from_tensor_slices
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

# Apply transformations
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Iterate through dataset
for batch_x, batch_y in dataset.take(2):
    print(f"Batch shape: {batch_x.shape}, Labels shape: {batch_y.shape}")
```

From Generator Functions

```python
def data_generator():
    """Generator function for custom data loading."""
    for i in range(1000):
        # Simulate loading data from disk or API
        x = np.random.rand(28, 28, 1).astype(np.float32)
        y = np.random.randint(0, 10)
        yield x, y

# Create dataset from generator
dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)
```

From Dataset Range

```python
# Create simple range dataset
dataset = tf.data.Dataset.range(1000)

# Use with custom mapping
dataset = dataset.map(lambda x: (tf.random.normal([28, 28, 1]), x % 10))
dataset = dataset.batch(32)
```

Data Transformation


Normalization Pipeline

```python
def normalize(image, label):
    """Normalize pixel values."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Apply normalization
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Data Augmentation Pipeline

```python
def augment(image, label):
    """Apply random augmentations."""
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    return image, label

def normalize(image, label):
    """Normalize pixel values."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Build complete pipeline
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after normalization
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Multiple Transformations

```python
import tensorflow_addons as tfa  # required for tfa.image.rotate

def resize_image(image, label):
    """Resize images to target size."""
    image = tf.image.resize(image, [224, 224])
    return image, label

def apply_random_rotation(image, label):
    """Apply random rotation augmentation."""
    angle = tf.random.uniform([], -0.2, 0.2)
    image = tfa.image.rotate(image, angle)
    return image, label

# Chain multiple transformations
dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(resize_image, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(apply_random_rotation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
```

Batching and Shuffling


Basic Batching Configuration

```python
# Batch size
BATCH_SIZE = 64

# Buffer size used to shuffle the dataset.
# (tf.data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory.
# Instead, it maintains a buffer in which it shuffles elements.)
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
```
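Shuffling can also be made reproducible: a fixed seed together with `reshuffle_each_iteration=False` yields the same order on every pass. A minimal sketch (the seed value is arbitrary):

```python
import tensorflow as tf

# Seeded shuffle: a fixed seed makes the order reproducible across runs
ds = tf.data.Dataset.range(10).shuffle(
    buffer_size=10, seed=42, reshuffle_each_iteration=False)

first_pass = [int(x) for x in ds]
second_pass = [int(x) for x in ds]

# With reshuffle_each_iteration=False, every pass sees the same order
print(first_pass == second_pass)  # True
```

Leave `reshuffle_each_iteration` at its default (`True`) for training, where a fresh order each epoch is usually desirable.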

Dynamic Batching

```python
# Variable batch sizes based on sequence length
def batch_by_sequence_length(dataset, batch_size, max_length):
    """Batch sequences by length for efficient padding."""
    def key_func(x, y):
        # Bucket by length
        return tf.cast(tf.size(x) / max_length * 10, tf.int64)

    def reduce_func(key, dataset):
        return dataset.batch(batch_size)

    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=batch_size
    )
```
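Recent TensorFlow versions (2.6+) also ship a built-in `bucket_by_sequence_length` transformation that combines the bucketing and padded batching sketched above in one call; a minimal sketch (boundaries and batch sizes are illustrative):

```python
import tensorflow as tf

# Variable-length integer sequences
lengths = [3, 5, 9, 2, 12, 7]
ds = tf.data.Dataset.from_generator(
    lambda: (tf.range(n) for n in lengths),
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.int32))

# Bucket by length, then pad and batch within each bucket
ds = ds.bucket_by_sequence_length(
    element_length_func=lambda x: tf.shape(x)[0],
    bucket_boundaries=[4, 8],       # buckets: len < 4, 4 <= len < 8, len >= 8
    bucket_batch_sizes=[2, 2, 2])   # one batch size per bucket

for batch in ds:
    print(batch.shape)  # each batch padded to its bucket's longest sequence
```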

Stratified Sampling

```python
def create_stratified_dataset(features, labels, num_classes, batch_size):
    """Create dataset with balanced class sampling."""
    # Separate by class
    datasets = []
    for class_id in range(num_classes):
        mask = labels == class_id
        class_dataset = tf.data.Dataset.from_tensor_slices(
            (features[mask], labels[mask])
        )
        datasets.append(class_dataset)

    # Sample equally from each class
    balanced_dataset = tf.data.Dataset.sample_from_datasets(
        datasets,
        weights=[1.0 / num_classes] * num_classes
    )

    return balanced_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
```
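The balanced-sampling idea can be exercised end to end. A self-contained two-class sketch (class counts and feature shape are illustrative; note the per-class `.repeat()`, so the minority class is not exhausted):

```python
import numpy as np
import tensorflow as tf

# Imbalanced toy data: 90 examples of class 0, 10 of class 1
features = np.random.rand(100, 4).astype(np.float32)
labels = np.array([0] * 90 + [1] * 10)

per_class = [
    tf.data.Dataset.from_tensor_slices(
        (features[labels == c], labels[labels == c])
    ).repeat()
    for c in (0, 1)
]

# Draw from each class with equal probability
balanced = tf.data.Dataset.sample_from_datasets(
    per_class, weights=[0.5, 0.5], seed=0)

batch = next(iter(balanced.batch(1000)))
frac_class_1 = float(tf.reduce_mean(tf.cast(batch[1], tf.float32)))
print(f"fraction of class 1 in sample: {frac_class_1:.2f}")
```

Despite the 9:1 imbalance in the source arrays, roughly half the sampled elements belong to class 1.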

Performance Optimization


Caching Strategies

```python
# Cache in memory (for small datasets)
dataset = dataset.cache()

# Cache to disk (for larger datasets)
dataset = dataset.cache('/tmp/dataset_cache')

# Optimal caching placement
dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after expensive operations
    .shuffle(buffer_size)
    .map(cheap_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
```

Prefetching

```python
# Automatic prefetching
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Manual prefetch buffer size
dataset = dataset.prefetch(buffer_size=2)

# Complete optimized pipeline
optimized_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
```

Parallel Data Loading

```python
# Use num_parallel_calls for CPU-bound operations
dataset = dataset.map(
    preprocessing_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Interleave for parallel file reading
def make_dataset_from_file(filename):
    return tf.data.TextLineDataset(filename)

filenames = tf.data.Dataset.list_files('/path/to/data/*.csv')
dataset = filenames.interleave(
    make_dataset_from_file,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE
)
```

Memory Management

```python
# Use take() and skip() for a train/val split without loading all data
total_size = 10000
train_size = int(0.8 * total_size)

full_dataset = tf.data.Dataset.from_tensor_slices((x, y))

train_dataset = (
    full_dataset
    .take(train_size)
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    full_dataset
    .skip(train_size)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Advanced Patterns


Iterating with For Loops

```python
# Basic iteration
for i in tf.data.Dataset.range(3):
    tf.print('iteration:', i)

# With dataset iterator
for i in iter(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)
```

Distributed Dataset

```python
# Distribute dataset across devices
strategy = tf.distribute.OneDeviceStrategy('cpu')
for i in strategy.experimental_distribute_dataset(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)

# Multi-GPU distribution
strategy = tf.distribute.MirroredStrategy()
distributed_dataset = strategy.experimental_distribute_dataset(dataset)
```

Training Loop Integration

```python
# Execute training loop over dataset
for images, labels in train_ds:
    if optimizer.iterations > TRAIN_STEPS:
        break
    train_step(images, labels)
```

Vectorized Operations

```python
def f(args):
    embeddings, index = args
    # embeddings [vocab_size, embedding_dim]
    # index []
    # desired result: [embedding_dim]
    return tf.gather(params=embeddings, indices=index)

@tf.function
def f_auto_vectorized(embeddings, indices):
    # embeddings [num_heads, vocab_size, embedding_dim]
    # indices [num_heads]
    # desired result: [num_heads, embedding_dim]
    return tf.vectorized_map(f, [embeddings, indices])

concrete_vectorized = f_auto_vectorized.get_concrete_function(
    tf.TensorSpec(shape=[None, 100, 16], dtype=tf.float32),
    tf.TensorSpec(shape=[None], dtype=tf.int32))
```

Model Integration


Training with tf.data

```python
# Use dataset with model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(train_dataset, epochs=1)
```

Validation Dataset

```python
# Create separate train and validation datasets
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Train with validation
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)
```

Custom Training Loop

```python
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Training loop with dataset
for epoch in range(epochs):
    for images, labels in train_dataset:
        loss = train_step(images, labels)
    print(f'Epoch {epoch}, Loss: {loss.numpy():.4f}')
```

File-Based Datasets


TFRecord Files

```python
# Reading TFRecord files
def parse_tfrecord(example_proto):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_raw(parsed['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = parsed['label']
    return image, label

# Load TFRecord dataset
tfrecord_dataset = (
    tf.data.TFRecordDataset(['data_shard_1.tfrecord', 'data_shard_2.tfrecord'])
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```
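The reader above assumes records were written with a matching schema; a minimal writer sketch for that schema (the file path and example count are illustrative):

```python
import os
import tempfile

import numpy as np
import tensorflow as tf

def serialize_example(image, label):
    """Serialize one (image, label) pair into a tf.train.Example."""
    feature = {
        'image': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[image.tobytes()])),
        'label': tf.train.Feature(
            int64_list=tf.train.Int64List(value=[int(label)])),
    }
    return tf.train.Example(
        features=tf.train.Features(feature=feature)).SerializeToString()

# Write a few random examples to a TFRecord file
record_path = os.path.join(tempfile.gettempdir(), 'demo.tfrecord')
with tf.io.TFRecordWriter(record_path) as writer:
    for _ in range(4):
        image = np.random.rand(28, 28, 1).astype(np.float32)
        label = np.random.randint(0, 10)
        writer.write(serialize_example(image, label))
```

Because the image is stored as raw float32 bytes, the reading side must decode with `tf.io.decode_raw(..., tf.float32)` and reshape to the original dimensions, exactly as `parse_tfrecord` does.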

CSV Files

```python
# Load CSV dataset
def parse_csv(line):
    columns = tf.io.decode_csv(line, record_defaults=[0.0] * 785)
    label = tf.cast(columns[0], tf.int32)
    features = tf.stack(columns[1:])
    features = tf.reshape(features, [28, 28, 1])
    return features, label

csv_dataset = (
    tf.data.TextLineDataset(['data.csv'])
    .skip(1)  # Skip header
    .map(parse_csv, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Image Files

```python
def load_and_preprocess_image(path, label):
    """Load image from file and preprocess."""
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Create dataset from image paths
image_paths = ['/path/to/image1.jpg', '/path/to/image2.jpg', ...]
labels = [0, 1, ...]

image_dataset = (
    tf.data.Dataset.from_tensor_slices((image_paths, labels))
    .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Data Validation


DataLoader Generation

```python
# Generate TensorFlow dataset with batching
def gen_dataset(
    batch_size=1,
    is_training=False,
    shuffle=False,
    input_pipeline_context=None,
    preprocess=None,
    drop_remainder=True,
    total_steps=None
):
    """Generate dataset with specified configuration."""
    # features and labels are assumed to be defined in the enclosing scope
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))

    if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)

    if preprocess:
        dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)

    if is_training:
        dataset = dataset.repeat()

    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    if total_steps:
        dataset = dataset.take(total_steps)

    return dataset
```

When to Use This Skill


Use the tensorflow-data-pipelines skill when you need to:
  • Load and preprocess large datasets that don't fit in memory
  • Implement data augmentation for training robustness
  • Optimize data loading to prevent GPU/TPU idle time
  • Create custom data generators for specialized formats
  • Build multi-modal pipelines with images, text, and audio
  • Implement efficient batching strategies for variable-length sequences
  • Cache preprocessed data to speed up training
  • Handle distributed training across multiple devices
  • Parse TFRecord, CSV, or other file formats
  • Implement stratified sampling for imbalanced datasets
  • Create reproducible data shuffling
  • Build real-time data augmentation pipelines
  • Optimize memory usage with streaming datasets
  • Implement prefetching for pipeline parallelism
  • Create validation and test splits efficiently

Best Practices


  1. Always use prefetch() - Add .prefetch(tf.data.AUTOTUNE) at the end of pipeline to overlap data loading with training
  2. Use num_parallel_calls=AUTOTUNE - Let TensorFlow automatically tune parallelism for map operations
  3. Cache after expensive operations - Place .cache() after preprocessing but before augmentation and shuffling
  4. Shuffle before batching - Call .shuffle() before .batch() to ensure random batches
  5. Use appropriate buffer sizes - Shuffle buffer should be >= dataset size for perfect shuffling, or at least several thousand
  6. Normalize data in pipeline - Apply normalization in map() function for consistency across train/val/test
  7. Batch after transformations - Apply .batch() after all element-wise transformations for efficiency
  8. Use drop_remainder for training - Set drop_remainder=True in batch() to ensure consistent batch sizes
  9. Leverage AUTOTUNE - Use tf.data.AUTOTUNE for automatic performance tuning instead of manual values
  10. Apply augmentation after caching - Cache deterministic preprocessing, apply random augmentation after
  11. Use interleave for file reading - Parallel file reading with interleave() for large multi-file datasets
  12. Repeat for infinite datasets - Use .repeat() for training datasets to avoid dataset exhaustion
  13. Use take/skip for splits - Efficiently split datasets without loading all data into memory
  14. Monitor pipeline performance - Use TensorFlow Profiler to identify bottlenecks in data pipeline
  15. Shard data for distribution - Use shard() for distributed training across multiple workers
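Practice 15 can be sketched with `Dataset.shard`, which gives each worker the elements whose position modulo the shard count equals its index (the worker count here is illustrative):

```python
import tensorflow as tf

NUM_WORKERS = 2

full = tf.data.Dataset.range(10)

# Each worker reads only elements where position % NUM_WORKERS == worker_id
shards = [full.shard(num_shards=NUM_WORKERS, index=i)
          for i in range(NUM_WORKERS)]

worker_0 = [int(x) for x in shards[0]]
worker_1 = [int(x) for x in shards[1]]

print(worker_0)  # [0, 2, 4, 6, 8]
print(worker_1)  # [1, 3, 5, 7, 9]
```

In a real multi-worker job, each process would build only its own shard (typically sharding by file before any expensive per-element work).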

Common Pitfalls


  1. Shuffling after batching - Shuffles batches instead of individual samples, reducing randomness
  2. Not using prefetch - GPU sits idle waiting for data, wasting compute resources
  3. Cache in wrong position - Caching after augmentation prevents randomness, before preprocessing wastes memory
  4. Buffer size too small - Insufficient shuffle buffer leads to poor randomization and training issues
  5. Not using num_parallel_calls - Sequential map operations create bottlenecks in data loading
  6. Loading entire dataset to memory - Use tf.data instead of loading all data with NumPy
  7. Applying augmentation deterministically - Same augmentations every epoch reduce training effectiveness
  8. Not setting random seeds - Irreproducible results and debugging difficulties
  9. Ignoring batch remainder - Variable batch sizes cause errors in models expecting fixed dimensions
  10. Repeating validation dataset - Validation should not repeat, only training datasets
  11. Not using AUTOTUNE - Manual tuning is difficult and suboptimal compared to automatic optimization
  12. Caching very large datasets - Exceeds memory limits and causes OOM errors
  13. Too many parallel operations - Excessive parallelism causes thread contention and slowdown
  14. Not monitoring data loading time - Data pipeline can become training bottleneck without monitoring
  15. Applying normalization inconsistently - Different normalization for train/val/test causes poor performance
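Pitfall 1 can be demonstrated directly: shuffling after batching only permutes whole batches, so each batch remains a contiguous block of the original data, while shuffling before batching mixes individual elements:

```python
import tensorflow as tf

data = tf.data.Dataset.range(8)

# Wrong: batches are formed first, then only the batch order is shuffled
wrong = data.batch(4).shuffle(10, seed=0)

# Right: elements are shuffled before batches are formed
right = data.shuffle(10, seed=0).batch(4)

wrong_batches = [b.numpy().tolist() for b in wrong]
right_batches = [b.numpy().tolist() for b in right]

# In the wrong pipeline, every batch is still a contiguous block
for b in wrong_batches:
    assert b == sorted(b) and b[-1] - b[0] == 3

print('wrong:', wrong_batches)
print('right:', right_batches)
```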

Resources
