浅谈 Flink - JobGraph 之 JobVertexID

 

上一篇笔记看到 restore state 时找不到 operator id 的问题:

Cannot map checkpoint/savepoint state for operator 77fec41789154996bfa76055dea29472 to the new program

这些数字的变化乍看非常奇怪,这篇笔记尝试分析下这些数字是如何生成的并且修复上个例子。

1. 唯一OperatorID

OperatorID 对应的就是 StreamGraph 里的节点 id,不过在 Transformation->StreamGraph 阶段,都还是自增的 id。在生成 JobGraph 时,会首先为每个 StreamNode 生成 hashID,代码入口在createJobGraph,通过StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes得到。

定义唯一 id 最简单朴素高效的方式大概就是 UUID,但考虑最基础的情况,如果只是程序重启或者某个 transform 内部修改,id 可以保持不变,也就是当这个 JobGraph 没有发生改变时节点 id 无须改变。当节点只有后续节点改变,且后续节点改变对该节点没有影响时,该节点的 id 也无须变化。

因此,节点的 id 取决于 3 个因素:

  1. 节点在图中的位置,用于判断前序节点的部分图是否发生了变化
  2. 输入节点的id,判断输入节点是否发生变化
  3. 输出节点(chainable)的个数

其实不同的用途,id 取决的因素也会不同。这里也只是大概的猜测,flink 从通用性的角度,实现上也会更严格些,感兴趣的可以直接看下traverseStreamGraphAndGenerateHashes这块代码。

为了提高易用性,flink 还提供了手动设置Assigning Operator IDs的方式,uid接口用于生成节点 id.

2. traverseStreamGraphAndGenerateHashes

我们模仿traverseStreamGraphAndGenerateHashes实现一个简单的版本:

MicroStreamGraph记录了影响 hash 值的几个元素,如果没有设置uid,通过(输入节点id, chainable的出边个数,节点位置)计算,如果设置了uid,则基于设置值计算。

其中构造了两个StreamNodes,对应前面的两个例子:StreamWithStateSampleWord Count

object JobVertexGenerator extends App {
  val hashFunction = Hashing.murmur3_128(0)
  val hashes = mutable.HashMap.empty[Int, Array[Byte]]

  // Plan
  // {"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":4},{"id":2,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}

  case class MicroStreamGraph(nodeId: Int,
                              userSpecifiedUID: String,
                              chainableOutEdgeCnt: Int,
                              inEdgeNodeIds: List[Int])

  // https://izualzhy.cn/flink-source-kafka-checkpoint-init#3-reading-state
  val streamNodes = List(
    MicroStreamGraph(1, "source_uid", 1, List.empty[Int]),
    MicroStreamGraph(2, null, 0, List(1)),
    MicroStreamGraph(4, "count_uid", 1, List(2)),
    MicroStreamGraph(5, null, 0, List(4))
  )
  /*
  // https://izualzhy.cn/flink-source-job-graph
  val streamNodes = List(
    MicroStreamGraph(1, null, 1, List.empty[Int]),
    MicroStreamGraph(2, null, 0, List(1)),
    MicroStreamGraph(4, null, 0, List(2)),
    MicroStreamGraph(5, null, 0, List(4))
  )
   */

  // StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes
  streamNodes.map{
    case streamNode if streamNode.userSpecifiedUID == null =>
      // StreamGraphHasherV2.generateDeterministicHash
      val hasher = hashFunction.newHasher()

      hasher.putInt(hashes.size)
      (0 until streamNode.chainableOutEdgeCnt).foreach(_ => hasher.putInt(hashes.size))

      val hash = hasher.hash().asBytes()

      streamNode.inEdgeNodeIds.foreach{inEdgeNodeId =>
        val inEdgeNodeHash = hashes(inEdgeNodeId)

        println(s"inEdgeNodeHash:${byteToHexString(inEdgeNodeHash)}")
        (0 until hash.length).foreach(i =>
          hash(i) = (hash(i) * 37 ^ inEdgeNodeHash(i)).toByte)
      }
      println(s"hash:${byteToHexString(hash)}")
      hashes.update(streamNode.nodeId, hash)

    case streamNode if streamNode.userSpecifiedUID != null =>
      // StreamGraphHasherV2.generateUserSpecifiedHash
      // OperatorIDGenerator.fromUid
      val hasher = hashFunction.newHasher()
      hasher.putString(streamNode.userSpecifiedUID.toString, Charset.forName("UTF-8"))
      hashes.update(streamNode.nodeId, hasher.hash().asBytes())
  }

