Flink(十五)Flink SQL Connector、savepoint、CateLog、Table API

前言

       今天一天争取搞完最后这一部分,学完赶紧把 Kafka 和 Flume 学完,就要开始做实时数仓了。据说是应届生得把实时数仓搞个 80%~90% 才能差不多找个工作,太牛马了。

1、常用 Connector 读写

        之前我们已经用过了一些简单的内置连接器,比如 'datagen' 、'print' ,其它的可以查看官网:Overview | Apache Flink

环境准备:

# 1. 先启动 hadoop
myhadoop start
# 2. 不需要启动 flink 只启动yarn-session即可
/opt/module/flink-1.17.0/bin/yarn-session.sh -d
# 3. 启动 flink sql 的环境 sql-client
./sql-client.sh embedded -s yarn-session

1.1、Kafka 

1)添加kafka连接器依赖

  • 将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录下
  • 重启yarn-session、sql-client

        使用 kafka 连接器,我们需要清楚,我们用 Flink SQL 往连接器为 kafka 的表中插入数据就相当于 Flink 往 Kafka 写入数据,而我们查询 Flink SQL 表中的数据就相当于 从 Kafka 中读取数据。所以当我们建表时就需要初始化读取 Kafka 数据和消费 Kafka 数据的参数。

2)创建 kfaka 的映射表

CREATE TABLE t1( 
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  --列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
id int, 
ts bigint , 
vc int )
WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'properties.group.id' = 'lyh',
-- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  'scan.startup.mode' = 'earliest-offset',
  -- fixed为flink实现的分区器,一个并行度只写往kafka一个分区
'sink.partitioner' = 'fixed',
  'topic' = 'ws1',
  'format' = 'json'
) 

上面有一个参数 'sink.partitioner' 的值是 'fixed' ,我们之前学过 Kafka 的生产者的分区器有默认的 hash分区器和粘性分区器,这种 fixed 分区器是 kafka 为flink实现的 ,一个并行度只写往一个 kafka 分区,我们可以查看一下 FlinkFixedPartition 的源码:

创建好的表格是没有数据的,所以我们再创建一个数据源往 kfaka 里插入数据:

Flink SQL> CREATE TABLE source ( 
>     id INT, 
>     ts BIGINT, 
>     vc INT
> ) WITH ( 
>     'connector' = 'datagen', 
>     'rows-per-second'='1', 
>     'fields.id.kind'='random', 
>     'fields.id.min'='1', 
>     'fields.id.max'='10', 
>     'fields.ts.kind'='sequence', 
>     'fields.ts.start'='1', 
>     'fields.ts.end'='1000000', 
>     'fields.vc.kind'='random', 
>     'fields.vc.min'='1', 
>     'fields.vc.max'='100'
> );

插入数据:

insert into t1(id,ts,vc) select * from source;

查询 kafka 表:

select * from t1;

3)upsert-kafka 表

        如果当前表存在更新操作,那么普通的kafka连接器将无法满足(因为普通的连接器不支持更新操作),此时可以使用Upsert Kafka连接器。

        Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

        作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

        作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1)创建upsert-kafka的映射表(必须定义主键)

CREATE TABLE t2( 
    id int , 
    sumVC int ,
    -- 主键必须 not enforced
    primary key (id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'hadoop102:9092',
  'topic' = 'ws2',
  'key.format' = 'json',
  'value.format' = 'json'
)

2)插入 upset-kafka 表

insert into t2 select  id,sum(vc) sumVC  from source group by id;

3)查询 upset-kafka 表

select * from t2;

查询结果: 

可以看到,upsert-kafka 表是支持数据更新操作的。

1.2、File

Flink 天生就支持本地系统、HDFS 等。

1)创建 FileSystem 映射表

CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
  'connector' = 'filesystem',
  -- 如果是本地系统就用 file:/// 
  'path' = 'hdfs://hadoop102:8020/data/t3',
  'format' = 'csv'
);

