前言
列式文件,顾名思义就是按列存储到文件,和行式存储文件对应。保证了一列在一个文件中是连续的。下面从parquet常见术语,核心schema和文件结构来深入理解。最后通过java api完成write和read。
术语
block
parquet层面和row group是一个意思
row group
逻辑概念,用于对row进行分区。由数据集中每个column的column chunk组成。是读写过程中的缓存单元,一般在hdfs上推荐一个block为1GB,一个HDFS文件1个bolock
column chunk
某个column的所有数据被称为column chunk,存在与row group,并保证在文件中是连续的
page
多个column chunk之间用page分开,也就是说一个page只会包含一个column的数据,一个page是一个独立的单元(可以被编码或者压缩)
dictionary page
每个page之前都可以选择是否需要dictionary page。dictionary page记录了该page所有不同的值。这可以增强处理速度提高压缩率。
总结
一个文件由多个row group组成,一个row group包括了多个column chunk,一个column chunck就是某个column的所有数据集, 被分割成多个page,一个page是最小的处理单元,可以被编码或者压缩。
schema
每种文件都有自己特有的规则,像csv文件,是用分隔符分隔开的一个个列。parquet文件也有自己独特的schema格式。
这是一个parquet文件的schema例子,对应的api是MessageType
message person{ required binary name (UTF8); required int age; repeated group family{ required binary father (UTF8); required binary mother (UTF8); optional binary sister (UTF8); }}
message
固定声明,就像结构体中的struct一样。
person
message name,可以粗暴的理解为表名,因为里面都是field。
optional,required,repeated
这是三种field的关键字,分别表示可选,必选,可重复选
可选和必选类似数据库中的nullable,可重复选是为了支持复杂的嵌套结构。
field类型
目前parquet支持int32,int64,int96(有些系统会把时间戳存成int96如老版本hive),float,double,boolean,binary,fixed_len_byte_array。
参考类org.apache.parquet.schema. PrimitiveType.PrimitiveTypeName
UTF8
field的原始类型(Original Type),可以辅助field的type进行细粒度的类型判断。
参考类 org.apache.parquet.schema. OriginalType
group
嵌套结构声明,类似json对象
schema&数据
schema有了,那如何把schema和数据关联起来,也就是说可以通过schema构建或者解析出相应的数据。那就引出了嵌套关系,definition level和repetitional level。用于定位数据到底出现在嵌套中(如果有嵌套的话)的哪一层。值得注意的是,嵌套关系是针对列而言的,不同列有各自的嵌套关系。
definition level
optional字段定位,如果实际没有数据就为0,有数据就为1。涉及到嵌套optional,那么可以这么理解,如果从某一层开始没有该数据,那么该层之前肯定是有数据的,该层之后肯定没有数据。举个简单的例子
message ExampleDefinitionLevel { optional group a { optional group b { optional string c; } }}
这个schema对应的definition level所有的可能性如表所示
repetition level
repeated字段定位,如果在嵌套中某一层出现了值,那么就记录该层。那一个例子来说:
message AddressBook { required string owner; repeated string ownerPhoneNumbers; repeated group contacts { required string name; optional string phoneNumber; }}
针对不同的列,defnition level和repetition level的最大值如表
文件结构
结构图
详细
一个parquet文件由3部分组成,header,blocks,footer。类似一般文档中的页眉,正文,页脚。
header
只包含4个字节的魔数,PAR1
blocks
block定义参考“术语”
footer
记录了该parquet文件正文所有metadata,
文件物理格式
通过 cat -v 查看一个parquet,会看到很多的non-printable字符,比如:^U^@^U^P^U^P,^U^B^U^@^
这些字符其实是可以和ascii互相映射,比如^@就是ascii中的0,详细可以看这篇文档
其实就是八进制的ascii,小于100的+100,大于100的减100。
所有的列,包括嵌套结构,例如test.c1和test.c2属于两个列,都是连续存储在parquet文件中。
参考资料
// twitter对parquet的概述
// parquet的github
// 很详细的parquet文件解析
coding
public static MessageType getMessageTypeFromCode(){ MessageType messageType = Types.buildMessage() .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("id") .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name") .required(PrimitiveType.PrimitiveTypeName.INT32).named("age") .requiredGroup() .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test1") .required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("test2") .named("group1") .named("trigger"); return messageType;}public static void writeParquet(String name){ // 1. 声明parquet的messageType MessageType messageType = getMessageTypeFromCode(); System.out.println(messageType.toString()); // 2. 声明parquetWriter Path path = new Path("/tmp/etl/"+ name); Configuration configuration = new Configuration(); GroupWriteSupport.setSchema(messageType, configuration); GroupWriteSupport writeSupport = new GroupWriteSupport(); // 3. 写数据 ParquetWriterwriter = null; try { writer = new ParquetWriter (path, ParquetFileWriter.Mode.CREATE, writeSupport, CompressionCodecName.UNCOMPRESSED, 128*1024*1024, 5*1024*1024, 5*1024*1024, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, configuration); Random random = new Random(); for(int i=0; i<10; i++){ // 4. 构建parquet数据,封装成group Group group = new SimpleGroupFactory(messageType).newGroup(); group.append("name", i+"@qq.com") .append("id",i+"@id") .append("age",i) .addGroup("group1") .append("test1", "test1"+i) .append("test2","test2"+i); writer.write(group); } } catch (IOException e) { e.printStackTrace(); } finally { if(writer != null){ try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } }}public static void readParquet(String name){ // 1. 声明readSupport GroupReadSupport groupReadSupport = new GroupReadSupport(); Path path = new Path("/tmp/etl/"+name); // 2.通过parquetReader读文件 ParquetReader reader = null; try { reader = ParquetReader.builder(groupReadSupport, path).build(); Group group = null; while ((group = reader.read()) != null){ System.out.println(group); } } catch (IOException e) { e.printStackTrace(); } finally { if(reader != null){ try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } }}