tensorflow-data-pipelines
TensorFlow Data Pipelines
Build efficient, scalable data pipelines using the tf.data API for optimal training performance. This skill covers dataset creation, transformations, batching, shuffling, prefetching, and advanced optimization techniques to maximize GPU/TPU utilization.
Dataset Creation

From Tensor Slices
```python
import tensorflow as tf
import numpy as np

# Create a dataset from NumPy arrays
x_train = np.random.rand(1000, 28, 28, 1)
y_train = np.random.randint(0, 10, 1000)

# Method 1: from_tensor_slices
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

# Apply transformations
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Iterate through the dataset
for batch_x, batch_y in dataset.take(2):
    print(f"Batch shape: {batch_x.shape}, Labels shape: {batch_y.shape}")
```

From Generator Functions
```python
def data_generator():
    """Generator function for custom data loading."""
    for i in range(1000):
        # Simulate loading data from disk or an API
        x = np.random.rand(28, 28, 1).astype(np.float32)
        y = np.random.randint(0, 10)
        yield x, y

# Create a dataset from the generator
dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)
```

From Dataset Range
```python
# Create a simple range dataset
dataset = tf.data.Dataset.range(1000)

# Use with a custom mapping
dataset = dataset.map(lambda x: (tf.random.normal([28, 28, 1]), x % 10))
dataset = dataset.batch(32)
```

Data Transformation
Normalization Pipeline

```python
def normalize(image, label):
    """Normalize pixel values to [0, 1]."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Apply normalization
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Data Augmentation Pipeline
```python
def augment(image, label):
    """Apply random augmentations."""
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    return image, label

# Build the complete pipeline (normalize is defined above)
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after deterministic normalization
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Multiple Transformations
```python
import tensorflow_addons as tfa  # requires the tensorflow-addons package

def resize_image(image, label):
    """Resize images to the target size."""
    image = tf.image.resize(image, [224, 224])
    return image, label

def apply_random_rotation(image, label):
    """Apply a random rotation augmentation."""
    angle = tf.random.uniform([], -0.2, 0.2)
    image = tfa.image.rotate(image, angle)
    return image, label

# Chain multiple transformations
dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(resize_image, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(apply_random_rotation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
```

Batching and Shuffling
Basic Batching Configuration

```python
BATCH_SIZE = 64

# Buffer size for shuffling. tf.data is designed to work with possibly
# infinite sequences, so it does not attempt to shuffle the entire sequence
# in memory; instead it maintains a buffer in which it shuffles elements.
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
```

Dynamic Batching
```python
# Variable batch sizes based on sequence length
def batch_by_sequence_length(dataset, batch_size, max_length):
    """Batch sequences by length for efficient padding."""
    def key_func(x, y):
        # Bucket by length
        return tf.cast(tf.size(x) / max_length * 10, tf.int64)

    def reduce_func(key, windowed):
        return windowed.batch(batch_size)

    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=batch_size
    )
```
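group_by_window buckets sequences so that similarly sized elements end up in the same batch. When simple padding is enough, `padded_batch` pads every batch to the length of its longest element; a minimal sketch (the token sequences here are invented for illustration):

```python
import tensorflow as tf

# Hypothetical ragged token sequences of different lengths
sequences = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [10]]
dataset = tf.data.Dataset.from_generator(
    lambda: iter(sequences),
    output_signature=tf.TensorSpec(shape=(None,), dtype=tf.int32)
)

# Pad each batch to its longest sequence, using 0 as the padding value
padded = dataset.padded_batch(2, padded_shapes=[None], padding_values=0)

for batch in padded:
    print(batch.shape)  # (2, 3) then (2, 4)
```

Note that padding happens per batch, so sorting or bucketing by length before `padded_batch` reduces wasted padding.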
Stratified Sampling
```python
def create_stratified_dataset(features, labels, num_classes, batch_size):
    """Create a dataset with balanced class sampling."""
    # Separate by class
    datasets = []
    for class_id in range(num_classes):
        mask = labels == class_id
        class_dataset = tf.data.Dataset.from_tensor_slices(
            (features[mask], labels[mask])
        )
        datasets.append(class_dataset)

    # Sample equally from each class
    balanced_dataset = tf.data.Dataset.sample_from_datasets(
        datasets,
        weights=[1.0 / num_classes] * num_classes
    )
    return balanced_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
```

Performance Optimization
Caching Strategies

```python
# Cache in memory (for small datasets)
dataset = dataset.cache()

# Cache to disk (for larger datasets)
dataset = dataset.cache('/tmp/dataset_cache')

# Optimal cache placement
dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after expensive operations
    .shuffle(buffer_size)
    .map(cheap_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
```

Prefetching
```python
# Automatic prefetching
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Manual prefetch buffer size
dataset = dataset.prefetch(buffer_size=2)

# Complete optimized pipeline
optimized_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
```

Parallel Data Loading
```python
# Use num_parallel_calls for CPU-bound map operations
dataset = dataset.map(
    preprocessing_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Interleave for parallel file reading
def make_dataset_from_file(filename):
    return tf.data.TextLineDataset(filename)

filenames = tf.data.Dataset.list_files('/path/to/data/*.csv')
dataset = filenames.interleave(
    make_dataset_from_file,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE
)
```

Memory Management
```python
# Use take() and skip() for a train/val split without loading all data
total_size = 10000
train_size = int(0.8 * total_size)

full_dataset = tf.data.Dataset.from_tensor_slices((x, y))

train_dataset = (
    full_dataset
    .take(train_size)
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    full_dataset
    .skip(train_size)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Advanced Patterns
Iterating with For Loops

```python
# Basic iteration
for i in tf.data.Dataset.range(3):
    tf.print('iteration:', i)

# With an explicit dataset iterator
for i in iter(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)
```

Distributed Dataset
```python
# Distribute a dataset on a single device
for i in tf.distribute.OneDeviceStrategy('cpu').experimental_distribute_dataset(
        tf.data.Dataset.range(3)):
    tf.print('iteration:', i)

# Multi-GPU distribution
strategy = tf.distribute.MirroredStrategy()
distributed_dataset = strategy.experimental_distribute_dataset(dataset)
```
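In multi-worker setups, each worker should also read a disjoint slice of the data. `Dataset.shard(num_shards, index)` keeps every `num_shards`-th element starting at `index`, which is deterministic and cheap; a small sketch with two hypothetical workers:

```python
import tensorflow as tf

data = tf.data.Dataset.range(10)

# Worker i of n keeps elements whose position satisfies position % n == i
worker_0 = data.shard(num_shards=2, index=0)
worker_1 = data.shard(num_shards=2, index=1)

print(list(worker_0.as_numpy_iterator()))  # [0, 2, 4, 6, 8]
print(list(worker_1.as_numpy_iterator()))  # [1, 3, 5, 7, 9]
```

Shard before shuffling and other per-element work so each worker only processes its own slice.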
Training Loop Integration
```python
# Run a training loop over the dataset, stopping after TRAIN_STEPS
for images, labels in train_ds:
    if optimizer.iterations > TRAIN_STEPS:
        break
    train_step(images, labels)
```

Vectorized Operations
```python
def f(args):
    embeddings, index = args
    # embeddings: [vocab_size, embedding_dim]
    # index: []
    # desired result: [embedding_dim]
    return tf.gather(params=embeddings, indices=index)

@tf.function
def f_auto_vectorized(embeddings, indices):
    # embeddings: [num_heads, vocab_size, embedding_dim]
    # indices: [num_heads]
    # desired result: [num_heads, embedding_dim]
    return tf.vectorized_map(f, [embeddings, indices])

concrete_vectorized = f_auto_vectorized.get_concrete_function(
    tf.TensorSpec(shape=[None, 100, 16], dtype=tf.float32),
    tf.TensorSpec(shape=[None], dtype=tf.int32))
```

Model Integration
Training with tf.data

```python
# Use the dataset directly with a Keras model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(train_dataset, epochs=1)
```

Validation Dataset
```python
# Create separate train and validation datasets
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Train with validation
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)
```

Custom Training Loop
```python
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Training loop over the dataset
for epoch in range(epochs):
    for images, labels in train_dataset:
        loss = train_step(images, labels)
    print(f'Epoch {epoch}, Loss: {loss.numpy():.4f}')
```

File-Based Datasets
TFRecord Files

```python
# Parse serialized examples from TFRecord files
def parse_tfrecord(example_proto):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_raw(parsed['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = parsed['label']
    return image, label

# Load the TFRecord dataset
tfrecord_dataset = (
    tf.data.TFRecordDataset(['data_shard_1.tfrecord', 'data_shard_2.tfrecord'])
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```
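Reading assumes records were written with a matching schema. A minimal writing sketch that produces files the `parse_tfrecord` function above can decode (the output path and toy data are illustrative):

```python
import numpy as np
import tensorflow as tf

def write_examples(path, images, labels):
    """Serialize float32 images and integer labels to a TFRecord file."""
    with tf.io.TFRecordWriter(path) as writer:
        for image, label in zip(images, labels):
            example = tf.train.Example(features=tf.train.Features(feature={
                # Raw bytes of the float32 array, matching decode_raw above
                'image': tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[image.tobytes()])),
                'label': tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[int(label)])),
            }))
            writer.write(example.SerializeToString())

images = np.random.rand(4, 28, 28, 1).astype(np.float32)
labels = [0, 1, 2, 3]
write_examples('/tmp/sample.tfrecord', images, labels)
```

Keep the writer's feature names and dtypes in lockstep with the reader's `feature_description`, since a mismatch only surfaces at parse time.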
CSV Files
```python
# Load a CSV dataset (one label column followed by 784 pixel columns)
def parse_csv(line):
    columns = tf.io.decode_csv(line, record_defaults=[0.0] * 785)
    label = tf.cast(columns[0], tf.int32)
    features = tf.stack(columns[1:])
    features = tf.reshape(features, [28, 28, 1])
    return features, label

csv_dataset = (
    tf.data.TextLineDataset(['data.csv'])
    .skip(1)  # Skip the header row
    .map(parse_csv, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Image Files
```python
def load_and_preprocess_image(path, label):
    """Load an image from a file and preprocess it."""
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Create a dataset from image paths
image_paths = ['/path/to/image1.jpg', '/path/to/image2.jpg', ...]
labels = [0, 1, ...]

image_dataset = (
    tf.data.Dataset.from_tensor_slices((image_paths, labels))
    .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
```

Data Validation
DataLoader Generation

```python
# Generate a batched TensorFlow dataset. Note: this function reads
# `features` and `labels` as module-level variables, which must be
# defined before it is called.
def gen_dataset(
        batch_size=1,
        is_training=False,
        shuffle=False,
        input_pipeline_context=None,
        preprocess=None,
        drop_remainder=True,
        total_steps=None):
    """Generate a dataset with the specified configuration."""
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
    if preprocess:
        dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)
    if is_training:
        dataset = dataset.repeat()
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    if total_steps:
        dataset = dataset.take(total_steps)
    return dataset
```

When to Use This Skill
Use the tensorflow-data-pipelines skill when you need to:
- Load and preprocess large datasets that don't fit in memory
- Implement data augmentation for training robustness
- Optimize data loading to prevent GPU/TPU idle time
- Create custom data generators for specialized formats
- Build multi-modal pipelines with images, text, and audio
- Implement efficient batching strategies for variable-length sequences
- Cache preprocessed data to speed up training
- Handle distributed training across multiple devices
- Parse TFRecord, CSV, or other file formats
- Implement stratified sampling for imbalanced datasets
- Create reproducible data shuffling
- Build real-time data augmentation pipelines
- Optimize memory usage with streaming datasets
- Implement prefetching for pipeline parallelism
- Create validation and test splits efficiently
Best Practices
- Always use prefetch() - Add .prefetch(tf.data.AUTOTUNE) at the end of pipeline to overlap data loading with training
- Use num_parallel_calls=AUTOTUNE - Let TensorFlow automatically tune parallelism for map operations
- Cache after expensive operations - Place .cache() after preprocessing but before augmentation and shuffling
- Shuffle before batching - Call .shuffle() before .batch() to ensure random batches
- Use appropriate buffer sizes - Shuffle buffer should be >= dataset size for perfect shuffling, or at least several thousand
- Normalize data in pipeline - Apply normalization in map() function for consistency across train/val/test
- Batch after transformations - Apply .batch() after all element-wise transformations for efficiency
- Use drop_remainder for training - Set drop_remainder=True in batch() to ensure consistent batch sizes
- Leverage AUTOTUNE - Use tf.data.AUTOTUNE for automatic performance tuning instead of manual values
- Apply augmentation after caching - Cache deterministic preprocessing, apply random augmentation after
- Use interleave for file reading - Parallel file reading with interleave() for large multi-file datasets
- Repeat for infinite datasets - Use .repeat() for training datasets to avoid dataset exhaustion
- Use take/skip for splits - Efficiently split datasets without loading all data into memory
- Monitor pipeline performance - Use TensorFlow Profiler to identify bottlenecks in data pipeline
- Shard data for distribution - Use shard() for distributed training across multiple workers
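Several of the practices above are measurable: short of running the full TensorFlow Profiler, a quick way to check whether the input pipeline keeps up is to time a pass over the dataset with no model in the loop. A rough sketch (the toy data and the two variants compared are illustrative):

```python
import time
import numpy as np
import tensorflow as tf

def benchmark(dataset, num_epochs=2):
    """Time iteration over a dataset without any model in the loop."""
    start = time.perf_counter()
    for _ in range(num_epochs):
        for _ in dataset:
            pass
    elapsed = time.perf_counter() - start
    print(f"{num_epochs} epochs in {elapsed:.3f}s")
    return elapsed

x = np.random.rand(256, 28, 28, 1).astype(np.float32)
y = np.random.randint(0, 10, 256)

naive = tf.data.Dataset.from_tensor_slices((x, y)).batch(32)
tuned = naive.cache().prefetch(tf.data.AUTOTUNE)

benchmark(naive)
benchmark(tuned)
```

If this loop is much faster than a training step, the pipeline is not the bottleneck; if it is comparable, the pipeline is starving the accelerator.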
Common Pitfalls
- Shuffling after batching - Shuffles batches instead of individual samples, reducing randomness
- Not using prefetch - GPU sits idle waiting for data, wasting compute resources
- Cache in wrong position - Caching after augmentation prevents randomness, before preprocessing wastes memory
- Buffer size too small - Insufficient shuffle buffer leads to poor randomization and training issues
- Not using num_parallel_calls - Sequential map operations create bottlenecks in data loading
- Loading entire dataset to memory - Use tf.data instead of loading all data with NumPy
- Applying augmentation deterministically - Same augmentations every epoch reduce training effectiveness
- Not setting random seeds - Irreproducible results and debugging difficulties
- Ignoring batch remainder - Variable batch sizes cause errors in models expecting fixed dimensions
- Repeating validation dataset - Validation should not repeat, only training datasets
- Not using AUTOTUNE - Manual tuning is difficult and suboptimal compared to automatic optimization
- Caching very large datasets - Exceeds memory limits and causes OOM errors
- Too many parallel operations - Excessive parallelism causes thread contention and slowdown
- Not monitoring data loading time - Data pipeline can become training bottleneck without monitoring
- Applying normalization inconsistently - Different normalization for train/val/test causes poor performance
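The seed pitfall above is addressed by seeding shuffle explicitly; `reshuffle_each_iteration` then controls whether the order still varies between epochs. A minimal sketch:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# Fixed seed with reshuffle disabled: identical order on every pass
fixed = dataset.shuffle(10, seed=42, reshuffle_each_iteration=False)
first = list(fixed.as_numpy_iterator())
second = list(fixed.as_numpy_iterator())
print(first == second)  # True

# reshuffle_each_iteration=True (the default) varies the order per epoch
# while remaining reproducible across runs for a given seed
varied = dataset.shuffle(10, seed=42, reshuffle_each_iteration=True)
```

For training you usually want the second form: per-epoch variation but run-to-run reproducibility.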