注意:之前我们在 flink 的 lib 目录下放了 hive 的连接器,这个包会和 flink 的依赖产生冲突:java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory 我们需要把这个依赖移除掉或者改名并重启 sqlSession :

# 重命名连接器
mv flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar.del
# yarn web端 kill 掉job
# 重启 yarn-session
bin/yarn-session.sh -d
bin/sql-client.sh embedded -s yarn-session.sh -i sql-client-init.sql

插入数据:

查询插入结果:

除了上面这种方式,我们还可以把 flink 目录下 opt/ 的 flink-table-planner-1.17.0.jar 和 lib/ 下面的 flink-table-planner-loader-1.17.0.jar 替换一下位置,这样我们就不用把 hive 的连接器移除带了。

1.3、JDBC

        Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式操作,否则,连接器以追加模式操作。

        在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink这样可以保证幂等性。为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

1)mysql 的 test 库中建表

CREATE TABLE `ws2` (
  `id` int(11) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下:

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar
CREATE TABLE t4
(
    id INT,
    ts BIGINT,
    vc INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url' = 'jdbc:mysql://hadoop102:3306/test?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
    'connection.max-retry-timeout' = '60s',
    'table-name' = 'ws2',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '5s',
    'sink.max-retries' = '3',
    'sink.parallelism' = '1'
);

测试插入数据:

insert into t4 values(1,1,1);

 查看结果:

 这里,因为我们给 mysql 的这张表设置了主键,所以默认当出现和主键字段相同的新数据时,会直接以 upsert 的方式操作:

insert into t4 values(1,2,2);

运行结果: 

注意:我们这个表是和 mysql 关联的,所以我们不管对 mysql 操做还是对这张映射表操作都会互相影响,上面我们修改了映射表 t4 之后,同样会修改到 mysql 表 ws2(除了删除表格,删除flink sql 中的表格并不会删除mysql 中的表格)

 

如果我们希望使用追加模式,就必须保证 mysql 表和 Flink SQL 表都是没有主键的。

2、sql-client 使用 savepoint

1)提交一个insert作业,可以给作业设置名称

Flink SQL> create table sink(
> id int,
> ts bigint,
> vc int
> )with(
> 'connector' = 'print'
> );
insert into sink select * from source;

2)查看 job 列表

查看 job 列表是为了获得 job id,我们提交作业的时候会返回一个 job id 可以在 shell 命令行看到,或者从 web ui 端也可以看到,再或者通过下面的命令看:

show jobs;

3)停止作业,触发 savepoint

SET state.checkpoints.dir='hdfs://hadoop102:8020/chk';
SET state.savepoints.dir='hdfs://hadoop102:8020/sp';
-- 结束作业不设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445';
-- 结束作业设置保存点
stop job 'e6d3e9afed97aee7819c460a6e109445' with savepoint;

4)从 savepoint 恢复

-- 设置从savepoint恢复的路径 
SET execution.savepoint.path='hdfs://hadoop102:8020/sp/savepoint-0e0742-7e2154873185';  

-- 之后直接提交sql,就会从savepoint恢复

--允许跳过无法还原的保存点状态
set 'execution.savepoint.ignore-unclaimed-state' = 'true'; 

5)恢复后重置路径

注意:我们设置 savepoint 恢复路径后,之后的所有 insert 任务都会默认使用这个 savepoint,所以下一个作业一定要重置这个配置参数:

指定execution.savepoint.path后,将影响后面执行的所有DML语句,可以使用RESET命令重置这个配置选项。

RESET execution.savepoint.path;

 如果出现reset没生效的问题,可能是个bug(包括 pipeline.name 这个参数也是),我们可以退出sql-client,再重新进,不需要重启flink的集群。

3、CateLog

        Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。它本来翻译过来就是目录,我们可以理解为它就是数据库的目录。

        数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。我们之前上面使用的表都是基于内存的一个 Catelog ,所以每次我们退出 sql-client 客户端的时候,这些表和数据库就不见了。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

        Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,不必在Flink中手动重写ddl,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,并极大地提升了用户体验。

        注意:catalog 可以使得 mysql 、hive 和 flink 互通有无,互通就是可以操作读写(除了建表),而不是说只是在某个生命周期内起作用,只要连接上,flink 操作的就是实实在在的 hive 、mysql 本身,这才叫互通,而不是自嗨。

