首页 > 编程知识 正文

flink定时调度,flink定时运行job

时间:2023-05-03 23:23:30 阅读:257813 作者:3556

Flink时间窗口运用

上一篇介绍了Flink定时读取外部数据Flink 定时加载外部文件数据并广播

这一篇将介绍Flink定时输出到外部存储介质,有两种办法实现,一种是同上一篇一样,在RichXXXFunction中实现SinkFunction的方法,在其中open()方法中引入java的定时任务。

本文介绍另一种实现,基于Flink window窗口机制,将结果定时sink到外部文件。

需求:

经过flink清洗后的数据,要求每天sink一次数据到某文件中(该文件内容是json格式,需要进行追加,不属于flink范畴不细讲)。

实现:

1、数据清洗:

DataStream<JSONObject> userSink = env.addSource(ssConsumer).map(new MapFunction<String, JSONObject>() {//此处清洗数据,取实时数据中的某些字段 public JSONObject map(String value) { JSONObject jsonObject = new JSONObject(); JSONObject out = new JSONObject(); try { jsonObject = JSON.parseObject(value); out.put(String.valueOf(jsonObject.get("subject_id")), jsonObject.get("subject_name")); } catch (Exception e) { JSONObject json = new JSONObject(); jsonObject = json; logger.error("This value parse has error:", e); } return out; } }).filter(new FilterFunction<JSONObject>() { @Override//此处过滤非Json格式数据 public boolean filter(JSONObject value) throws Exception { if ("error".equals(value.getOrDefault("error", ""))) { return false; } else { return true; } } });

2、自定义sink,流数据输出到txt文件:

/** * @Author luran * @create 2020/4/13 18:36 * @Desc *//** * 继承RichSinkFunction<String>类,其中List<JSONObject>为source端传到sink的数据类型,这个视Source端数据类型而定。 */public class MyRishSinkFileWriter extends RichSinkFunction<List<JSONObject>> implements SinkFunction<List<JSONObject>> { private static String path; /** * open方法在sink第一次启动时调用,一般用于sink的初始化操作 */ @Override public void open(Configuration parameters) { try { super.open(parameters); path = PropertyReaderUtil.getStrValue("user.txt.path"); File file = new File(path); if (!file.exists()) { file.createNewFile(); } } catch (Exception e) { log.error("获取user.txt路径失败:", e); } } /** * invoke方法是sink数据处理逻辑的方法,source端传来的数据都在invoke方法中进行处理 * 其中invoke方法中第一个参数类型与RichSinkFunctionList<JSONObject>中的泛型对应。第二个参数 * 为一些上下文信息 */ @Override public void invoke(List<JSONObject> v, Context context) { try { JSONObject json = null; String s = JsonFileReaderUtil.readJsonData(path); if (s.length() == 0) { json = new JSONObject(); } else { json = JSONObject.parseObject(s); } for (JSONObject d : v) { json.putAll(d); } String value = json.toJSONString(); File file = new File(path); FileWriter fileWriter = new FileWriter(file); fileWriter.write(value); fileWriter.flush(); fileWriter.close(); } catch (IOException e) { log.error("写入user文件异常:", e); } } /** * close方法在sink结束时调用,一般用于资源的回收操作 */ @Override public void close() throws Exception { super.close(); }}

3、时间窗口的启用:

//由于用了时间窗口,输出肯定是List<> DataStream<List<JSONObject>> dataBaseStream = userSink .windowAll(TumblingProcessingTimeWindows.of(Time.days(1))) .process(new ProcessAllWindowFunction<JSONObject, List<JSONObject>, TimeWindow>() { @Override public void process(Context context, Iterable<JSONObject> iterable, Collector<List<JSONObject>> collector) throws Exception { logger.info("进入时间窗口:"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); List<JSONObject> arrayList = new ArrayList<JSONObject>(); iterable.forEach(single -> { arrayList.add(single); }); if (arrayList.size() > 0) { collector.collect(arrayList); } } }); dataBaseStream.print("userStream"); dataBaseStream.addSink(new MyRishSinkFileWriter()); 测试:

方便测试,先将时间改为每30秒执行,Time.seconds(30),循环发送kafka数据:

第1个时间窗口到达:Iterable中集合了这30秒接收的所有实时数据,统一处理

第2个时间窗口达到:

总结:

Flink是实时处理,window机制可以认为是flink的批处理实现,因为需要等待水位线对齐触发timer。一般还会基于时间窗口做一些统计,如Flink按统计Kafka中每小时的数量并输出到MySQL 。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。