教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

天富娱乐注册注册网站 kafka怎样提交偏移量?【kafka消费者详解】

更新时间:2023年06月23日14时18分 来源:传智教育 浏览次数:

kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)。

消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡。

提交偏移量

正常的情况

消费者组

如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费,再均衡后不可避免会出现一些问题。

问题一:

消费者再均衡

如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

问题二:

1687500653044_上次提交的偏移量.png

如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。如果想要解决这些问题,还要知道目前威尼斯足球队官网首页的方式:

提交偏移量的方式有两种,分别是天富娱乐登录彩票和手动提交。

天富娱乐登录彩票

当enable.auto.commit被设置为true,提交方式就是让消费者天富娱乐登录彩票,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去

手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式

提交当前偏移量(同步提交)

天富娱乐注册官网平台

同步和异步组合提交

天富娱乐登录网页版

把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

 while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); try { consumer.commitSync();//同步提交当前最新的偏移量 }catch (CommitFailedException e){ System.out.println("记录提交失败的异常:"+e); } } }

天富娱乐登录官方入口

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用天富娱乐注册官网平台的API。

 while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e!=null){ System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e); } } }); }

天富娱乐注册最新网址

天富娱乐注册官网平台也有个缺点,那就是如果服务器返回提交失败,天富娱乐注册官网平台不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。天富娱乐注册官网平台没有实现重试是因为,如果同时存在多个天富娱乐注册官网平台,进行重试可能会导致位移覆盖。

举个例子,假如我们发起了一个天富娱乐注册官网平台commitA,此时的提交位移为2000,随后又发起了一个天富娱乐注册官网平台commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

 try { while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(); } }catch (Exception e){+ e.printStackTrace(); System.out.println("记录错误信息:"+e); }finally { try { consumer.commitSync(); }finally { consumer.close(); } }

0 分享到:
和我们在线交谈!