3.1、CateLog 类型

目前 Flink 包含了以下四种 Catalog:

  • GenericInMemoryCatalog:基于内存实现的 Catalog,所有元数据只在session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。默认自动创建,会有名为“default_catalog”的内存Catalog,这个Catalog默认只有一个名为“default_database”的数据库。
  • JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
  • HiveCatalog:有两个用途,一是单纯作为 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
  • 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。

3.2、JdbcCatalog(MySQL)

JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。

1)上传所需jar包到lib下

  • flink-connector-jdbc-1.17-20230109.003314-120.jar
  • mysql-connector-j-5.1.7.jar

注意:Flink 是冷加载,所以上传后需要重启 yarn-session 和 sql-client

2)创建Catalog

JdbcCatalog支持以下选项:

  • name:必需,Catalog名称。
  • default-database:必需,连接到的默认数据库。
  • username: 必需,Postgres/MySQL帐户的用户名。
  • password:必需,该帐号的密码。
  • base-url:必需,数据库的jdbc url(不包含数据库名)

对于Postgres Catalog,是"jdbc:postgresql://<ip>:<端口>"

对于MySQL Catalog,是"jdbc: mysql://<ip>:<端口>"

CREATE CATALOG my_jdbc_catalog WITH(
    'type' = 'jdbc',
    -- 这里指定的只是默认使用的数据库 它会把所有数据库导进这个catalog下
    'default-database' = 'test',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://hadoop102:3306'
);

3)查看 Catalog 

show catalogs;

4)使用指定的 Catalog

use catalog my_jdbc_catalog;

我们发现,除了 mysql 的系统数据库看不到,别的都别导进来了。

我们也可以直接往表中插入数据,而不用向之前那样去建立映射表:

insert into ws2 values(2,2,2);

 注意:在 jdbcCatalog 下是不支持建表的,什么表都不行(映射表或者普通表)!

要建表需要返回到之前默认的 default_catalog 才可以,但是我们是可以从 jdbc_catalog 去查 default_catalog 下的表数据的。

select * from default_catalog.mydatabase.source;

此外,我们也可以把不同类型catalog下不同的表数据关联在一起:

select * from default_catalog.mydatabase.source s join my_jdbc_catalog.test.ws2 w on s.id=w.id;

最后,每次我们退出 sql-client 的时候,其实我们创建的 jdbc_catalog 还是会被删除的,所以我们最好把创建catalog这些命令写进一个 sql 文件,初始化启动 sql-client 的时候执行一下。

3.3、HiveCatalog 

同样,HiveCatalog 可以打通所有 Hive 的库和表,这样我们就可以在 Flink 直接读写 Hive 表。此外,我们还可以在 catalog 下创建我们 Flink 的表,比如带有 Kafka 连接器的表,而且即使我们退出客户端,再次进去 HiveCatalog ,那张表还是存在的。

1)上传 jar 包

  • flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar
  • mysql-connector-j-5.1.7.jar(我的 Hive 元数据存储在 MySQL)

2)更换planner依赖

只有在使用Hive方言或HiveServer2时才需要这样额外的计划器jar移动,但这是Hive集成的推荐设置。

这个我们之前使用 FileSystem 创建映射表的时候已经做过了。

3)重启flink集群和sql-client

4)启动外置的hive metastore服务

Hive metastore必须作为独立服务运行,也就是hive-site中必须配置hive.metastore.uris。(必须启动 hive 的元数据服务,不然我们flink无法获取hive中的数据)

# & 的意思是后台启动
# hive --service metastore &
# 这里直接启动我的 hive
hiveservice.sh start
# 查看hive 启动没有
hiveservice status

启动 hive 后会一直挂在那,我们可以判断一下元数据服务是否启动:

