Kafka集群优化的方法是什么


这篇文章主要介绍了Kafka集群优化的方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Kafka集群优化的方法是什么文章都会有所收获,下面我们一起来看看吧。背景个推作为专业的数据智能服务商,已经成功服务了数十万APP,每日的消息下发量达百亿级别,由此产生了海量日志数据。为了应对业务上的各种需求,我们需要采集并集中化日志进行计算,为此个推选用了高可用的、高可靠的、分布式的Flume系统以对海量日志进行采集、聚合和传输。此外,个推也不断对Flume进行迭代升级,以实现自己对日志的特定需求。原有的异地机房日志汇聚方式,整个流程相对来说比较简单,A机房业务产生的日志通过多种方式写入该机房Kafka集群,然后B机房的Flume通过网络专线实时消费A机房Kafka的日志数据后写入本机房的Kafka集群,所有机房的数据就是通过相同方式在B机房Kakfa集群中集中化管理。如图一所示:图一:原有异地日志传输模式但是随着业务量的不断增加,日志数据在逐渐增多的过程中对带宽要求变高,带宽的瓶颈问题日益凸显。按照1G的专线带宽成本2~3w/月来计算,一个异地机房一年仅专线带宽扩容成本就高达30w以上。对此,如何找到一种成本更加低廉且符合当前业务预期的传输方案呢?Avro有快速压缩的二进制数据形式,并能有效节约数据存储空间和网络传输带宽,从而成为优选方案。优化思路Avro简介Avro是一个数据序列化系统。它是Hadoop的一个子项目,也是Apache的一个独立的项目,其主要特点如下:● 丰富的数据结构;● 可压缩、快速的二进制数据类型;● 可持久化存储的文件类型;● 远程过程调用(RPC);● 提供的机制使动态语言可以方便地处理数据。具体可参考官方网站:http://avro.apache.org/Flume Avro方案Flume的RPC Source是Avro Source,它被设计为高扩展的RPC服务端,能从其他Flume Agent 的Avro Sink或者Flume SDK客户端,接收数据到Flume Agent中,具体流程如图二所示:图二:Avro Source流程针对该模式,我们的日志传输方案计划变更为A机房部署Avro Sink用以消费该机房Kafka集群的日志数据,压缩后发送到B机房的Avro Source,然后解压写入B机房的Kafka集群,具体的传输模式如图三所示:图三:Flume Avro传输模式可能存在的问题我们预估可能存在的问题主要有以下三点:● 当专线故障的时候,数据是否能保证完整性;● 该模式下CPU和内存等硬件的消耗评估;● 传输性能问题。验证情况针对以上的几个问题,我们做了几项对比实验。环境准备情况说明:1. 两台服务器192.168.10.81和192.168.10.82,以及每台服务器上对应一个Kakfa集群,模拟A机房和B机房;2. 两个Kafka集群中对应topicA(源端)和topicB(目标端)。在topicA中写入合计大小11G的日志数据用来模拟原始端日志数据。3. 192.168.10.82上部署一个Flume,模拟原有传输方式。4. 192.168.10.81服务器部署Avro Sink,192.168.10.82部署Avro Source,模拟Flume Avro传输模式。原有Flume模式验证(非Avro)监控Kafka消费情况:81流量统计: 82流量统计:消费全部消息耗时:20min消费总日志条数统计:129,748,260总流量:13.5GAvro模式验证配置说明:Avro Sink配置:#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSourcekafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect =192.168.10.81:2181kafkatokafka.sources.kafka_dmc_bullet.topic = topicAkafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms =150000kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms =10000kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavrokafkatokafka.sources.kafka_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet的配置,可配置多个sink提高压缩传输效率kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSinkkafkatokafka.sinks.kafkasink_dmc_bullet.hostname =192.168.10.82kafkatokafka.sinks.kafkasink_dmc_bullet.port =55555//与source的rpc端口一一对应kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate//压缩模式kafkatokafka.sinks.kafka免费云主机域名sink_dmc_bullet.compression-level =6//压缩率1~9kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =5000#source kafkasink_dmc_bullet配的channel,只配一个kafkatokafka.channels.channel_dmc_bullet.type = memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000#kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000#kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10kafkatokafka.channels.channel_dmc_bullet.transactionCapacity =5000kafkatokafka.channels.channel_dmc_bullet.keep-alive=60Avro Source配置:#kafkasink 是kafkatokafka的sinks的名字,可配多个,空格分开kafkatokafka.sources = kafka_dmc_bulletkafkatokafka.channels = channel_dmc_bulletkafkatokafka.sinks = kafkasink_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.type= avrokafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bulletkafkatokafka.sources.kafka_dmc_bullet.bind =0.0.0.0kafkatokafka.sources.kafka_dmc_bullet.port =55555//rpc端口绑定kafkatokafka.sources.kafka_dmc_bullet.compression-type= deflate//压缩模式kafkatokafka.sources.kafka_dmc_bullet.batchSize =100#source kafkasink_dmc_bullet的配置kafkatokafka.sinks.kafkasink_dmc_bullet.type= org.apache.flume.sink.kafka.KafkaSinkkafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitionerkafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bulletkafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicBkafkatokafka.sinks.kafkasink_dmc_bullet.brokerList =192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks =1kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize =500kafkatokafka.channels.channel_dmc_bullet.type= memorykafkatokafka.channels.channel_dmc_bullet.capacity =100000kafkatokafka.channels.channel_dmc_bullet.transactionCapacity=1000监控Kafka消费情况81流量统计:82流量统计:消费全部消息耗时:26min消费总日志条数统计:129,748,260总流量:1.69G故障模拟1. 模拟专线故障,在A、B两机房不通的情况下,Avro Sink报错如下:2. 监控Kafka消费情况,发现消费者已停止消费:3. 故障处理恢复后继续消费剩余日志,经统计,总日志条数为:129,747,255。结论1. 当专线发生故障时,正在网络传输中的通道外数据可能会有少部分丢失,其丢失原因为网络原因,与Avro模式无关;故障后停止消费的数据不会有任何的丢失问题,由于网络原因丢失的数据需要评估其重要性以及是否需要补传。2. 流量压缩率达80%以上,同时我们也测试了等级为1~9的压缩率,6跟9非常接近,CPU和内存的使用率与原有传输模式相差不大,带宽的优化效果比较明显。3. 传输性能由于压缩的原因适当变弱,单Sink由原先20分钟延长至26分钟,可适当增加Sink的个数来提高传输速率。生产环境实施结果实施结果如下:1. 由于还有其它业务的带宽占用,总带宽使用率节省了50%以上,现阶段高峰期带宽速率不超过400Mbps;2. 每个Sink传输速率的极限大概是3000条每秒,压缩传输速率问题通过增加Sink的方式解决,但会适当增加CPU和内存的损耗。关于“Kafka集群优化的方法是什么”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Kafka集群优化的方法是什么”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注云编程开发博客行业资讯频道。

相关推荐: 输层协议讲解

[TOC]他们是TCP/IP协议簇的传输层协议​ TCP(Transmission Control Protocol),即传输控制协议​ UDP(User Datagram Protocol),即用户数据报协议关键词:连接,可靠,目前数据传输大部分用的是TCP…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

(0)
打赏 微信扫一扫 微信扫一扫
上一篇 02/06 17:00
下一篇 02/06 17:00