如何高效使用TFrecords进行数据分片操作?

2026-05-22 10:511阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1132个文字,预计阅读时间需要5分钟。

如何高效使用TFrecords进行数据分片操作?

快速开始:多进程分片写入tfrecord,读取def feature_transform(file):

python快速启动:多进程分片写入tfrecord,并读取特征转换函数

如何高效使用TFrecords进行数据分片操作?

quick start
  • 多进程分片写入 tfrecord
  • 读取

def feature_transform(file): …… # 写入 tfrecord def serialize_example(sha256, data, label): """ Creates a tf.Example message ready to be written to a file :param data: [float,float] :param label: int :return: """ feature = { "sha256": tf.train.Feature(bytes_list=tf.train.BytesList(value=[sha256.encode('UTF-8')])), 'feature': tf.train.Feature(float_list=tf.train.FloatList(value=data)), 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])) } example = tf.train.Example(features=tf.train.Features(feature=feature)) return example.SerializeToString() def write_to_tfrecords(filepath,labels_filepath,tfrecords_filepath): tfwriter = tf.io.TFRecordWriter(tfrecords_filepath) for file,label in tqdm(zip(filepath,labels_filepath)): # serialize example sha256 = file.split("/")[-1] data = feature_transform(file) example = serialize_example(sha256, data, label) # write tfwriter.write(example) from multiprocessing import Process def write_shard_tfrecords(all_file_paths, all_file_labels, tfrecord_dir): n_shards = int(0.8*os.cpu_count())+1 all_file_paths,all_file_labels = np.array(all_file_paths),np.array(all_file_labels) for i in range(n_shards): shard_indexs = np.arange(len(all_file_paths))[i::n_shards] shard_file_paths,shard_file_labels = all_file_paths[shard_indexs], all_file_labels[shard_indexs] p = Process(target=write_to_tfrecords, args=(shard_file_paths,shard_file_labels,os.path.join(tfrecord_dir,"shard_"+str(i)))) p.start() p.join() # 父进程等待子进程结束 # 读取 tfrecord def _parse_tfrecord_function(example): example_fmt = { "sha256": tf.io.FixedLenFeature([], tf.string), 'feature': tf.io.FixedLenFeature([], tf.float32), 'label': tf.io.FixedLenFeature([], tf.int64) } parsed = tf.io.parse_single_example(example, example_fmt) return parsed["feature"], parsed["label"] def make_dataset(files, SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5): shards = tf.data.Dataset.from_tensor_slices(files) dataset = shards.interleave(tf.data.TFRecordDataset) dataset = dataset.shuffle(SHUFFLE_BUFFER_SIZE) dataset = dataset.repeat(EPOCHS) dataset = dataset.map(lambda x: _parse_tfrecord_function(x), num_parallel_calls=tf.data.AUTOTUNE) dataset = dataset.batch(batch_size=BATCH_SIZE) return dataset def split_train_val(tfrecord_dir, BATCH_SIZE, EPOCHS): tfrecords_pattern_path = os.path.join(tfrecord_dir,"shard_*") files = tf.io.matching_files(tfrecords_pattern_path) files = tf.random.shuffle(files) train_ds = make_dataset(files[:int(len(files)*0.9)], SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5) val_ds = make_dataset(files[int(len(files)*0.9):], SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5) return train_ds,val_ds TF record 相关概念

# dataset.tfrecords [ { # example 1 (tf.train.Example) 'feature_1': tf.train.Feature, ... 'feature_k': tf.train.Feature }, ... { # example N (tf.train.Example) 'feature_1': tf.train.Feature, ... 'feature_k': tf.train.Feature } ]

为了将形式各样的数据集整理为 TFRecord 格式,我们可以对数据集中的每个元素进行以下步骤:

  • 读取该数据元素到内存;
  • 将该元素转换为 tf.train.Example 对象(每一个 tf.train.Example 由若干个 tf.train.Feature 的字典组成,因此需要先建立 Feature 的字典);
  • 将该 tf.train.Example 对象序列化为字符串,并通过一个预先定义的 tf.io.TFRecordWriter 写入 TFRecord 文件。

而读取 TFRecord 数据则可按照以下步骤:

  • 通过 tf.data.TFRecordDataset 读入原始的 TFRecord 文件(此时文件中的 tf.train.Example 对象尚未被反序列化),获得一个 tf.data.Dataset 数据集对象;
  • 通过 Dataset.map 方法,对该数据集对象中的每一个序列化的 tf.train.Example 字符串执行 tf.io.parse_single_example 函数,从而实现反序列化。