netstat -anp|grep 9083
# 或者
ps -ef|grep -i metastore

5)创建 Catalog 

配置项

必需

默认值

类型

说明

type

Yes

(none)

String

Catalog类型,创建HiveCatalog时必须设置为'hive'。

name

Yes

(none)

String

Catalog的唯一名称

hive-conf-dir

No

(none)

String

包含hive -site.xml的目录,需要Hadoop文件系统支持。如果没指定hdfs协议,则认为是本地文件系统。如果不指定该选项,则在类路径中搜索hive-site.xml。

default-database

No

default

String

Hive Catalog使用的默认数据库

hive-version

No

(none)

String

HiveCatalog能够自动检测正在使用的Hive版本。建议不要指定Hive版本,除非自动检测失败。

hadoop-conf-dir

No

(none)

String

Hadoop conf目录的路径。只支持本地文件系统路径。设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。只有当环境变量不适合你时才使用该选项,例如,如果你想分别配置每个HiveCatalog。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

 6)查看 catalog

我们在 hive 中创建一个数据库 test 再创建一张表 ws:

我们再往 ws 中插入一条数据:

hive(test)> insert into ws values(1,1,1);

我们可以看到,即使是插入一条数据,hive 也是转换为一个 MapReduce 作业,所以很慢。

不对劲,是完全卡死了,估计是 flink 和 hive 同时占用 yarn 的资源,导致资源不足 的原因(暂且怀疑是 Yarn 的CPU核数的配置问题),在修改完 yarn 的最大内存发现在 flink 往 hive 插入查询数据都没有问题了,但是 hive 还是不行,那就暂且用 flink 端操作 hive 吧。

没毛病,在 flink sql 客户端往 hive 插入数据后,在 flink 和 hive 中都可以看到结果。

        此外,我们在 hiveCatalog 下创建一张连接器为 FileSystem 的表,那么这张表是只能在 flink 环境下才能查到的,在hive是只能看到有这张表,查不到数据的:

我们去 hive 端查看一下:

我们发现,hive 端可以查看到存在这张表。

当我们查的时候,发现直接报错,毕竟它不是一个满足 hive 规范的表。同样,我们在 hiveCatalog 下创建一个连接器为 jdbcCatalog 的表,同样在 flink sql 中也是可以查到并正常使用的,但是在 hive 端同样只能看到表名。

现在,我们退出 sql-client,我们重新变价一下初始化 sql 文件:

 重新启动:

我们可以看到,虽然每次退出 sql-client 之后 catalog 下次启动就消失了,但是catalog 下面的表不会消失,我们只需要创建对应的 catalog 即可。

我们再解决一下刚才 hive插入数据失败的问题:

关闭hadoop、hive,修改yarn-site.xml 中 yarn的最大cpu核数=8(无所谓,反正在自己电脑上测试)。重启 hadoop、hive,测试插入:

说明 hive 没有问题,但是还不能说明我们配置的 yarn 有用,我们再启动 flink 的 yarn-session 和 sql-client:

提交插入作业(插入数据到 hive):

没毛病,这下问题测彻底解决了。

4、Table API

4.1、引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>

        这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。

        如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>

4.2、创建表环境

        对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)。它主要负责:

(1)注册Catalog和表;

(2)执行 SQL 查询;

(3)注册用户自定义函数(UDF);

(4)DataStream 和表之间的转换。

每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使用blink planner。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()    // 使用流处理模式
    .build();

TableEnvironment tableEnv = TableEnvironment.create(setting);

对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

这里我们引入了一个“流式表环境”(StreamTableEnvironment),它是继承自TableEnvironment的子接口。调用它的create()方法,只需要直接将当前的流执行环境(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。

4.3、创建表

表(Table)是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。

具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。

1)连接器表(Connector Tables)

最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。

在代码中,我们可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。这里我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

这里的TEMPORARY关键字可以省略。关于连接器的具体定义,我们会在11.8节中展开讲解。

2)虚拟表(Virtual Tables)

在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。

