0%

pb和parquet及TFRecord格式介绍与对比

大规模数据分析、异构数据源场景、数据、模型等IO要求高场景需要 parquet 数据格式,同样 pb 等序列化结构数据,在数据传输、存储中占用资源较少,因此 PB 文件也广泛的应用于算法之中。

pb(Protocol Buffers)

序列化结构化数据(二进制)适用于数据量大、数据传输效率高的场景

  • 序列化后,数据流小,占用网络单款少
  • 序列化和反序列化的性能,占用CPU资源少
  • 数据本身不可读,需要反序列化后可读数据
  • 解析速度快(比 XML、JSON 快20-100倍)
  • 适配算法场景

定义pb文件规范

  • proto文件
    创建proto文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    syntax = "proto3"; # proto3语法
    # 用户服务请求
    message SearchRequest {
    string query = 1;
    int32 page_number = 2;
    int32 result_per_page = 3;
    }
    # 服务请求返回
    message MyRequest {
    string name = 1;
    int32 age = 2;
    }
  • 文件中定义message类型的数据格式
    message类型数据格式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 一级视图,可作为业务
    message MyRequest {
    string name = 1;
    int32 gender = 2;
    optional bytes predict_gender = 3;  # 算法场景bytes
    }

    # 二级视图,可作为特征
    message gender {
    string male = 1;
    string female = 2;
    }

数据流-Data2protobuf Function

  • pb 类名(pb 所在完整类名,存储于自定义资产中)
  • 数据类型支持
    • byteArray(byte数组转pb)
    • JSON(json字符串转pb)

动态PB

hive/topic 增加字段,spark/flink 调用 SDK 的入参增加新字段 <dataId, value>, SDK 读取 DUCC 配置感知新增字段,SDK 判断入参增加字段,进行数据更新时,动态增加对字段的序列化/反序列化处理逻辑。

  • 键-值 序列化

Parquet

parquet 采用类似 PB 协议来描述存储结果schema。列存储、大数据存储格式、Hadoop 生态圈

  • 适用于 OLAP 场景
  • 适用于 模型训练 场景
  • 优势:更高压缩比、更小 IO 操作,适配多语言和组件
  • 行组 + 列块 + 页

Parquet 通常和 挂盘功能一并使用,挂载(mounting)是指由操作系统使一个存储设备(诸如硬盘、CD-ROM或共享资源)上的计算机文件和目录可供用户通过计算机的文件系统访问的一个过程。 一般来说,当计算机关机时,每个已挂载存储都将经历一次卸载,以确保所有排队的数据被写入,并保证介质上文件系统结构的完整性。 访问这个目录来访问存储设备。

1
2
3
4
5
6
7
8
message AddressBook {
required string owner;
repeated string ownerPhoneNumbers;
repeated group contacts {
required string name;
optional string phoneNumber;
}
}

TF Record

Tensorflow 自带的数据格式,将数据存储为二进制文件(二进制存储具有占用空间少,拷贝和读取(from disk)更加高效的特点),而且不需要单独的标签文件了,其本质是一行行字节字符串构成的样本数据。

  • 训练方式:1、内存; 2、使用 queue/tf.data
  • 优势
    • 1、支持延迟加载
    • 2、原生tensorflow支持
    • 3、protocol buffer 数据标准
生成 TFRecord文件
1
2
3
4
5
6
7
import tensorflow as tf

writer = tf.python_io.TFRecordWriter("./1.tfrecords")
for i in range(200): # 假设我们在一个文件中保存200个Example
example = tf.train.Example(...)
writer.write(example.SerializeToString())
writer.close()
读取TFRecord文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def read_demo(filepath):
# 定义schema
schema = {
'user_id': tf.FixedLenFeature([], tf.int64),
'city_id': tf.FixedLenFeature([], tf.int64),
'app_type': tf.FixedLenFeature([], tf.int64),
'viewed_pois': tf.VarLenFeature(tf.int64),
'avg_paid': tf.FixedLenFeature([], tf.float32, default_value=0.0),
'comment': tf.FixedLenFeature([], tf.string, default_value=''),
}

# 使用相关api,按照schema解析dataset中的样本
def _parse_function(example_proto):
return tf.parse_single_example(example_proto, schema)

# 读取TFRecord文件来创建dataset
dataset = tf.data.TFRecordDataset(filepath)
#按照schema解析dataset中的每个样本
parsed_dataset = dataset.map(_parse_function)
#创建Iterator并迭代Iterator即可访问dataset中的样本
next = parsed_dataset.make_one_shot_iterator().get_next()

# 这里直接利用session,打印dataset中的样本
with tf.Session() as sess:
while True:
try:
print sess.run(next)
except:
print "out of data"
break

Json 和 XML 和 ORC

Json,数据支持好,支持所有编程语言,但不适用于大数据量的传输场景

XML,用于序列化和封装数据。采用键值对的方式,可读性高,压缩数据空间

ORC,列存

  • ORC 嵌套和损耗较大
  • 支持 update 和 ACID

Avro,基于行的存储格式,被广泛用作序列化过程。

  • 可拆分的
  • 可压缩的
  • 以 Json 定义的 Avro 模式,便于阅读和解析

Welcome to my other publishing channels