How do I configure Flink SQL to read JSON-format files via the filesystem connector?

Hello, teacher!

According to the official docs, the filesystem connector supports the JSON format, but when I try to use Flink SQL to read a JSON file as a source it keeps failing, and I couldn't find a filesystem + JSON example on the official site either. Could you give an example?

2 Answers

Here is code that passed my testing; you can try it:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkTest {
  def main(args: Array[String]): Unit = {
    // get a StreamTableEnvironment (blink planner, streaming mode)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val ssTableEnv = StreamTableEnvironment.create(env, ssSettings)

    // assemble the CREATE TABLE statement for the source table
    val inTableSql = """
                      |create table source(
                      |name string,
                      |age int
                      |) with(
                      |'connector' = 'filesystem',
                      |'path' = 'file:///Users/xuwei/tmp/a.json',
                      |'format' = 'json'
                      |)
                      |""".stripMargin
    ssTableEnv.executeSql(inTableSql)

    // assemble the CREATE TABLE statement for the sink table
    val outTableSql = """
                       |CREATE TABLE print_table (
                       |name string,
                       |age int
                       |) WITH (
                       | 'connector' = 'print'
                       |)
                       |""".stripMargin
    ssTableEnv.executeSql(outTableSql)

    // run the SQL query
    val res = ssTableEnv.sqlQuery("select name,age from source")
    // write the result to the sink table
    res.executeInsert("print_table")
  }
}



You also need to add these Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.11.1</version>
</dependency>
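Note that the source file at the path in the DDL above has to contain one JSON object per line (this comes up again in the comments below). A hypothetical `a.json` matching the `name`/`age` schema, with made-up sample records, would look like:

```json
{"name":"tom","age":10}
{"name":"zs","age":20}
```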


  • pu2chyh (asker) #1


    Teacher, the content of my demo.json:

    {
      "name": "tom",
      "age": 10
    }

    I didn't change the Maven dependencies at all; the only change in the code is:

    'path' = 'd:\test\source1\demo.json',

    Now my output has no errors, but I don't see the contents of the file. The output is:

    http://img1.sycdn.imooc.com//climg/60b24b0b09eb6b7e18360395.jpg


    2021-05-29 22:13:39
  • Teacher Xu replied to asker pu2chyh #2

    1. A JSON record must not span multiple lines; it has to be a single line in the file:

    {"name":"zs","age":10}

    2. Add the log4j-related dependencies and a log4j.properties configuration file so you can see the detailed error message.



    2021-05-29 22:17:04
  • pu2chyh (asker) replied to Teacher Xu #3

    Thanks, teacher! After changing the JSON file to a single line it works.

    2021-05-29 22:22:30
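The log4j.properties the teacher mentions in reply #2 could be a minimal console setup like this (a sketch, assuming a log4j 1.x binding such as slf4j-log4j12 is on the classpath):

```properties
# print INFO and above to the console
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```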
Teacher Xu 2021-05-29 08:21:58

Do you want to use Flink SQL in a batch job to read JSON-format data from a file?


Section 8-1 of week 20 of the course has example code for using Flink SQL in a streaming job to read JSON-format data from Kafka; see whether that meets your needs.

  • pu2chyh (asker) #1

    Teacher, it's a streaming scenario. I need to read a JSON file from the local filesystem; can that be done?

    connector.type is filesystem

    2021-05-29 08:27:44
  • Teacher Xu replied to asker pu2chyh #2

    It should be possible, although reading files in a streaming scenario is of limited use; streaming jobs almost always read from a message queue. First try adapting the content of section 8-1 in week 20 and see whether you can get it working. If you still have problems, I'll write some code for you tonight. I'm away on a business trip these two days, so I can't write code during the day.



    2021-05-29 08:30:52
  • pu2chyh (asker) #3

    Teacher, my code:

    def main(args: Array[String]): Unit = {
      // first build the environment settings
      val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

      // create the TableEnvironment object
      val sTableEnv = TableEnvironment.create(sSettings)

      // create the input table, i.e. the data source
      sTableEnv.executeSql("" +
        "create table myTable(\n" +
        "pwd string\n" +
        ") with (\n" +
        "'connector.type' = 'filesystem',\n" +
        "'connector.path' = 'D:\\test\\source1\\demo.json',\n" +
        "'format.type' = 'json'\n" +
        ")"
      )

      val result = sTableEnv.sqlQuery("select * from myTable")
      // print the result to the console
      result.execute().print()
    }


    The error message:

    Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
        at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
        at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:91)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
        at com.sino_bridge.scala.tablesql.FlinkSqlJsonOps$.main(FlinkSqlJsonOps.scala:25)
        at com.sino_bridge.scala.tablesql.FlinkSqlJsonOps.main(FlinkSqlJsonOps.scala)
    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

    Reason: Required context properties mismatch.

    The matching candidates:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    Mismatched properties:
    'format.type' expects 'csv', but is 'json'

    The following properties are requested:
    connector.path=D:\\test\\source1\\demo.json
    connector.type=filesystem
    format.type=json
    schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=pwd

    The following factories have been considered:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    org.apache.flink.table.filesystem.FileSystemTableFactory
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
        at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
        at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
        at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
        at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
        ... 19 more


    Following the hint:

    The following properties are requested:
    connector.path=D:\\test\\source1\\demo.json
    connector.type=filesystem
    format.type=json
    schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=pwd

    I added the corresponding two lines of configuration:

    schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=pwd

    but it still errors.



    2021-05-29 08:56:25
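A likely root cause of this error (my reading of the stack trace, not stated explicitly in the thread): the legacy option keys `connector.type`, `connector.path`, and `format.type` are resolved through the old `TableSourceFactory` discovery path, where the only matching filesystem factories are the CSV ones — hence `'format.type' expects 'csv', but is 'json'`. Flink 1.11's filesystem connector is selected via the new-style option keys instead, so the DDL would need to look roughly like this (using the asker's path):

```sql
create table myTable (
  pwd string
) with (
  'connector' = 'filesystem',   -- new-style key, not 'connector.type'
  'path' = 'd:\test\source1\demo.json',
  'format' = 'json'             -- requires flink-json on the classpath
)
```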