由于newTable是一个Table对象,并没有在表环境中注册;所以如果希望直接在SQL中使用,我们还需要将这个中间结果表注册到环境中:

tableEnv.createTemporaryView("NewTable", newTable);

我们发现,这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图”(createTemporaryView)。

4.4、表的查询

创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。

Flink为我们提供了两种查询方式:SQL,和Table API。

1)执行SQL进行查询

基于表执行SQL语句,是我们最为熟悉的查询方式。

在代码中,我们只要调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。

// 创建表环境
TableEnvironment tableEnv = ...; 

// 创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

// 查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );

目前Flink支持标准SQL中的绝大部分用法,并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来,像在MySQL、Hive中那样直接通过编写SQL实现自己的处理需求,从而大大降低了Flink上手的难度。

例如,我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:

Table urlCountTable = tableEnv.sqlQuery(
    "SELECT user, COUNT(url) " +
    "FROM EventTable " +
    "GROUP BY user "
);

上面的例子得到的是一个新的Table对象,我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:

// 注册表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");


// 将查询结果输出到OutputTable中
tableEnv.executeSql (
"INSERT INTO OutputTable " +
    "SELECT user, url " +
    "FROM EventTable " +
    "WHERE user = 'Alice' "
  );

2)调用Table API进行查询

另外一种查询方式就是调用Table API。这是嵌入在Java和Scala语言内的查询API,核心就是Table接口类,通过一步步链式调用Table的方法,就可以定义出所有的查询转换操作。

由于Table API是基于Table的Java实例进行调用的,因此我们首先要得到表的Java对象。基于环境中已注册的表,可以通过表环境的from()方法非常容易地得到一个Table对象:

Table eventTable = tableEnv.from("EventTable");

传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:

Table maryClickTable = eventTable
        .where($("user").isEqual("Alice"))
        .select($("url"), $("user"));

这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明了想要表达的内容;“$”符号用来指定表中的一个字段。上面的代码和直接执行SQL是等效的。

Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。目前Table API支持的功能相对更少,可以预见未来Flink社区也会以扩展SQL为主,为大家提供更加通用的接口方式;所以我们接下来也会以介绍SQL为主,简略地提及Table API。

3)两种API的结合使用

可以发现,无论是调用Table API还是执行SQL,得到的结果都是一个Table对象;所以这两种API的查询可以很方便地结合在一起。

(1)无论是那种方式得到的Table对象,都可以继续调用Table API进行查询转换;

(2)如果想要对一个表执行SQL操作(用FROM关键字引用),必须先在环境中对它进行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:

tableEnv.createTemporaryView("MyTable", myTable);

两种API殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而Table API功能相对较少、通用性较差,所以企业项目中往往会直接选择SQL的方式来实现需求。

4.5、输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。

在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 经过查询转换,得到结果表
Table result = ...

// 将结果表写入已注册的输出表中
result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class SqlDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 1.创建表环境
        // 1.1 写法1:
//        EnvironmentSettings es = EnvironmentSettings.newInstance()
//                .inStreamingMode()
//                .build();
//        TableEnvironment table_env = TableEnvironment.create(es);

        // 1.2 写法2:
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);

        // TODO 2. 创建表
        table_env.executeSql("CREATE TABLE source ( 
" +
                "    id INT, 
" +
                "    ts BIGINT, 
" +
                "    vc INT
" +
                ") WITH ( 
" +
                "    'connector' = 'datagen', 
" +
                "    'rows-per-second'='1', 
" +
                "    'fields.id.kind'='random', 
" +
                "    'fields.id.min'='1', 
" +
                "    'fields.id.max'='10', 
" +
                "    'fields.ts.kind'='sequence', 
" +
                "    'fields.ts.start'='1', 
" +
                "    'fields.ts.end'='1000000', 
" +
                "    'fields.vc.kind'='random', 
" +
                "    'fields.vc.min'='1', 
" +
                "    'fields.vc.max'='100'
" +
                ");
");
        table_env.executeSql("CREATE TABLE sink (
" +
                "    id INT, 
" +
                "    sum_vc INT 
" +
                ") WITH (
" +
                "'connector' = 'print'
" +
                ");
");

        // TODO 3. 执行查询 查询的结果也是一张表
        // 3.1 使用sql进行查询
        Table table = table_env.sqlQuery("select id,sum(vc) as sum_vc from source where id > 5 group by id;");
        // 把 table 对象注册成为表名
        table_env.createTemporaryView("tmp",table);
//        table_env.sqlQuery("select * from tmp;");
        // 3.2 用table api查询
//        Table source = table_env.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sum_vc"))
//                .select($("id"), $("sum_vc"));

        // TODO 4. 输出表
        // 4.1 sql用法
        table_env.executeSql("insert into sink select * from tmp");
        // 4.2 table api
//        result.executeInsert("sink");
    }
}

