flinsql如何配置才能读取filesystem的json格式文件?
老师,您好!
看官方文档上,filesystem类型是支持json格式的,自己尝试用flinksql去读json文件作为数据源,一直报错,官网也没有找到filesystem的json示例,老师能否给一个例子?
26
收起
正在回答
2回答
测试通过的代码,你可以试一下
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object FlinkTest {
def main(args: Array[String]): Unit = {
//获取StreamTableEnviroment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val ssTableEnv = StreamTableEnvironment.create(env, ssSettings)
//组装source表建表语句
val inTableSql ="""
|create table source(
|name string,
|age int
|) with(
|'connector' = 'filesystem',
|'path' = 'file:///Users/xuwei/tmp/a.json',
|'format' = 'json'
|)
|""".stripMargin
ssTableEnv.executeSql(inTableSql)
//组装sink表建表语句
val outTableSql ="""
|CREATE TABLE print_table (
|name string,
|age int
|) WITH (
| 'connector' = 'print'
|)
|""".stripMargin
ssTableEnv.executeSql(outTableSql)
//执行sql查询
val res = ssTableEnv.sqlQuery("select name,age from source")
//保存结果
res.executeInsert("print_table")
}
}需要导入依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.11.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.11.1</version> </dependency>

恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星