with tf.io.TFRecordWriter(tfrecord_file) as writer: for filename, label in zip(train_filenames, train_labels): image = open(filename, 'rb').read() # 读取数据集图片到内存,image 为一个 Byte 类型的字符串 feature = { # 建立 tf.train.Feature 字典 'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])), # 图片是一个 Bytes 对象 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])) # 标签是一个 Int 对象 } example = tf.train.Example(features=tf.train.Features(feature=feature)) # 通过字典建立 Example writer.write(example.SerializeToString()) # 将Example序列化并写入 TFRecord 文件

值得注意的是, tf.train.Feature 支持三种数据格式:

  • tf.train.BytesList :字符串或原始 Byte 文件(如图片),通过 bytes_list 参数传入一个由字符串数组初始化的 tf.train.BytesList 对象;
  • tf.train.FloatList :浮点数,通过 float_list 参数传入一个由浮点数数组初始化的 tf.train.FloatList 对象;
  • tf.train.Int64List :整数,通过 int64_list 参数传入一个由整数数组初始化的 tf.train.Int64List 对象。

"sha256": tf.train.Feature(bytes_list=tf.train.BytesList(value=[sha256.encode('UTF-8')])), 'feature': tf.train.Feature(float_list=tf.train.FloatList(value=data)), 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))

raw_dataset = tf.data.TFRecordDataset(tfrecord_file) # 读取 TFRecord 文件 feature_description = { # 定义Feature结构,告诉解码器每个Feature的类型是什么 'image': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64), } def _parse_example(example_string): # 将 TFRecord 文件中的每一个序列化的 tf.train.Example 解码 feature_dict = tf.io.parse_single_example(example_string, feature_description) feature_dict['image'] = tf.io.decode_jpeg(feature_dict['image']) # 解码JPEG图片 return feature_dict['image'], feature_dict['label'] dataset = raw_dataset.map(_parse_example)

tf.wiki/zh_hans/basic/tools.html#tfrecord-tensorflow

tensorflow.google.cn/tutorials/load_data/tfrecord?hl=zh-cn#python_中的_tfrecord_文件

medium.com/@rodrigobrechard/tfrecords-how-to-use-sharding-94059e2b2c6b

本文共计1132个文字,预计阅读时间需要5分钟。

如何高效使用TFrecords进行数据分片操作?

快速开始:多进程分片写入tfrecord,读取def feature_transform(file):

python快速启动:多进程分片写入tfrecord,并读取特征转换函数

如何高效使用TFrecords进行数据分片操作?

quick start
  • 多进程分片写入 tfrecord
  • 读取

def feature_transform(file): …… # 写入 tfrecord def serialize_example(sha256, data, label): """ Creates a tf.Example message ready to be written to a file :param data: [float,float] :param label: int :return: """ feature = { "sha256": tf.train.Feature(bytes_list=tf.train.BytesList(value=[sha256.encode('UTF-8')])), 'feature': tf.train.Feature(float_list=tf.train.FloatList(value=data)), 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])) } example = tf.train.Example(features=tf.train.Features(feature=feature)) return example.SerializeToString() def write_to_tfrecords(filepath,labels_filepath,tfrecords_filepath): tfwriter = tf.io.TFRecordWriter(tfrecords_filepath) for file,label in tqdm(zip(filepath,labels_filepath)): # serialize example sha256 = file.split("/")[-1] data = feature_transform(file) example = serialize_example(sha256, data, label) # write tfwriter.write(example) from multiprocessing import Process def write_shard_tfrecords(all_file_paths, all_file_labels, tfrecord_dir): n_shards = int(0.8*os.cpu_count())+1 all_file_paths,all_file_labels = np.array(all_file_paths),np.array(all_file_labels) for i in range(n_shards): shard_indexs = np.arange(len(all_file_paths))[i::n_shards] shard_file_paths,shard_file_labels = all_file_paths[shard_indexs], all_file_labels[shard_indexs] p = Process(target=write_to_tfrecords, args=(shard_file_paths,shard_file_labels,os.path.join(tfrecord_dir,"shard_"+str(i)))) p.start() p.join() # 父进程等待子进程结束 # 读取 tfrecord def _parse_tfrecord_function(example): example_fmt = { "sha256": tf.io.FixedLenFeature([], tf.string), 'feature': tf.io.FixedLenFeature([], tf.float32), 'label': tf.io.FixedLenFeature([], tf.int64) } parsed = tf.io.parse_single_example(example, example_fmt) return parsed["feature"], parsed["label"] def make_dataset(files, SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5): shards = tf.data.Dataset.from_tensor_slices(files) dataset = shards.interleave(tf.data.TFRecordDataset) dataset = dataset.shuffle(SHUFFLE_BUFFER_SIZE) dataset = dataset.repeat(EPOCHS) dataset = dataset.map(lambda x: _parse_tfrecord_function(x), num_parallel_calls=tf.data.AUTOTUNE) dataset = dataset.batch(batch_size=BATCH_SIZE) return dataset def split_train_val(tfrecord_dir, BATCH_SIZE, EPOCHS): tfrecords_pattern_path = os.path.join(tfrecord_dir,"shard_*") files = tf.io.matching_files(tfrecords_pattern_path) files = tf.random.shuffle(files) train_ds = make_dataset(files[:int(len(files)*0.9)], SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5) val_ds = make_dataset(files[int(len(files)*0.9):], SHUFFLE_BUFFER_SIZE=1024, BATCH_SIZE=32, EPOCHS=5) return train_ds,val_ds TF record 相关概念