运行结果: 

4.6 表和流的转换

既然都不用 SQL 了,我们不可能用 API 只是为了没事找事干,而是为了方便结合一些底层的 API,比如我们之前学的 DataSream API。

4.6.1、将流(DataStream)转换成表(Table)

1. 调用fromDataStream()方法

想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 读取数据源
SingleOutputStreamOperator<WaterSensor> sensorDS = env.fromSource(...)

// 将数据流转换成表
Table sensorTable = tableEnv.fromDataStream(sensorDS);

由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。

另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

也可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts
Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

(2)调用createTemporaryView()方法

       调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。

        对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

 这样,我们接下来就可以直接在SQL中引用表sensorTable了。

4.6.2、将表(Table)转换成流(DataStream)

(1)调用toDataStream()方法

将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如,我们可以将2.4小节经查询转换得到的表aliceClickTable转换成流打印输出:

tableEnv.toDataStream(table).print();

(2)调用toChangelogStream()方法

urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中需要调用的是表环境的toChangelogStream()方法:

Table table = tableEnv.sqlQuery(
    "SELECT id, sum(vc) " +
    "FROM source " +
    "GROUP BY id "
  );

// 将表转换成更新日志流
tableEnv.toChangelogStream(table).print();

4.6.3、支持的数据类型

整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。

1. 原子类型

在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;

DataStream<Long> stream = ...;

// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream, $("myLong"));
2. Tuple类型

当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。

Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2...。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;

DataStream<Tuple2<Long, Integer>> stream = ...;

// 将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream, $("f1"));

// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));
3. POJO 类型

Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。

将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。

StreamTableEnvironment tableEnv = ...;

DataStream<Event> stream = ...;

Table table = tableEnv.fromDataStream(stream);
Table table = tableEnv.fromDataStream(stream, $("user"));
Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));
4. Row类型

Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。

Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)。

4.6.4、综合应用示例

package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );


        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);

        // TODO 1. 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        // 或者指定保留哪些字段
//        table_env.fromDataStream(sensorDS,$("id"));
        // 注册
        table_env.createTemporaryView("sensor",sensorTable);

        Table result = table_env.sqlQuery("select id,sum(vc) from sensor group by id");
        Table filter = table_env.sqlQuery("select id,ts,vc from sensor where ts > 2");

        // TODO 2. 表 -> 流
        // 2.1 追加流
        table_env.toDataStream(filter,WaterSensor.class).print("filter");
        // 2.2 更新流(结果需要更新)
        table_env.toChangelogStream(result).print("sum");

        // 只要代码中调用了 DataStream 就需要使用 execute
        env.execute();
    }
}

运行结果:

4.7 自定义函数(UDF)

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。

Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:

  1. 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
  2. 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
  3. 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
  4. 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。

4.7.1、整体调用流程

要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。

(1)注册函数

注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。

(2)使用Table API调用函数

在Table API中,需要使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。

(3)在SQL中调用函数

当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。

