频道栏目
首页 > 资讯 > 云计算 > 正文

kafka0.10版本spark只能读到一个分区的数据,错误Beginningoffsetxxxisthesameasendingoffset的解决办法

18-06-27        来源:[db:作者]  
收藏   我要投稿

sparkstreaming 2.1.0 kafka 0.10.1.1 topic三个partition

相关配置代码:

val kafkaParam = Map[String,Object](
  "key.deserializer"->classOf[StringDeserializer]
  ,"value.deserializer"->classOf[StringDeserializer]
  , "group.id"->"test"
  , "auto.offset.reset"->"latest"
  ,"enable.auto.commit"->true
  ,"bootstrap.servers"->"sy-002.hadoop:9092,sy-001.hadoop:9092,sy-003.hadoop:9092"
)
//args : test 10.173.249.68:2181 10.173.249.68:9092 test-consumer-group
val topics = Array("flume_kafka2")
val ds = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParam))
ds.foreachRDD(rdd=>{
        val off = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(iter=>{
        val o = off(TaskContext.get().partitionId())
          //打印分区的offset消费情况
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        })

flume_kafka2 2 56195 56195

flume_kafka2 0 56283 56283

flume_kafka2 1 56220 56279

重复多次总是只有一个分区的offset变化

日志:

"Beginning offset 56195 is the same as ending offset skipping.....")
上网找的解决方案 https://issues.apache.org/jira/browse/KAFKA-4547
这是0.10.1.1的bug 使用0.10.0.1版本即可解决。

希望以后遇到此问题的童鞋少浪费时间在这个问题上!

相关TAG标签
上一篇:window系统搭建linux虚拟机的步骤教程
下一篇:文本处理及正则表达式常用命令总结
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站