首页 > 编程知识 正文

kafka版本号,redis kafka

时间:2023-05-05 17:29:10 阅读:107609 作者:4192

前篇:使用Sink-kafkaSink

需求:输入数据并将数据写入redis

直接代码:一个无限流动的程序

创建nc -lk 8888并输入数据后,可以在redis中写入数据

package cn._51doit.flink.day01; importorg.Apache.flink.API.com mon.functions.flatmapfunction; importorg.Apache.flink.API.Java.functions.key selector; importorg.Apache.flink.API.Java.tuple.tuple 2; importorg.Apache.flink.streaming.API.datastream.datastream; importorg.Apache.flink.streaming.API.datastream.keyed stream; importorg.Apache.flink.streaming.API.datastream.singleoutputstreamoperator; importorg.Apache.flink.streaming.API.environment.streamexecutionenvironment; importorg.Apache.flink.streaming.connectors.redis.redis sink; importorg.Apache.flink.streaming.connectors.redis.com mon.config.flinkjedispoolconfig; importorg.Apache.flink.streaming.connectors.redis.com mon.mapper.redis command; importorg.Apache.flink.streaming.connectors.redis.com mon.mapper.rediscommanddescription; importorg.Apache.flink.streaming.connectors.redis.com mon.mapper.redis mapper; importorg.Apache.flink.util.collector;/* * *使用sink-redis sink *要求:从指定套接字读取数据,计算单词,然后将结果写入redis * * (1) pom文件中的redis相关* */publicclassredis args ) Throwsexceptiion Flink流式计算运行时环境(ExecutionEnvironment ) streamexecutionenvironmentenv=streamexecutionenviron )//datastream//sourcedatastreamstringlines=env.socket text stream (' master ',8888 ); //transformation开始//transformation (优化代码(singleoutputstreamoperatortuple2string,integerwordandone=lines.flat map ) Tuple2String,integer ({ @ overridepublicvoidflatmap (string line,CollectorTuple2String,Integer collector ) ) throwsexcepep collector.collect(tuple2.of ) word,2 ); }}; KeyedStreamTuple2String,Integer,String keyed=wor dandone.keyby (newkeyselectortuple2string,Integer,string () overrred ) }; ); //singleoutputstreamoperatortuple 2聚合字符串,并输入integersummed=keyed.sum(1; //Transformation退出调用Sinnk //以创建redis连接池的flinkjedispoolconfigconf=newflinkjedispoolconfig.builder (.sethost ) summed.addsink (newredissinktuple2string,integer(conf,new RedisWordCountMapper ) ); 开始执行//env.execute (streaming word count ); } /** *通过定制类并进行字段绑定【映射】*/publicstaticclassrediswordcountmapperimplementsredismappertuple2string integer { @ overridepublicrediscommanddescriptiongetcommanddescription (/将数据写入reis ) (在flink中没有进行redis的累计) 例如Keeis覆盖) returnnewrediscommanddescription (redis command.hset,' WORD_COUNT ' ); } @ overridepublicstringgetkeyfromdata (tuple2string,Integer data ) { return data.f0; } @ overridepublicstringgetvaluefromdata (tuple2string,Integer data ) { return data.f1.toString ); } }查看job:http://localhost :58014/#/job/b 31166 df 92 d 7ba 97 b0CB B1 e 70 f7C3 b 7a/overview

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。