首页 > 编程知识 正文

flink连接mysql,flink读取mysql数据

时间:2023-05-04 09:21:49 阅读:107599 作者:3180

概要

flink的sink是flink的三大逻辑结构之一的“source、transform、sink”,负责将由flink处理的数据输出到外部系统。 flink的sink和source的代码结构类似。

在编写代码的过程中,可以使用flink已经提供的sink,例如kafka、jdbc和es。 当然,也可以定制自己的sink。 介绍核心类

核心类

SinkFunction是一种类似于SourceFunction界面的界面。 SinkFunction主要包含用于数据输出的invoke方法,每条记录都执行invoke方法进行输出操作。

//writesthegivenvaluetothesink.thisfunctioniscalledforeveryrecord.defaultvoidinvoke (in value ) throws Exception

efaultvoidinvoke(invalue,Context context ) throws Exception

//Context在界面中返回有关时间的信息的interface Context {

/* * returnsthecurrentprocessingtime.* /

long currentProcessingTime (;

/* * returnsthecurrentevent-time watermark.* /

长当前水印(;

/* * returnsthetimestampofthecurrentinputrecordor { @ code null } iftheelementdoesnot * haveanassignedtimestamp.* /

长时间(;

}

我们一般在定制Sink时会继承AbstractRichFunction。 他是抽象类,实现了RichFunction接口。

publicabstractclassabstractrichfunctionimplementsrichfunction,Serializable

它还提供了RuntimContext的操作和open、clone方法。

AbstractRichFunction有许多实现类,包括msyql操作的JDBCSinkFunction,例如直接输出结果的PrintSinkFunction。 在我们开发期间,我们的进程用print语句打印结果,但在print函数中称为PrintSinkFunction

public DataStreamSink print ()

printsinkfunctionprintfunction=newprintsinkfunction (;

returnaddsink(printfunction ).name ) printtoSTD.out;

}

打印链接函数

现在,让我们分析一下名为PrintSinkFunction的类。 该类不将元素输出到标准输出或标准错误输出流。

publicclassprintsinkfunctionextendsrichsinkfunction {

privatestaticfinallongserialversionuid=1l;

privatefinalprintsinkoutputwriter;

/* * * instantiatesaprintsinkfunctionthatprintstostandardout.* /

public PrintSinkFunction (

writer=newprintsinkoutputwriter (false;

}

/* * * instantiatesaprintsinkfunctionthatprintstostandardout.* * @ paramstderrtrue,iftheformatshouldprinttostandarderrorioue

publicprintsinkfunction (finalbooleanstderr ) (

writer=newprintsinkoutputwriter (stderr;

}

/* * instantiatesaprintsinkfunctionthatprint

s to standard out and gives a sink identifier.** @param stdErr True, if the format should print to standard error instead of standard out.* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value*/

public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {

writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);

}

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());

}

@Override

public void invoke(IN record) {

writer.write(record);

}

@Override

public String toString() {

return writer.toString();

}

}

分析:

1、调用构造函数来创建一个PrintSinkOutputWriter

2、调用open方法中在调用PrintSinkOutputWriter 的open方法,进行初始化

3、调用invoke方法,通过PrintSinkOutputWriter 的writer方法吧record输出

自定义sink

我们这里自定义一个msyql的sink,也就是把flink中的数据,最后输出到mysql中。

public class MyMysqlSink extends RichSinkFunction {

private PreparedStatement ps = null;

private Connection connection = null;

String driver = "com.mysql.jdbc.Driver";

String url = "jdbc:mysql://127.0.0.1:3306/flinkdb";

String username = "root";

String password = "root";

// 初始化方法 @Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

// 获取连接 connection = getConn();

//执行查询 ps = connection.prepareStatement("select * from person;");

}

private Connection getConn() {

try {

Class.forName(driver);

connection = DriverManager.getConnection(url, username, password);

} catch (Exception e) {

e.printStackTrace();

}

return connection;

}

//Writes the given value to the sink. This function is called for every record. //每一个元素的插入,都会被调用一次invoke方法 @Override

public void invoke(Person p, Context context) throws Exception {

ps.setString(1,p.getName());

ps.setInt(2,p.getAge());

ps.executeUpdate();

}

@Override

public void close() throws Exception {

super.close();

if(connection != null){

connection.close();

}

if (ps != null){

ps.close();

}

}

}

程序调用入口

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 添加自定义数据源 DataStreamSource data = env.addSource(new MyMysqlSource());

data.print().setParallelism(2);

data.addSink(new MyMysqlSink());

// 提交执行任务env.execute("MySourceMysql");

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。