老师,我的代码:
def main(args: Array[String]): Unit = {
//首先获取TableEnvironment
val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
//创建TableEnvironment对象
val sTableEnv = TableEnvironment.create(sSettings)
//创建输入表,就类似于创建数据源了,用于获取数据
sTableEnv.executeSql("" +
"create table myTable(\n" +
"pwd string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:\\test\\source1\\demo.json',\n" +
"'format.type' = 'json'\n" +
")"
)
val result = sTableEnv.sqlQuery("select * from myTable")
//输出结果到控制台
result.execute().print()
}
报错信息:
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:91)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
at com.sino_bridge.scala.tablesql.FlinkSqlJsonOps$.main(FlinkSqlJsonOps.scala:25)
at com.sino_bridge.scala.tablesql.FlinkSqlJsonOps.main(FlinkSqlJsonOps.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.path=D:\\test\\source1\\demo.json
connector.type=filesystem
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=pwd
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
... 19 more
如果按照提示:
The following properties are requested:
connector.path=D:\\test\\source1\\demo.json
connector.type=filesystem
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=pwd
把对应的:
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=pwd
这2行配置加上,还是会报错。
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星