Categories
程式開發

Flink on Zeppelin (2) – Batch篇


 在Flink on Zeppelin 入门篇” 中我们讲述了如何配置Zeppelin + Flink来运行一个最简单的WordCount例子。本文将讲述如何使用Flink SQL + UDF来做Batch ETL和BI数据分析的任务。

Flink Interpreter类型

    首先介绍下Zeppelin中的Flink Interpreter类型。Zeppelin的Flink Interpreter支持Flink的所有API (DataSet, DataStream, Table API )。语言方面支持Scala,Python,SQL。下图是Zeppelin中支持的不同场景下的Flink Interpreter。

Flink on Zeppelin (2) - Batch篇 1

配置Flink Interpreter

       下图例举了所有重要的Flink配置信息,除此之外你还可以配置任意Flink的Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html)

Flink on Zeppelin (2) - Batch篇 2

内置入口变量

      Flink Interpreter (%flink) 为用户自动创建了下面6个变量作为Flink Scala程序的入口。

senv (StreamExecutionEnvironment),benv (ExecutionEnvironment)stenv (StreamTableEnvironment for blink planner)btenv (BatchTableEnvironment for blink planner)stenv_2 (StreamTableEnvironment for flink planner)btenv_2 (BatchTableEnvironment for flink planner)

      PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了6个python变量作为PyFlink程序的入口

s_env (StreamExecutionEnvironment),b_env (ExecutionEnvironment)st_env (StreamTableEnvironment for blink planner)bt_env (BatchTableEnvironment for blink planner)st_env_2 (StreamTableEnvironment for flink planner)bt_env_2 (BatchTableEnvironment for flink planner)

Blink/Flink Planner

Flink 1.10中有2种table api的planner:flink & blink.

如果你用DataSet api以及需要把DataSet转换成Table,那么就需要使用Flink planner的TableEnvironment (btenv_2 and stenv_2).其他场景下, 我们都会建议用户使用blink planner. 这也是Flink sql使用的planner(%flink.bsql & %flink.ssql)

使用Flink Batch SQL

      %flink.bsql 是用来执行Flink的batch sql. 运行 help 命令可以得到所有可用的命令

Flink on Zeppelin (2) - Batch篇 3

总的来说,Flink Batch SQL可以用来做2大任务:

使用 insert into 语句来做 Batch ETL使用 select 语句来做BI 数据分析

基于Bank数据的Batch ETL

下面我们基于Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做Batch ETL任务。

首先用Flink Sql创建一个raw 数据的source table,以及清洗干净后的sink table。

Flink on Zeppelin (2) - Batch篇 4

Flink on Zeppelin (2) - Batch篇 5

然后再定义Table Function来parse raw data。

Flink on Zeppelin (2) - Batch篇 6

接下来就可以用insert into语句来进行数据转换(source table –> sink table)

Flink on Zeppelin (2) - Batch篇 7

 用select语句来Preview最终数据,验证insert into语句的正确性

Flink on Zeppelin (2) - Batch篇 8

基于Bank数据的BI数据分析

经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的SQL Select语句进行分析,也可以使用Zeppelin的dynamic forms来增加交互性(TextBox,Select,Checkbox)

Flink on Zeppelin (2) - Batch篇 9

使用Flink UDF

      SQL虽然强大,但表达能力毕竟有限。有时候就要借助于UDF来表达更复杂的逻辑。Flink Interpreter 支持2种UDF (Scala + Python)。下面是2个简单的例子。

       Scala UDF

%flink

class ScalaUpper extends ScalarFunction {
def eval(str: String) = str.toUpperCase
}

btenv.registerFunction("scala_upper", new ScalaUpper())

    Python UDF

%flink.pyflink

class PythonUpper(ScalarFunction):
def eval(self, s):
return s.upper()

bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))

Flink on Zeppelin (2) - Batch篇 10

Flink on Zeppelin (2) - Batch篇 11

对Hive数据的数据分析

     除了可以分析Flink SQL创建的table之外,Flink也可以分析Hive上已有的table。如果要让Flink Interpreter使用Hive,那么需要做以下配置

设置 zeppelin.flink.enableHive 为 trueCopy 下面这些 dependencies 到flink的 lib 目录flink-connector-hive_{scala_version}-{flink.version}.jarflink-hadoop-compatibility_{scala_version}-{flink.version}.jarflink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jarhive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)在Flink interpreter setting 里或者 zeppelin-env.sh里指定 HIVE_CONF_DIR在Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的Hive版本

下面就用一个简单的例子展示如何在Zeppelin中用Flink查询Hive table

1. 用Zeppelin的jdbc interpreter查询hive tables

Flink on Zeppelin (2) - Batch篇 12

2. 用Flink sql 查询 hive table的schema

Flink on Zeppelin (2) - Batch篇 13

3. 用Flink Sql 查询hive table

Flink on Zeppelin (2) - Batch篇 14

       本文只是简单介绍如何在Zeppelin中使用Flink SQL + UDF,关于更多Flink SQL和UDF请参考Flink官方文档

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.htmlhttps://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.htmlhttps://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html

如果有碰到任何问题,请加入下面这个钉钉群讨论。后续我们会有更多Tutorial的文章,敬请期待。

Flink on Zeppelin (2) - Batch篇 15