首页 > 编程知识 正文

埋点数据采集,大数据采集

时间:2023-05-05 14:20:19 阅读:138799 作者:289

日志数据采集平台构建模型设计

1、为什么不能使用日志收集方案选择方案flume直接从日志服务器采用到hdfs :

1、由于日志服务器较多,从日志服务器直接访问HDFS时,HDFS的访问次数变得过高。 2、为了收集不同服务器上相同时间段的日志,flume会写入HDFS上的同一目录,但同一文件的写入不支持多线程并发写入。 使用方案flume聚合传输到hdfs的此解决方案解决了方案1的多线程并发写入问题。

无法采用的理由:为了flume聚合,多个flume被写入一个flume,末端的flume的传输负荷大,数据被存储。 停止收集方案3 :使用flumeKafkaflume的方式,为了通过Kafka集群的缓冲区,缓解flume的负荷,采用了该方案。 第1层flume配置计划flume需要读取本地日志服务器中的数据并监视多目录中文件的变化,因此source端采用taildir方法。

方案memory channel kafka sink

方案kafka channel

优点:不需要通过kafka sink,提高了传输速率

拦截器的配置为了以后分析数据,需要考虑数据的格式问题,首先需要清洗数据,通过拦截器清洗前端传输的json格式以外的数据

拦截器实现import com.Alibaba.fast JSON.JSON; import org.apache.flume.Context; import org.apache.flume.Event; importorg.Apache.flume.interceptor.interceptor; import java.util.Iterator; import java.util.List;/* * @在class name : localkfkinterceptor * @ author : kele * @ date :2021/1/1318336039 * @ description 3360中放置本地数据publicclasslocalkfkinterceptorimplementsinterceptor { @ overridepublicvoidinitialize ()/* *拦截单个事件的拦截器;传输的数据格式为JJ 如果不是用于确定是否为@ overridepubliceventintercept (event event ) strings=new string (event.getb bring )的try//JSON格式,则抛出异常否则,返回本身//是否存在异常将决定是否删除更改数据JSON.parseobject(s )。 返回事件; }catch(exceptione ) { return null; } @ overridepubliclisteventintercept (listeventlist ) { iterator event it=list.iterator }; wile(it.hasnext () ) { Event event=it.next ); if(intercept(event )==null ) it.remove ); } return list; } @ overridepublicvoidclose (({ }/* * *静态内部类为builder )/publicstaticclassmybuilderimplementsbuilder ) @ overrridepublicicilder @ overridepublicvoidconfigure (上下文上下文) { } }} flume配置文件) taildir source, 使用kafka channel将监视的数据写入a1.sources=R1 a1.cources tail dir多目录监视,监视的目录内的文件发生变化时首次写入a1.sources.R1 为了实现多目录监视a1.sources.r1.filegroups=f1#,设置监视的路径a1.sources

applog/log/app.*#配置批次的大小a1.sources.r1.batchSize = 100#设置断点续传记录的位置保存的地址a1.sources.r1.positionFile = /opt/module/flume/position.json#设置拦截器#将不是json格式传输的数据拦截a1.sources.r1.interceptors = i1#设置拦截器的类型,地址a1.sources.r1.interceptors.i1.type = com.atguigu.interce.LocalKfkInterceptor$MyBuilder#配置kafka channel,#channel类型,写入kafka channel的集群、topic名称、是否以事件的方式传输(该配置需要与kafka source设置的类型一致)a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092a1.channels.c1.kafka.topic = firsta1.channels.c1.parseAsFlumeEvent = false#source与channel的连接方式a1.sources.r1.channels = c1 2、日志存储

channel的类型选择 方案一:MemoryChannel

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

方案二:FileChannel

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

方案三:kafkaChannel

使用kafkachannel则不需要source,但由于需要拦截器,如果有没source则无法配置拦截器,(需要解决零点漂移问题)

拦截器配置

由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

数据形式
获取ts字段

仿照默认的TimpStamp interceptor拦截器进行设置

官网Timestamp Interceptor介绍:Timestamp InterceptorThis interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp (or as specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present in the configuration.这个拦截器插入一个带有键时间戳的头(或由头属性指定的),它的值是timestamp 。如果配置中已经存在一个时间戳,这个拦截器可以保留它。 拦截器实现 /** * @ClassName : KfkHdfsInterceptor * @Author : kele * @Date: 2021/1/12 21:02 * @Description :自定义拦截器, * 通过设置event的K,V来解决零点漂移问题 */public class KfkHdfsInterceptor implements Interceptor { @Override public void initialize() { } /** * 增加时间的k,v解决零点漂移问题 * @param event * @return */ @Override public Event intercept(Event event) { String s = new String(event.getBody()); JSONObject json = JSON.parseObject(s); Long ts = json.getLong("ts"); Map<String, String> headers = event.getHeaders(); headers.put("timestamp",ts+""); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class MyBuilder implements Builder{ @Override public Interceptor build() { return new KfkHdfsInterceptor(); } @Override public void configure(Context context) { } }} flume的配置文件 #1、定义agent、channel、source、sink的名称a1.sources = r1a1.channels = c1a1.sinks = k1#2、描述source#source类型,所在集群,topic名称,groupid,a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092a1.sources.r1.kafka.topics = firsta1.sources.r1.kafka.consumer.group.id = g3a1.sources.r1.batchSize = 100a1.sources.r1.useFlumeEventFormat = falsea1.sources.r1.kafka.consumer.auto.offset.reset = earliest#3、描述拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = com.atguigu.interce.KfkHdfsInterceptor$MyBuilder#4、描述channela1.channels.c1.type = filea1.channels.c1.checkpointDir = /opt/module/flume/checkpointa1.channels.c1.dataDirs = /opt/module/flume/datasa1.channels.c1.checkpointInterval = 1000a1.channels.c1.transactionCapacity = 1000#5、描述sinka1.sinks.k1.type = hdfsa1.sinks.k1.channel = c1a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/applog/%Y-%m-%da1.sinks.k1.hdfs.filePrefix = log-a1.sinks.k1.hdfs.rollInterval = 30#滚动大小,一般设置为稍小于128M,这里设置为126Ma1.sinks.k1.hdfs.rollSize = 132120576a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.batchSize = 100#设置文件保存到HDFS的时候采用哪种压缩格式#a1.sinks.k1.hdfs.codeC = lzop#设置文件的输出格式#a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.fileType = DataStream#6、关联source->channel->sinka1.sources.r1.channels = c1a1.sinks.k1.channel = c1

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。