首页 > 编程知识 正文

JavaScript程序设计教程,javascript和java

时间:2023-05-05 18:57:00 阅读:120715 作者:4996

【实例介绍】

文件: 590 m.com/f/25127180-489056218-c2ed 13 (访问密码: 551685 ) )。

以下内容无关。

---------分割线---------

几乎实时写入

减少碎片工具的使用

CDC增量导入RDBMS数据

限制小文件的大小和数量

接近实时分析

为每秒存储(Druid、OpenTSDB )节省资源

提供分钟时效性,支持更高效的查询

Hudi作为lib非常轻量

增量pipeline

区分活动时间和事件时间处理延迟数据

时间表间隔缩短端到端延迟(小时-分钟)=增量处理

增量导出

数据将导出到在线服务存储e.g. ES,而不是某些Kafka场景

2 .概念/术语

33559 hudi.Apache.org/docs/concepts.html

2.1 Timeline

Timeline是HUDI用于管理提交(commit )的抽象,每个commit都有一个固定的时间戳,并分布在时间轴上。 在Timeline中,每个commit都抽象为一个HoodieInstant,一个instant记录一次提交的行为、时间戳和状态。

HUDI读写API可以通过Timeline界面方便地通过commits进行条件筛选,对history和on-going commits应用多种策略,快速筛选操作目标commit。

2.2时间

Arrival time:数据到达Hudi的时间,提交时间

事件时间:记录中记录的时间

在上图中,时间(时间)被用作分区字段,并且各种类型的commits从10:00相继发生,即使从10:20到9336000的数据到达,该数据也落入与9336000对应的分区中,并且TTT

2.3文件管理

2.3.1文件版本

新的base commit time支持新的文件片,实际上是新的数据版本。 HUDI通过表文件系统视图抽象来管理与表对应的文件。 例如,在所有最新版本的文件片段中找到基本文件(以copyonwritesnapshot开头)或基本日志文件(以mergeonread结尾)。

通过Timeline和TableFileSystemView抽象,HUDI提供了非常方便高效的表文件搜索。

2.3.3文件格式

Hoodie中的每个文件片段都包含一个基本文件(可能不在合并模式中)、多个日志文件(不在复制模式中)。

每个文件的文件名都具有其所属的FileID,即文件组标识符和基本提交时间,即实例时间。 用文件名的组id组织文件组的逻辑关系用文件名的基本提交时间组织文件片的逻辑关系。

HUDI的basefile(parquet文件)为了在基于文件的索引的实现中实现高效的key contains检测,在footer的元中记录了由record key构成的BloomFilter。 只有不在bloom过滤器的key需要扫描整个文件来消灭假阳。

HUDI的log(avro文件)是自己编码的,通过存储数据,buffer以LogBlock为单位写,各LogBlock中有幻灯片编号、大小、内容、footer等信息

2.4索引

由于hood iekey (记录密钥分区)和文件组(fileid )之间的映射关系在首次将数据写入文件后保持不变,因此一个文件组中包含所有记录索引用于区分消息是插入还是更新。

2.4.1索引创建流程

bloom过滤器索引

添加记录以找到映射关系。 记录密钥=target partition

当前最新数量

据 找到映射关系:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)
新增 records 找到需要搜索的映射关系:fileID => HoodieKey(record key + partition path) LIST,key 是候选的 fileID
通过 HoodieKeyLookupHandle 查找目标文件(通过 BloomFilter 加速)Flink State-based Index
HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先计算目标 bucket ID,不同于 BloomFilter Index,避免了每次重复的文件 index 查找。

2.5 Table 类型

2.5.1 Copy On Write
Copy On Write 类型表每次写入都会生成一个新的持有 base file(对应写入的 instant time ) 的 FileSlice。

用户在 snapshot 读取的时候会扫描所有最新的 FileSlice 下的 base file。

2.5.2 Merge On Read
Merge On Read 表的写入行为,依据 index 的不同会有细微的差别:

对于 BloomFilter 这种无法对 log file 生成 index 的索引方案,对于 INSERT 消息仍然会写 base file (parquet format),只有 UPDATE 消息会 append log 文件(因为 base file 总已经记录了该 UPDATE 消息的 FileGroup ID)。
对于可以对 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次写入都是 log format,并且会不断追加和 roll over。
Merge On Read 表的读在 READ OPTIMIZED 模式下,只会读最近的经过 compaction 的 commit。

数据写
3.1 写操作
UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 => CDC 导入
INSERT:跳过 index,写入效率更高 => Log Deduplication
BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)
3.1.1 写流程(UPSERT)Copy On Write
先对 records 按照 record key 去重
首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice)
对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSliceMerge On Read
先对 records 按照 record key 去重(可选)
首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup + FileSlice + base file
如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)log file 大小达到阈值会 roll over 一个新的
3.1.2 写流程(INSERT)Copy On Write
先对 records 按照 record key 去重(可选)
不会创建 Index
如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base fileMerge On Read
先对 records 按照 record key 去重(可选)
不会创建 Index
如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file
3.1.3 工具
DeltaStreamer
Datasource Writer
Flink SQL API
3.1.4 Key 生成策略
用来生成 HoodieKey(record key + partition path),目前支持以下策略:

支持多个字段组合 record keys
支持多个字段组合的 parition path (可定制时间格式,Hive style path name)
非分区表
3.1.5 删除策略
逻辑删:将 value 字段全部标记为 null
物理删:
通过 OPERATION_OPT_KEY 删除所有的输入记录
配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 删除所有的输入记录
在输入记录添加字段:_hoodie_is_deleted
4. 数据读
4.1 Snapshot 读
读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

4.2 Incremantal 读
https://hudi.apache.org/docs/querying_data.html#spark-incr-query,当前的 Spark data source 可以指定消费的起始和结束 commit 时间,读取 commit 增量的数据集。但是内部的实现不够高效:拉取每个 commit 的全部目标文件再按照系统字段 hoodie_commit_time apply 过滤条件。

4.3 Streaming 读
0.8.0 版本的 HUDI Flink writer 支持实时的增量订阅,可用于同步 CDC 数据,日常的数据同步 ETL pipeline。Flink 的 streaming 读做到了真正的流式读取,source 定期监控新增的改动文件,将读取任务下派给读 task。

Compaction
没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file
有 base file:走 copy on write upsert 流程,* 先读 log file 建 index,再读 base file,最后读 log file 写新的 base file
Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。

总结
通过对写流程的梳理我们了解到 HUDI 相对于其他数据湖方案的核心优势:

写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。
对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。
0.8.0 的 HUDI Flink 支持了 streaming 消费 HUDI 表,在后续版本还会支持 watermark 机制,让 HUDI Flink 承担 streaming ETL pipeline 的中间层,成为数据湖/仓建设中流批一体的中间计算层。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。