概要
flink的sink是flink的三大逻辑结构之一的“source、transform、sink”,负责将由flink处理的数据输出到外部系统。 flink的sink和source的代码结构类似。
在编写代码的过程中,可以使用flink已经提供的sink,例如kafka、jdbc和es。 当然,也可以定制自己的sink。 介绍核心类
核心类
SinkFunction是一种类似于SourceFunction界面的界面。 SinkFunction主要包含用于数据输出的invoke方法,每条记录都执行invoke方法进行输出操作。
//writesthegivenvaluetothesink.thisfunctioniscalledforeveryrecord.defaultvoidinvoke (in value ) throws Exception
efaultvoidinvoke(invalue,Context context ) throws Exception
//Context在界面中返回有关时间的信息的interface Context {
/* * returnsthecurrentprocessingtime.* /
long currentProcessingTime (;
/* * returnsthecurrentevent-time watermark.* /
长当前水印(;
/* * returnsthetimestampofthecurrentinputrecordor { @ code null } iftheelementdoesnot * haveanassignedtimestamp.* /
长时间(;
}
我们一般在定制Sink时会继承AbstractRichFunction。 他是抽象类,实现了RichFunction接口。
publicabstractclassabstractrichfunctionimplementsrichfunction,Serializable
它还提供了RuntimContext的操作和open、clone方法。
AbstractRichFunction有许多实现类,包括msyql操作的JDBCSinkFunction,例如直接输出结果的PrintSinkFunction。 在我们开发期间,我们的进程用print语句打印结果,但在print函数中称为PrintSinkFunction
public DataStreamSink print ()
printsinkfunctionprintfunction=newprintsinkfunction (;
returnaddsink(printfunction ).name ) printtoSTD.out;
}
打印链接函数
现在,让我们分析一下名为PrintSinkFunction的类。 该类不将元素输出到标准输出或标准错误输出流。
publicclassprintsinkfunctionextendsrichsinkfunction {
privatestaticfinallongserialversionuid=1l;
privatefinalprintsinkoutputwriter;
/* * * instantiatesaprintsinkfunctionthatprintstostandardout.* /
public PrintSinkFunction (
writer=newprintsinkoutputwriter (false;
}
/* * * instantiatesaprintsinkfunctionthatprintstostandardout.* * @ paramstderrtrue,iftheformatshouldprinttostandarderrorioue
publicprintsinkfunction (finalbooleanstderr ) (
writer=newprintsinkoutputwriter (stderr;
}
/* * instantiatesaprintsinkfunctionthatprint
s to standard out and gives a sink identifier.** @param stdErr True, if the format should print to standard error instead of standard out.* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value*/public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(IN record) {
writer.write(record);
}
@Override
public String toString() {
return writer.toString();
}
}
分析:
1、调用构造函数来创建一个PrintSinkOutputWriter
2、调用open方法中在调用PrintSinkOutputWriter 的open方法,进行初始化
3、调用invoke方法,通过PrintSinkOutputWriter 的writer方法吧record输出
自定义sink
我们这里自定义一个msyql的sink,也就是把flink中的数据,最后输出到mysql中。
public class MyMysqlSink extends RichSinkFunction {
private PreparedStatement ps = null;
private Connection connection = null;
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/flinkdb";
String username = "root";
String password = "root";
// 初始化方法 @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取连接 connection = getConn();
//执行查询 ps = connection.prepareStatement("select * from person;");
}
private Connection getConn() {
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
//Writes the given value to the sink. This function is called for every record. //每一个元素的插入,都会被调用一次invoke方法 @Override
public void invoke(Person p, Context context) throws Exception {
ps.setString(1,p.getName());
ps.setInt(2,p.getAge());
ps.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if(connection != null){
connection.close();
}
if (ps != null){
ps.close();
}
}
}
程序调用入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 添加自定义数据源 DataStreamSource data = env.addSource(new MyMysqlSource());
data.print().setParallelism(2);
data.addSink(new MyMysqlSink());
// 提交执行任务env.execute("MySourceMysql");