2. 数据默认就被分布到了多个Executor上。Receiver-based Approach 你需要做特定的
处理,才能让 Receiver分不到多个Executor上。 3. Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay
了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。 4. Direct Approach (No Receivers) 直接维护了 Kafka offset,可以保证数据只有被执行
成功了,才会被记录下来,透过 checkpoint机制。如果采用Receiver-based
Approach,消费Kafka和数据处理是被分开的,这样就很不好做容错机制,比如系统当掉了。所以你需要开启WAL,但是开启WAL带来一个问题是,数据量很大,对HDFS是个很大的负担,而且也会对实时程序带来比较大延迟。 我原先以为Direct Approach 因为只有在计算的时候才拉取数据,可能会比Receiver-based Approach 的方式慢,但是经过我自己的实际测试,总体性能 Direct Approach会更快些,因为Receiver-based Approach可能会有较大的内存隐患,GC也会影响整体处理速度。
如何保证数据接受的可靠性
SS 自身可以做到 at least once 语义,具体方式是通过CheckPoint机制。 * CheckPoint 机制 *
CheckPoint 会涉及到一些类,以及他们之间的关系:
DStreamGraph类负责生成任务执行图,而JobGenerator则是任务真实的提交者。任务的
数据源则来源于DirectKafkaInputDStream,checkPoint 一些相关信息则是由类DirectKafkaInputDStreamCheckpointData 负责。
好像涉及的类有点多,其实没关系,我们完全可以不用关心他们。先看看checkpoint都干了些啥,checkpoint 其实就序列化了一个类而已:
org.apache.spark.streaming.Checkpoint
看看类成员都有哪些:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDurationval pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
? ? ? ? ? ? ? ? 1 2 3 4 5 6 7 8 其他的都比较容易理解,最重要的是 graph,该类全路径名是:
org.apache.spark.streaming.DStreamGraph
里面有两个核心的数据结构是:
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
? ? 1 2 inputStreams 对应的就是 DirectKafkaInputDStream 了。 再进一步,DirectKafkaInputDStream 有一个重要的对象
protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData
? 1 checkpointData 里则有一个data 对象,里面存储的内容也很简单
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
? 1 就是每个batch 的唯一标识 time 对象,以及每个KafkaRDD对应的的Kafka偏移信息。 而 outputStreams 里则是RDD,如果你存储的时候做了foreach操作,那么应该就是 ForEachRDD了,他被序列化的时候是不包含数据的。
而downtime由checkpoint 时间决定,pending time之类的也会被序列化。 经过上面的分析,我们发现:
1. checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十K,因
为只存了Kafka的偏移量等信息。
2. checkpoint 采用的是序列化机制,尤其是DStreamGraph的引入,里面包含了可能如
ForeachRDD等,而ForeachRDD里面的函数应该也会被序列化。如果采用了CheckPoint机制,而你的程序包做了做了变更,恢复后可能会有一定的问题。 接着我们看看JobGenerator是怎么提交一个真实的batch任务的,分析在什么时间做checkpoint 操作,从而保证数据的高可用:
1. 2. 3. 4. 5.
产生jobs
成功则提交jobs 然后异步执行 失败则会发出一个失败的事件
无论成功或者失败,都会发出一个 DoCheckpoint 事件。 当任务运行完成后,还会再调用一次DoCheckpoint 事件。
只要任务运行完成后没能顺利执行完DoCheckpoint前crash,都会导致这次Batch被重新调度。也就说无论怎样,不存在丢数据的问题,而这种稳定性是靠checkpoint 机制以及Kafka的可回溯性来完成的。
那现在会产生一个问题,假设我们的业务逻辑会对每一条数据都处理,则
1. 我们没有处理一条数据 2. 我们可能只处理了部分数据 3. 我们处理了全部数据
根据我们上面的分析,无论如何,这次失败了,都会被重新调度,那么我们可能会重复处理数据,可能最后失败的那一次数据的一部分,也可能是全部,但不会更多了。 * 业务需要做事务,保证 Exactly Once 语义 * 这里业务场景被区分为两个:
1. 幂等操作
2. 业务代码需要自身添加事物操作
所谓幂等操作就是重复执行不会产生问题,如果是这种场景下,你不需要额外做任何工作。但如果你的应用场景是不允许数据被重复执行的,那只能通过业务自身的逻辑代码来解决了。
这个SS 倒是也给出了官方方案:
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator } }
? ? ? ? ? ? ? 1 2 3 4 5 6 7 这代码啥含义呢? 就是说针对每个partition的数据,产生一个uniqueId,只有这个partion的所有数据被完全消费,则算成功,否则算失败,要回滚。下次重复执行这个uniqueId 时,如果已经被执行成功过的,则skip掉。 这样,就能保证数据 Exactly Once 语义啦。
总结
根据我的实际经验,目前Direct Approach 稳定性个人感觉比 Receiver-based Approach 更好些,推荐使用 Direct Approach 方式和Kafka进行集成,并且开启响应的checkpoint 功能,保证数据接收的稳定性,Direct Approach 模式本身可以保证数据 at least once语义,如果你需要Exactly Once 语义时,需要保证你的业务是幂等,或者保证了相应的事务。