Flume + Kafka Integration


Preparation

Prepare five internal servers to build the ZooKeeper and Kafka clusters.
Server addresses: 192.168.2.240, 192.168.2.241, 192.168.2.242, 192.168.2.243, 192.168.2.244
Server OS: CentOS 6.5 64-bit

Download the installation packages:
Zookeeper: http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Flume: http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
Kafka: http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz

ZooKeeper, Flume, and Kafka all need a Java environment, so install the JDK first.
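The JDK installation step itself is not shown here. A minimal sketch, assuming the OpenJDK 1.7 packages from the standard CentOS 6 repositories are acceptable (a manually downloaded Oracle JDK works just as well), run as root on every server:

# Install OpenJDK 1.7 and verify the installation
yum install -y java-1.7.0-openjdk java-1.7.0-openjdk-devel
java -version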

Install the ZooKeeper cluster

Choose three servers for the ZooKeeper cluster; their IPs are 192.168.2.240, 192.168.2.241 and 192.168.2.242.
Note: perform steps (1)-(3) on the first server, 192.168.2.240, first.

(1) Unpack: extract zookeeper-3.4.6.tar.gz into the /opt directory.

(2) Create the configuration file: copy conf/zoo_sample.cfg to zoo.cfg in the same conf directory, then set the following values in it:

tickTime=2000
dataDir=/opt/zookeeper/Data
initLimit=5
syncLimit=2
clientPort=2181
server.1=192.168.2.240:2888:3888
server.2=192.168.2.241:2888:3888
server.3=192.168.2.242:2888:3888

Meaning of the parameters:
tickTime: heartbeat interval in milliseconds (default: 2000).
clientPort: the port other applications (for example Solr) use to reach ZooKeeper (default: 2181).
initLimit: time allowed, in ticks, for the initial sync phase in which followers connect to the leader (default: 10).
syncLimit: time allowed, in ticks, for followers to sync with ZooKeeper (default: 5).
dataDir: path where the data (for example the managed configuration files) is stored.
server.X: X is the id of one server in the cluster and must match the id in that server's myid file. Two ports can be given on the right: the first is used for data synchronization and other communication between the followers and the leader, the second for voting during leader election.

(3) Create the snapshot directory /opt/zookeeper/Data and create a myid file in it containing 1.

(4) Copy the configured /opt/zookeeper/ directory from 192.168.2.240 to 192.168.2.241 and 192.168.2.242, then change the content of myid to 2 and 3 respectively.

(5) Start the ZooKeeper cluster by running the start command on each of the three servers (a startup sketch is given after the Kafka broker configuration below).

Install the Kafka cluster

Five servers in total; server IP addresses:
192.168.2.240 node1
192.168.2.241 node2
192.168.2.242 node3
192.168.2.243 node4
192.168.2.244 node5

1. Unpack the installation file into the /opt/ directory.
2. Modify the server.properties file on each node.

#node1 configuration
broker.id=0
port=9092
advertised.listeners=PLAINTEXT://58.246.xx.xx:9092
advertised.host.name=58.246.xx.xx
# Pitfall: the nginx logs are pulled from the production environment back to the in-house servers,
# so these two settings must point at the router's public IP address; otherwise the online Flume
# agent reports that it cannot connect to the Kafka nodes and cannot deliver the log messages.
advertised.port=9092
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node2 configuration
broker.id=1
port=9093
advertised.listeners=PLAINTEXT://58.246.xx.xx:9093
advertised.host.name=58.246.xx.xx
advertised.port=9093
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181
#node3 configuration
broker.id=2
port=9094
advertised.listeners=PLAINTEXT://58.246.xx.xx:9094
advertised.host.name=58.246.xx.xx
advertised.port=9094
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181

#node4 configuration
broker.id=3
port=9095
advertised.listeners=PLAINTEXT://58.246.xx.xx:9095
advertised.host.name=58.246.xx.xx
advertised.port=9095
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181

#node5 configuration
broker.id=4
port=9096
advertised.listeners=PLAINTEXT://58.246.xx.xx:9096
advertised.host.name=58.246.xx.xx
advertised.port=9096
num.network.threads=3
num.io.threads=8
num.partitions=5
zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181

Start the Kafka cluster: run the following start command on every node.
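Neither the ZooKeeper nor the Kafka start command is spelled out above. A minimal sketch, assuming ZooKeeper was unpacked to /opt/zookeeper-3.4.6 and Kafka to /opt/kafka_2.10-0.10.0.0 (adjust the paths to your actual install directories):

# On each of the three ZooKeeper nodes (192.168.2.240-242)
cd /opt/zookeeper-3.4.6
bin/zkServer.sh start
bin/zkServer.sh status   # one node should report "leader", the other two "follower"

# On each of the five Kafka nodes (node1-node5), using that node's server.properties
cd /opt/kafka_2.10-0.10.0.0
bin/kafka-server-start.sh -daemon config/server.properties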
Install Flume

Two Flume agents are installed: one on the production server, to ship the online logs back to the local Kafka cluster, and one locally, to move the log messages from the Kafka cluster into HDFS.

Collect the nginx logs and send them to the in-house Kafka

1. Unpack the installation package:
cd /opt
tar -zxvf apache-flume-1.7.0-bin.tar.gz

2. Create the configuration file: vi flume-conf.properties and add the following content:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /unilifeData/logs/nginx/access.log
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
#sinks
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = unilife_nginx_production
a1.sinks.k1.kafka.bootstrap.servers = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.brokerList = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 2000
a1.sinks.k1.channel = c1

Start the Flume service.
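The Flume start command is not given above. A minimal sketch, assuming Flume was unpacked to /opt/apache-flume-1.7.0-bin and the configuration was saved as conf/flume-conf.properties (the agent name passed with -n must match the a1 prefix used in the file):

cd /opt/apache-flume-1.7.0-bin
bin/flume-ng agent -n a1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=INFO,console

# Optional check, run from the Kafka install directory on any broker (assumes the advertised
# address is reachable from there): consume a few messages to confirm the nginx log lines arrive
bin/kafka-console-consumer.sh --bootstrap-server 58.246.xx.xx:9092 --topic unilife_nginx_production --from-beginning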
Move the logs into HDFS

1. Unpack the installation package:
cd /opt
tar -zxvf apache-flume-1.7.0-bin.tar.gz

2. Create the configuration file:

nginx.sources = source1
nginx.channels = channel1
nginx.sinks = sink1

nginx.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
nginx.sources.source1.zookeeperConnect = master:2181,slave1:2181,slave2:2181
nginx.sources.source1.topic = unilife_nginx_production
nginx.sources.source1.groupId = flume_unilife_nginx_production
nginx.sources.source1.channels = channel1
nginx.sources.source1.interceptors = i1
nginx.sources.source1.interceptors.i1.type = timestamp
nginx.sources.source1.kafka.consumer.timeout.ms = 100
nginx.channels.channel1.type = memory
nginx.channels.channel1.capacity = 10000000
nginx.channels.channel1.transactionCapacity = 1000
nginx.sinks.sink1.type = hdfs
nginx.sinks.sink1.hdfs.path = hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log
nginx.sinks.sink1.hdfs.writeFormat = Text
nginx.sinks.sink1.hdfs.inUsePrefix = _
nginx.sinks.sink1.hdfs.rollInterval = 3600
nginx.sinks.sink1.hdfs.rollSize = 0
nginx.sinks.sink1.hdfs.rollCount = 0
nginx.sinks.sink1.hdfs.fileType = DataStream
nginx.sinks.sink1.hdfs.minBlockReplicas = 1
nginx.sinks.sink1.channel = channel1
Start the service.
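As above, the start command is not shown. A minimal sketch, assuming the configuration was saved as conf/nginx-hdfs.properties (a hypothetical file name) under /opt/apache-flume-1.7.0-bin; the agent name passed with -n must be nginx to match the property prefix:

cd /opt/apache-flume-1.7.0-bin
bin/flume-ng agent -n nginx -c conf -f conf/nginx-hdfs.properties -Dflume.root.logger=INFO,console

# Optional: confirm that files are being written under the HDFS path
hdfs dfs -ls /user/hive/warehouse/nginx_log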