4.7.2、标量函数(Scalar Functions

自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。

想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。

这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。

下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。

package com.lyh.sql;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// TODO 自定义函数的实现类
public class MyHashFunction extends ScalarFunction {
    // 接受任意类型的输入 返回int类型输出
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o){
        return o.hashCode();
    }
}
package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 21),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );


        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(sensorDS);
        table_env.createTemporaryView("sensor",sensorTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("hashFunction",MyHashFunction.class);

        // TODO 3. 调用自定义函数
        // 3.1 sql 用法
        table_env.sqlQuery("select hashFunction(id) from sensor")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();
        // 3.2 table api 用法
        sensorTable
                // hashFunction的eval方法有注解的原因就是因为要告诉这里的参数类型
                .select(call("hashFunction",$("id")))
                .execute()
                .print();

    }
}

运行结果:

这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。

另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。

4.7.3、表函数(Table Functions

        跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。

        类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。

        在SQL中调用表函数,需要使用LATERAL TABLE(<TableFunction>)来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。

        下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。

package com.lyh.sql;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<word STRING,length INT>"))
public class MySplitFunction extends TableFunction<Row> {

    // 返回是 void ,用 collect 方法输出
    public void eval(String str){
        for (String word : str.split(" ")) {
            collect(Row.of(word,word.length()));
        }
    }
}
package com.lyh.sql;

import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo2 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> wordsDS = env.fromElements(
                "hello flink",
                "hello hadoop",
                "hello kafka",
                "hello spark",
                "hello hive and impala"
        );
        
        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table sensorTable = table_env.fromDataStream(wordsDS,$("name"));// 字段名
        table_env.createTemporaryView("words",sensorTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("splitFunction",MySplitFunction.class);

        // TODO 3. 调用自定义函数
        // 交叉用法
        table_env.sqlQuery("select word,length from words,lateral table (splitFunction(name))")
                // 调用了 sql 的 execute 就不需要 env.execute()
                .execute()
                .print();

    }
}

运行结果:

左联结:

table_env
     .sqlQuery("select name,word,length from words left join lateral table (splitFunction(name)) on true")
      .execute()
      .print();

字段重命名:

.sqlQuery("select name,newWord,newLength from words left join lateral table (splitFunction(name)) as T(newWord,newLength) on true")

        这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。

