Spark Streaming 2.1.0 with Kafka 0.10.1.1; the topic has three partitions.
Relevant configuration code:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}

val kafkaParam = Map[String, Object](
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test",
  "auto.offset.reset" -> "latest",
  // box to java.lang.Boolean so the value conforms to Object
  "enable.auto.commit" -> (true: java.lang.Boolean),
  "bootstrap.servers" -> "sy-002.hadoop:9092,sy-001.hadoop:9092,sy-003.hadoop:9092"
)
// args: test 10.173.249.68:2181 10.173.249.68:9092 test-consumer-group
val topics = Array("flume_kafka2")
val ds = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParam))
ds.foreachRDD(rdd => {
  val off = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition(iter => {
    val o = off(TaskContext.get().partitionId())
    // print this partition's consumed offset range
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  })
})
flume_kafka2 2 56195 56195
flume_kafka2 0 56283 56283
flume_kafka2 1 56220 56279
No matter how many times I rerun it, only one partition's offsets ever advance.
Log message:
Beginning offset 56195 is the same as ending offset skipping.....
Solution found online: https://issues.apache.org/jira/browse/KAFKA-4547
This is a bug in Kafka 0.10.1.1; switching to the 0.10.0.1 client resolves it.
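If the 0.10.1.1 client is being pulled in by the build, one way to apply the fix is to pin kafka-clients to 0.10.0.1. A minimal sketch for an sbt build (assuming sbt is the build tool; the spark-streaming-kafka-0-10 coordinate matches the Spark 2.1.0 setup above):

```scala
// build.sbt sketch: pin kafka-clients to 0.10.0.1 to avoid KAFKA-4547
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
  "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
)

// If another dependency still drags in 0.10.1.1 transitively,
// force the version explicitly:
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
```

A Maven build would use a `<dependencyManagement>` entry for the same effect.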
Hope this saves anyone who runs into this problem some time!