首页 > 编程知识 正文

spring batch应用场景,flink读取es数据

时间:2023-05-05 17:40:16 阅读:159157 作者:4671

方便的数据读写-springbatch(5)结合beetlSql读写数据tags: springbatch

读写文章目录中的数据-springbatch(5) ) ) )。 结合beetlSql读写数据1 .引言2 .开发环境3.BeetlSQL简要说明3.BeetlSQL读写数据库部署3.BeetlSQL部署4.2依赖于多数据源创建的dao4.2.1配置文件4创建读取组件ItemReader4.5部件ItemWriter4.6装配件完成任务4.6.1读写组件4.6.2装配件任务4.7测试5 .总结

1 .引言

上一篇文章《决战数据库-spring batch(4)数据库到数据库》使用Spring Batch中的嵌入式读/写组件来同步数据库之间的数据。 相比之下,此数据读取和数据写入是基于jdbc读写的。 (数据对象的映射需要独特的处理,如UserRowMapper。 目前,使用更高级别的ORM框架(如Hibernate、MyBatis和BeetlSQL )的开发很常见。 对于Hibernate,Spring Batch提供了缺省HibernateCursorItemReader和HibernateItemWriter,您也可以自己使用MyBatis和BeetlSQL。 从易用性和学习曲线来看,我个人觉得BeetlSQL更容易使用,社区也更活跃。 因此,本文将介绍如何使用Spring Batch与BeetlSQL结合使用来实现从数据库到数据库的数据同步。

2 .开发环境JDK : JDK 1.8 spring boot :2.1.4.releasespringbatch :4.1.2.releasebeetlsql 33601.1.77.release开发id l的简要说明官方文档显示,BeeTLSQL是一个全功能的DAO工具,具有Hibernate的优点Mybatis的优点功能,因此需要能够以SQL为中心,同时自动生成很多常用的DAO工具,详细内容如下从最近一段时间的使用过程中,从开发效率、维护性、易用性来看,我觉得很优秀。

使用BeetlSQL读写数据库的此示例基于上一篇文章中的示例功能。 从源数据库读取和处理test_user表中的数据,然后将其写入目标数据库中的test_user表。 但是,不使用Spring Batch中的内置JdbcCursorItemReader和JdbcBatchItemWriter,而是使用BeetlSQL进行读写。 完整的示例可参考代码

4.1BeetlSQL的引入依赖BeeTLSQL提供Spring Boot的开始器,实现自动配置。 在pom.xml中添加以下依赖关系即可。

! -- orm框架: beetl SQL---- dependencygroupidcom.I beetl/groupidartifactidbeetl-framework-starter/artifactidvvid

4.2创建多数据源的dao 4.2.1配置文件。 由于使用多数据源进行读写,因此上一篇文章中已对多数据源的配置进行了说明,此处不再赘述。 BeetlSQL支持多个数据源,配置简单。 具体请参考官方文档。 我来简单说明一下这个。 将以下配置添加到application.properties文件中:

#beetlsql配置#默认值/sql,#beetlsql.sqlPath=/sql#dao文件的后缀beetlsql.daoSuffix=Repository# 不要设置要自动加载和搜索的dao文件的软件包beetlSQL.basePackage=me.mason .缺省org.beetl.SQL.core.db.MySQL style为# beetl SQL.db style=org.beetl.SQL.core.db.MySQL style # 可以不设置多数据源dao文件的位置数据源beetl SQL.ds.data source.base package=me.mason.spring batch.Dao.localbeetlsql 要读写的SQL.ds.target data source.base package=me.mason.spring batch.Dao.targetbeetlsql.mutiple.data source=dase

beetlsql.daoSuffix表示d

ao文件的后缀,BeetlSql会根据此后缀加载Dao。beetlsql.mutiple.datasource数据源名与数据源的配置一致。 4.2.2 添加dao文件

添加上述配置后,由于使用包名来区分读写数据源的Dao,因此需要创建对应的包,分别在工程中me.mason.springbatch下创建dao.local,dao.origin,dao.target三个包,分别存放需要读取的三个数据源对应的Dao。在本示例中,仅使用源数据库和目标数据库,因此,仅需要在dao.origin中添加OriginUserRepository进行源数据读操作,在dao.target中添加TargetUserRepository进行写操作即可。(注意,由于配置中指定是使用后缀Repository,因此此处的类名需要使用它作为后缀)。如下:

OriginUserRepository.java

@Repositorypublic interface OriginUserRepository extends BaseMapper<User> { List<User> getOriginUser(Map<String,Object> params);}

TargetUserRepository.java

@Repositorypublic interface TargetUserRepository extends BaseMapper<User> {}

说明:

使用注解@Repository标注是数据读写dao继承BaseMapper,以使用BeetlSQL内置的增删改查能力对于OriginUserRepository,getOriginUser是自定义的数据读取操作,此操作具体实现是使用写在sql/user.md中的sql语句(sql语句在后面将说明)。 4.3 编写sql文件

根据BeetlSql的功能,开发者可以自定义sql语句进行数据库操作,而sql语句是以markdown文件的形式保存,支持beetl的语法,支持参数化语句,逻辑判断等操作,这就有点像Mybatis中的xml语句,但显示和修改会更友好。具体sql文件更详细的使用功能,读者可到官文档查阅。

本示例中,上面OriginUserRepository自定义了getOriginUser函数,以此函数名即可编写sql进行读数据(当然,由于此sql比较简单,完全可以不写到markdown文件也可以实现,此处仅做示例展示此功能而已)。如下所示:

getOriginUser===* 查询user数据select * from test_user

接着就是写数据需要用到的sql,insertUser是这条语句的名称,在插入数据时会使用到:

insertUser===* 插入数据insert into test_user(id,name,phone,title,email,gender,date_of_birth ,sys_create_time,sys_create_user,sys_update_time,sys_update_user)values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth# ,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)

由上面的sql可见,这跟我们平时写sql没有区别,其中以##包括的是参数,即实体User的字段。

4.4 编写读组件ItemReader

经过上面的配置和添加的dao类,我们已经有了读和写数据的能力。使用OriginUserRepository,可以编写Spring Batch的ItemReader。读取数据后在内存中,在read()时返回。如下所示:

@Slf4jpublic class UserItemReader implements ItemReader<User> { protected List<User> items; protected Map<String,Object> params; @Autowired private OriginUserRepository originUserRepository; @Override public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if(Objects.isNull(items)){ //使用beetlsql的md执行sql items = originUserRepository.getOriginUser(params); if(items.size() > 0){ return items.remove(0); } }else{ if (!items.isEmpty()) { return items.remove(0); } } return null; } public Map<String, Object> getParams() { return params; } public void setParams(Map<String, Object> params) { this.params = params; }}

说明:

originUserRepository.getOriginUser执行的是user.md的getOriginUser查询语句。查询后数据保存在List<User>,在read()时返回其中的数据。全部返回后会返回null,以表示结束。BeetlSql支持使用Map来传参,本示例暂时没有使用。 4.5 编写写组件ItemWriter

读取到数据后,使用TargetUserRepository,执行上面编写的insertUser来插入数据。如下所示

public class UserItemWriter implements ItemWriter<User> { @Autowired private TargetUserRepository targetUserRepository; @Override public void write(List<? extends User> items) throws Exception { targetUserRepository.getSQLManager().updateBatch("user.insertUser",items); }}

说明:

使用SQLManager的updateBatch批量写数据user.insertUser中,user是markdown文件的名称,也是实体名称,insertUser是写的sql语句。 4.6 组装完整任务

新建BeetlsqlBatchConfig.java,作为Spring Batch的任务配置

4.6.1 注入读写组件

使用上面已写的Reader和Writer,使用Bean注解加入,如下:

@Beanpublic ItemReader beetlsqlItemReader() { UserItemReader userItemReader = new UserItemReader(); //设置参数,当前示例可不设置参数 Map<String,Object> params = CollUtil.newHashMap(); userItemReader.setParams(params); return userItemReader;}@Beanpublic ItemWriter beetlsqlWriter() { return new UserItemWriter();} 4.6.2 组装任务

使用Step和Job,实现完整的任务配置,如下:

@Beanpublic Job beetlsqlJob(Step beetlsqlStep,JobExecutionListener beetlsqlListener){ String funcName = Thread.currentThread().getStackTrace()[1].getMethodName(); return jobBuilderFactory.get(funcName) .listener(beetlsqlListener) .flow(beetlsqlStep) .end().build();}@Beanpublic Step beetlsqlStep(ItemReader beetlsqlItemReader ,ItemProcessor beetlsqlProcessor ,ItemWriter beetlsqlWriter){ String funcName = Thread.currentThread().getStackTrace()[1].getMethodName(); return stepBuilderFactory.get(funcName) .<User,User>chunk(10) .reader(beetlsqlItemReader) .processor(beetlsqlProcessor) .writer(beetlsqlWriter) .build();} 4.7 测试

参考上一文章的Db2DbJobTest,编写BeetlsqlJobTest文件。测试前先把目标数据库中的test_user库清空,然后启动Job进行测试,结果跟Db2DbJobTest是一致的。日志输出如下:

在上面使用BeetlSql过程中,可见有几个好处:

不需要自己写RowMapper对数据进行映射,更简单。sql语句写在markdown文件中,修改更灵活。sql语句执行输出在日志中,更清晰。 5.总结

本文使用Spring Batch对数据库到数据库的示例做了一个改动,使用BeetlSQL进行多数据源的读写操作,以实现更简单、更灵活、更清晰的数据库读写。希望对想要用Spring Batch同时又想了解BeetlSql的读者有帮助。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。