日志数据采集平台构建模型设计
1、为什么不能使用日志收集方案选择方案flume直接从日志服务器采用到hdfs :
1、由于日志服务器较多,从日志服务器直接访问HDFS时,HDFS的访问次数变得过高。 2、为了收集不同服务器上相同时间段的日志,flume会写入HDFS上的同一目录,但同一文件的写入不支持多线程并发写入。 使用方案flume聚合传输到hdfs的此解决方案解决了方案1的多线程并发写入问题。
无法采用的理由:为了flume聚合,多个flume被写入一个flume,末端的flume的传输负荷大,数据被存储。 停止收集方案3 :使用flumeKafkaflume的方式,为了通过Kafka集群的缓冲区,缓解flume的负荷,采用了该方案。 第1层flume配置计划flume需要读取本地日志服务器中的数据并监视多目录中文件的变化,因此source端采用taildir方法。
方案memory channel kafka sink
方案kafka channel
优点:不需要通过kafka sink,提高了传输速率
拦截器的配置为了以后分析数据,需要考虑数据的格式问题,首先需要清洗数据,通过拦截器清洗前端传输的json格式以外的数据
拦截器实现import com.Alibaba.fast JSON.JSON; import org.apache.flume.Context; import org.apache.flume.Event; importorg.Apache.flume.interceptor.interceptor; import java.util.Iterator; import java.util.List;/* * @在class name : localkfkinterceptor * @ author : kele * @ date :2021/1/1318336039 * @ description 3360中放置本地数据publicclasslocalkfkinterceptorimplementsinterceptor { @ overridepublicvoidinitialize ()/* *拦截单个事件的拦截器;传输的数据格式为JJ 如果不是用于确定是否为@ overridepubliceventintercept (event event ) strings=new string (event.getb bring )的try//JSON格式,则抛出异常否则,返回本身//是否存在异常将决定是否删除更改数据JSON.parseobject(s )。 返回事件; }catch(exceptione ) { return null; } @ overridepubliclisteventintercept (listeventlist ) { iterator event it=list.iterator }; wile(it.hasnext () ) { Event event=it.next ); if(intercept(event )==null ) it.remove ); } return list; } @ overridepublicvoidclose (({ }/* * *静态内部类为builder )/publicstaticclassmybuilderimplementsbuilder ) @ overrridepublicicilder @ overridepublicvoidconfigure (上下文上下文) { } }} flume配置文件) taildir source, 使用kafka channel将监视的数据写入a1.sources=R1 a1.cources tail dir多目录监视,监视的目录内的文件发生变化时首次写入a1.sources.R1 为了实现多目录监视a1.sources.r1.filegroups=f1#,设置监视的路径a1.sources
applog/log/app.*#配置批次的大小a1.sources.r1.batchSize = 100#设置断点续传记录的位置保存的地址a1.sources.r1.positionFile = /opt/module/flume/position.json#设置拦截器#将不是json格式传输的数据拦截a1.sources.r1.interceptors = i1#设置拦截器的类型,地址a1.sources.r1.interceptors.i1.type = com.atguigu.interce.LocalKfkInterceptor$MyBuilder#配置kafka channel,#channel类型,写入kafka channel的集群、topic名称、是否以事件的方式传输(该配置需要与kafka source设置的类型一致)a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092a1.channels.c1.kafka.topic = firsta1.channels.c1.parseAsFlumeEvent = false#source与channel的连接方式a1.sources.r1.channels = c1 2、日志存储 channel的类型选择 方案一:MemoryChannelMemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
方案二:FileChannelFileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
方案三:kafkaChannel使用kafkachannel则不需要source,但由于需要拦截器,如果有没source则无法配置拦截器,(需要解决零点漂移问题)
拦截器配置由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。
解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。数据形式
获取ts字段
仿照默认的TimpStamp interceptor拦截器进行设置