# dataset.tfrecords [ { # example 1 (tf.train.Example) 'feature_1': tf.train.Feature, ... 'feature_k': tf.train.Feature }, ... { # example N (tf.train.Example) 'feature_1': tf.train.Feature, ... 'feature_k': tf.train.Feature } ]

为了将形式各样的数据集整理为 TFRecord 格式,我们可以对数据集中的每个元素进行以下步骤:

  • 读取该数据元素到内存;
  • 将该元素转换为 tf.train.Example 对象(每一个 tf.train.Example 由若干个 tf.train.Feature 的字典组成,因此需要先建立 Feature 的字典);
  • 将该 tf.train.Example 对象序列化为字符串,并通过一个预先定义的 tf.io.TFRecordWriter 写入 TFRecord 文件。

而读取 TFRecord 数据则可按照以下步骤:

  • 通过 tf.data.TFRecordDataset 读入原始的 TFRecord 文件(此时文件中的 tf.train.Example 对象尚未被反序列化),获得一个 tf.data.Dataset 数据集对象;
  • 通过 Dataset.map 方法,对该数据集对象中的每一个序列化的 tf.train.Example 字符串执行 tf.io.parse_single_example 函数,从而实现反序列化。

with tf.io.TFRecordWriter(tfrecord_file) as writer: for filename, label in zip(train_filenames, train_labels): image = open(filename, 'rb').read() # 读取数据集图片到内存,image 为一个 Byte 类型的字符串 feature = { # 建立 tf.train.Feature 字典 'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])), # 图片是一个 Bytes 对象 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])) # 标签是一个 Int 对象 } example = tf.train.Example(features=tf.train.Features(feature=feature)) # 通过字典建立 Example writer.write(example.SerializeToString()) # 将Example序列化并写入 TFRecord 文件

值得注意的是, tf.train.Feature 支持三种数据格式:

  • tf.train.BytesList :字符串或原始 Byte 文件(如图片),通过 bytes_list 参数传入一个由字符串数组初始化的 tf.train.BytesList 对象;
  • tf.train.FloatList :浮点数,通过 float_list 参数传入一个由浮点数数组初始化的 tf.train.FloatList 对象;
  • tf.train.Int64List :整数,通过 int64_list 参数传入一个由整数数组初始化的 tf.train.Int64List 对象。

"sha256": tf.train.Feature(bytes_list=tf.train.BytesList(value=[sha256.encode('UTF-8')])), 'feature': tf.train.Feature(float_list=tf.train.FloatList(value=data)), 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))

raw_dataset = tf.data.TFRecordDataset(tfrecord_file) # 读取 TFRecord 文件 feature_description = { # 定义Feature结构,告诉解码器每个Feature的类型是什么 'image': tf.io.FixedLenFeature([], tf.string), 'label': tf.io.FixedLenFeature([], tf.int64), } def _parse_example(example_string): # 将 TFRecord 文件中的每一个序列化的 tf.train.Example 解码 feature_dict = tf.io.parse_single_example(example_string, feature_description) feature_dict['image'] = tf.io.decode_jpeg(feature_dict['image']) # 解码JPEG图片 return feature_dict['image'], feature_dict['label'] dataset = raw_dataset.map(_parse_example)

tf.wiki/zh_hans/basic/tools.html#tfrecord-tensorflow

tensorflow.google.cn/tutorials/load_data/tfrecord?hl=zh-cn#python_中的_tfrecord_文件

medium.com/@rodrigobrechard/tfrecords-how-to-use-sharding-94059e2b2c6b