4.7.4、聚合函数(Aggregate Functions

用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。

聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。

自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数<T, ACC>,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。

Flink SQL中的聚合函数的工作原理如下:

(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。

(2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。

(3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。

所以,每个 AggregateFunction 都必须实现以下几个方法:

  1. createAccumulator()

这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。

  1. accumulate()

这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。

  1. getValue()

这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。

在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。

AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。

下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。

package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;

// 泛型 T:返回类型 ACC: 累加器类型<加权总和,权重总和>
public class MyWeightAvg extends AggregateFunction<Double, Tuple2<Integer,Integer>> {

    /**
     * 计算累加和
     * @param acc 累加器的类型 固定写法
     * @param score 分神
     * @param weight 权重
     */
    public void accumulate(Tuple2<Integer, Integer> acc,Integer score,Integer weight){
        acc.f0 += score*weight;
        acc.f1 += weight;
    }

    @Override
    public Double getValue(Tuple2<Integer, Integer> integerIntegerTuple2) {
        return integerIntegerTuple2.f0*1d/integerIntegerTuple2.f1;
    }

    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }
}
package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ScalarFunctionDemo3 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple3<String,Integer,Integer>> scoreDS = env.fromElements(
                Tuple3.of("燕双鹰",80,3),
                Tuple3.of("李大喜",90,4),
                Tuple3.of("李大喜",88,4),
                Tuple3.of("狄仁杰",95,4),
                Tuple3.of("狄仁杰",86,4)
        );

        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table scoreTable = table_env.fromDataStream(scoreDS,$("f0").as("name"),$("f1").as("score"),$("f2").as("weight"));// 字段名
        table_env.createTemporaryView("scores",scoreTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("weightAvg",MyWeightAvg.class);

        // TODO 3. 调用自定义函数
        table_env
                .sqlQuery("select name,weightAvg(score,weight) from scores group by name")
                .execute()
                .print();
    }
}

运行结果:

 

聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。

4.7.5、表聚合函数(Table Aggregate Functions

        用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。

        自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数<T, ACC>,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:

  • createAccumulator()
  •     创建累加器的方法,与AggregateFunction中用法相同。
  • accumulate()
  •     聚合计算的核心方法,与AggregateFunction中用法相同。
  • emitValue()

    所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect<T>。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。

        表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。

具体代码如下:

package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// T: 返回类型(数值,排名) ACC:累加器类型(第一大的数,第二大的数)
public class MyTableAggregate extends TableAggregateFunction<Tuple2<Integer,Integer>,Tuple2<Integer,Integer>> {

    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return Tuple2.of(0,0);
    }

    /**
     *
     * 每来一个数据 调用一次 判断比较大小 更新top2到acc中
     * @param acc 累加器类型
     * @param num 过来的数据
     */
    public void accumulate(Tuple2<Integer,Integer> acc,Integer num){
        if (num > acc.f0){
            // 新来的变第一,旧的第一变第二
            acc.f1 = acc.f0;
            acc.f0 = num;
        }else if (num > acc.f1){
            // 新来的变第二,旧的第二删除
            acc.f1 = num;
        }
    }

    /**
     * 输出结果 (数值,排名)
     * @param acc 累加器类型
     * @param out 采集器类型,和输出结果类型一样
     */
    public void emitValue(Tuple2<Integer,Integer> acc, Collector<Tuple2<Integer,Integer>> out){
        if (acc.f0 != 0){
            out.collect(Tuple2.of(acc.f0,1));
        }
        if (acc.f1 != 0){
            out.collect(Tuple2.of(acc.f1,2));
        }
    }
}
package com.lyh.sql;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class ScalarFunctionDemo4 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> valDS = env.fromElements(3,5,4,7,5,6,1,4,2);

        // TODO 1.创建表环境
        StreamTableEnvironment table_env = StreamTableEnvironment.create(env);
        // TODO 流 -> 表
        // 属性名 就是表的 字段名
        Table valueTable = table_env.fromDataStream(valDS,$("value"));// 字段名
        table_env.createTemporaryView("values",valueTable);

        //TODO 2. 注册函数
        table_env.createTemporaryFunction("top2",MyTableAggregate.class);

        // TODO 3. 调用自定义函数: 只支持 table api
        valueTable
                .flatAggregate(call("top2",$("value")).as("value","rank"))
                .select($("value"),$("rank"))
                .execute()
                .print();
    }
}

运行结果:

+----+-------------+-------------+
| op |       value |        rank |
+----+-------------+-------------+
| +I |           3 |           1 |
| -D |           3 |           1 |
| +I |           5 |           1 |
| +I |           3 |           2 |
| -D |           5 |           1 |
| -D |           3 |           2 |
| +I |           5 |           1 |
| +I |           4 |           2 |
| -D |           5 |           1 |
| -D |           4 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           5 |           2 |
| -D |           7 |           1 |
| -D |           5 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
| -D |           7 |           1 |
| -D |           6 |           2 |
| +I |           7 |           1 |
| +I |           6 |           2 |
+----+-------------+-------------+
32 rows in set

目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。

        这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。

总结

这节课用到不少东西:Kafka、Hive、MySQL、Flink,需要注意的地方很多,忘了的东西也很多。这里记录一下:

  • 关闭 hadoop 前 把 yarn里的任务都 kill 掉(尤其是 flink 的 yarn-session)

一些启动命令:

  • 启动 hive:hiveservices.sh start
  • 启动 flink sql:
    • bin/sqlsession -d
    • bin/sql-client embeded -s yarn-session -i sql-client-init.sql

        现在是 2024-01-23 22:13,终于把 Flink 完结了,从这学期的开始,耗时半年断断续续。Flink 是我难以言说的喜欢的一门课,没有缘由,希望接下来可以好好把它弄熟,更希望未来若干年的工作可以都和它打交道。

        永言配命,自求多福,希望未来能有一个好的结果,浅浅期待一下吧 ~