  streamNodes.foreach(streamNode => println(s"${streamNode.nodeId} ${byteToHexString(hashes(streamNode.nodeId))}"))
}

对于第一个 StreamNodes,输出结果为

1 64248066b88fd35e9203cd469ffb4a53
2 d216482dd1005af6d275607ff9eabe2c
4 77fec41789154996bfa76055dea29472
5 f0bb9ed0d20321fef7413e1942e21550

可以看到这里跟读取 State时得到的 operatorID 是一致的。 也可以确认operatorID:77fec41789154996bfa76055dea29472的 state size = 2114,对应的就是map(new CountFunction)

对于第二个 StreamNodes,输出结果为

1 cbc357ccb763df2852fee8c4fc7d55f2
2 7df19f87deec5680128845fd9a6ca18d
4 9dd63673dd41ea021b896d5203f3ba7c
5 1a936cb48657826a536f331e9fb33b5e

这里的日志一致。
注:搜 cbc357ccb763df2852fee8c4fc7d55f2,大部分都是关于 flink 的结果😅

事实上,启动StreamWithStateSample的日志里也记录了非uid的hash值(对于uid的 hash 值,flink默认不会记录到日志)

... DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2      - Generated hash 'd216482dd1005af6d275607ff9eabe2c' for node 'Map-2' {id: 2, parallelism: 1, user function: org.apache.flink.streaming.api.scala.DataStream$$anon$4}
... DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2      - Generated hash 'f0bb9ed0d20321fef7413e1942e21550' for node 'Sink: Print to Std. Out-5' {id: 5, parallelism: 1, user function: org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}

到了这里,关于OperatorID报错的修复也就比较简单了,增加相同的uid即可。

//  env.addSource(new MockKafkaSource).print()
env.addSource(new MockKafkaSource).uid("source_uid").print()

3. OperatorID 应用

3.1. State

flink 支持有状态的计算,状态是跟每一个 JobVertexID 关联的,通过SavePoint.getOperatorStates接口声明也可以说明这点。从 state 恢复时也依赖逐个遍历且映射每个 OperatorID.

3.2. metrics

比如我们通过REST API查看例子里的 job 状态,可以看到这么一组 vertices:

{
    ...
    "vertices":[
        {
            "id":"64248066b88fd35e9203cd469ffb4a53",
            "name":"Source: Custom Source -> Map",
            ...
        },
        {
            "id":"77fec41789154996bfa76055dea29472",
            "name":"Map -> Sink: Print to Std. Out",
            ...
        }
    ]
    ...
}

也就是实际运行时节点 chain 优化为了两条:

StreamWithStateSample_running

metrics 看到的节点都是每条 chain 的首节点,开启了Latency tracking后,也可以看到每个 OperatorID 之间的 latency,形如「latency.source_id.X.source_subtask_index.0.operator_id.Y.operator_subtask_index.3.latency_p99」.

4. 总结

StreamGraph 生成 JobGraph 的过程中,首先为每个节点生成 hash id.JobGraph chain 时,符合条件的节点被 chain 到首节点,相关监控都使用首节点 id。不过全部节点 id 也都记录下来,用于 latency 这类 metrics,state 存储等。在 latency 这里,明显使用uid name可读性更高,最开始接触 flink 时在社区里也提过疑问,不过没有回应。

5. Ref

  1. Add operator name to latency metrics
  2. LatencyMetric scope should include operator names
  3. latency metrics 里使用 operator name
  4. 16年ci的版本还带着 OperatorName