漫谈 Flink - State 之 Kafka 写入 HBase
前面介绍了读取 state,这篇笔记介绍下 state 是如何触发写入的。
1. Why
考虑订阅 Kafka 写 HBase 的场景:
写入 HBase 时,为了获取最大的写入性能,可能会先缓存到内存然后批量写入
消费 Kafka 时,需要记录当前的 offsets,方便重启时继续消费。
因此,就需要有数据同步的机制,在上传 kafka 的 offsets 前,确保 hbase 收到了全部数据并且将内存的数据持久化,这就是 flink 里 checkpoint 的作用。
2. How
Fault Tolerance via State Snapshots
里这张图从理论上解释的非常清楚:
简单来讲,相比 spark 物理上微批处理的方式,flink 在逻辑...
漫谈 Flink - JobGraph 之 JobVertexID
上一篇笔记看到 restore state 时找不到 operator id 的问题:
Cannot map checkpoint/savepoint state for operator 77fec41789154996bfa76055dea29472 to the new program
这些数字的变化乍看非常奇怪,这篇笔记尝试分析下这些数字是如何生成的并且修复上个例子。
1. 唯一OperatorID
OperatorID 对应的就是 StreamGraph 里的节点 id,不过在 Transformation->StreamGraph 阶段,都还是自增的 id。在生成 JobGraph 时,会首先为每个 StreamNode 生成 hashID,代码入口在cre...
漫谈 Flink - State 之读取 Kafka Offsets
上篇学习笔记通过 Kafka Offsets 以及用户 Count 函数分别介绍了 OperatorState 以及 KeyedState,但当我们使用 flink-SQL 时无法Assigning Operator IDs
,而实际场景中总免不了要查看 state 的需求。
这篇笔记从这个实际需求出发,实现一个 demo 的解决方案,进而相比文档更深入的了解下 flink state 的设计。
1. Mock FlinkKafkaConsumerBase
在不想深入了解 flink state 实现细节的情况下,一个比较自然的想法是:既然Flinkkafkaconsumerbase可以从 state 恢复 offsets,那么我们不妨直接利用该类或者Mock该类去尝试读取 st...
漫谈 Flink - State 之 Kafka Offsets
1. 概念
Flink 支持有状态的计算。状态即历史数据,例如计算过程中的 pv uv,此外还有数据源例如 Kafka Connector 的 offsets,数据存储比如接收后缓存未持久化的数据。
计算 uv,就需要存储键为 u,值为 count/明细的数据,可以使用外部存储,也可以在计算引擎中存储。在计算引擎中存储的好处是可以做到对用户无感知,例如SELECT user, count(distinct url) GROUP BY user,如果我们只需要写出这样的逻辑,而不用关注distinct url是如何存储的,会是一件很美好的事情。当然同样需要解决接口上的易用性、数据不丢不重的可靠性这类基础问题。
Flink 支持这类需求的机制就是 State.
网上介绍 state...
漫谈 Flink - KafkaTable 解析
flink 提供多种消费 Kafka 数据的方式,由于不同层级接口支持的功能范围、粒度不同,同时 flink 版本迭代频繁,接口也在不断发生变化,因此使用起来容易混淆。
当我们定义了一个 Kafka 的 DDL,例如:
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11'
...
)
在 DDL 背后都做了什么,使得我们能够通过 SQL 读写这张表?flink 如何组织其代码结构,如何复用 streaming 相关代码的?接口从 API 到 SQL,方式更加简洁的同时,又有哪些功能被忽略掉了?
这些疑问,在刚...
漫谈 Flink - JobGraph
生成StreamGraph后,接下来就是构造 JobGraph 了,这一步主要的变化是将尽可能合并多个相邻的 StreamNode.
1. Why
在分布式计算中,多个节点交换数据都通过 rpc 完成,这其中就少不了网络传输、序列化与反序列化的一系列过程,如果能够优化为硬盘或者内存处理,例如对于map(...).filt(...),理论上完全可以在本地串行计算完成,避免全量传输map的结果,在性能上明显就会有提高。
Flink 从 StreamGraph 转化为 JobGrpah 的过程,主要就是这个目的。例如对于 A → B → C → D 的调用顺序,如果 A B 可以合并为本地处理,那么就可以生成 A’ → C → D 新的 DAG 图,这就是 StreamGraph → ...
漫谈 Flink - StreamGraph
上一篇笔记介绍由 API 生成StreamExecutionEnvironment.transformations,接下来就是生成 StreamGraph.
StreamExecutionEnvironment.execute里包含了诸如 StreamGraph、JobGraph、?等流程。
具体在StreamGraphGenerator.generate:
StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig);
...
alreadyTransformed = new HashMap<>();
for (Transformat...
漫谈 Flink - Transformations
年前开始接触 flink,到现在已经有三个月的时间了,除了最开始简单看了下 flink 的启动过程,最近一直被其 scala 及 SQL API 搞的很虚。这个假期得空,终于开始盘点下。
这篇笔记介绍下 transformations 的生成过程。
其实 flink 相关介绍网上比比皆是,为了避免拾人牙慧,本文主要介绍下自己的理解,参考文章附录在文末。
1. 开篇
以 flink 里的 Hello World 为例:
val text = env.socketTextStream("127.0.0.1", 8011)
text.flatMap(new FlatMapFunction[String, (String, Int)] {
override def flatMa...
213 post articles, 27 pages.