flinksql如何配置才能读取filesystem的json格式文件?
老师,您好!
看官方文档上,filesystem类型是支持json格式的,自己尝试用flinksql去读json文件作为数据源,一直报错,官网也没有找到filesystem的json示例,老师能否给一个例子?
26
收起
正在回答
2回答
测试通过的代码,你可以试一下
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/** Example job: declares a JSON-formatted `filesystem` source table and a
  * `print` sink table through Flink SQL, then copies every row from the
  * source into the sink.
  */
object FlinkTest {

  /** Entry point. Builds the table environment, registers both tables and
    * submits the insert.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Obtain the StreamTableEnvironment (Blink planner, streaming mode).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val ssTableEnv = StreamTableEnvironment.create(env, ssSettings)

    // DDL for the source table: a local file read with the 'json' format.
    // The path is machine-specific; point it at your own JSON file.
    val inTableSql =
      """
        |create table source(
        |name string,
        |age int
        |) with(
        |  'connector' = 'filesystem',
        |  'path' = 'file:///Users/xuwei/tmp/a.json',
        |  'format' = 'json'
        |)
        |""".stripMargin
    ssTableEnv.executeSql(inTableSql)

    // DDL for the sink table, backed by the 'print' connector.
    val outTableSql =
      """
        |CREATE TABLE print_table (
        |name string,
        |age int
        |) WITH (
        |  'connector' = 'print'
        |)
        |""".stripMargin
    ssTableEnv.executeSql(outTableSql)

    // Run the SQL query against the source table.
    val res = ssTableEnv.sqlQuery("select name,age from source")

    // Write the query result into the sink table.
    res.executeInsert("print_table")
  }
}
需要导入依赖
<!-- Maven dependencies for the example above: Flink 1.11.1, Scala 2.12 artifacts. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.12</artifactId>
  <version>1.11.1</version>
</dependency>
<!-- JSON format artifact; presumably what backs 'format' = 'json' in the source table. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.11.1</version>
</dependency>
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