TableFactory


    Contents

    - Example
    - Descriptor
    - TableFactory
    - KafkaTableSourceSinkFactoryBase
    - TableDescriptor

    Example

    The following example first creates a Kafka source, then creates a CSV sink, and finally pipes data from the Kafka source into the CSV sink.

        def testFromKafkaToCSV(): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env)

          // Source
          tEnv.connect(new Kafka()
              .version("0.10")
              .topic("Test")
              .startFromEarliest()
              .property("zookeeper.connect", "node00:2181")
              .property("bootstrap.servers", "node00:9092"))
            .withFormat(new Json()
              .deriveSchema())
            .withSchema(new Schema()
              .field("api", Types.STRING)
              .field("clientip", Types.STRING).from("clientip")
              .field("t1", Types.STRING).from("timestamp"))
            .inAppendMode()
            .registerTableSource("srcTable")

          // Sink
          tEnv.connect(new FileSystem().path("hdfs://node00:8020/lisen5/connect"))
            .withFormat(new Csv()
              .field("api", Types.STRING)
              .field("c", Types.INT))
            .withSchema(new Schema()
              .field("api", Types.STRING)
              .field("c", Types.INT))
            .inAppendMode()
            .registerTableSink("sinkTable")

          // Insert into
          tEnv.sqlUpdate("insert into sinkTable select api, 1 as c from srcTable")
          env.execute()
        }

    The example above ultimately creates a source table and a sink table through registerTableSource and registerTableSink. Let's walk through what happens underneath.

        // ConnectTableDescriptor
        override def registerTableSource(name: String): Unit = {
          val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
          tableEnv.registerTableSource(name, tableSource)
        }

    registerTableSource first creates a TableSource and then registers the table via tableEnv.registerTableSource. Next, let's see how the TableSource itself is obtained.

        // TableFactoryUtil
        def findAndCreateTableSource[T](
            tableEnvironment: TableEnvironment,
            descriptor: Descriptor)
          : TableSource[T] = {
          val javaMap = descriptor.toProperties
          tableEnvironment match {
            case _: BatchTableEnvironment =>
              // batch case
            case _: StreamTableEnvironment =>
              TableFactoryService
                .find(classOf[StreamTableSourceFactory[T]], javaMap)
                .createStreamTableSource(javaMap)
            case e@_ =>
              // throw an exception
          }
        }

        // TableFactoryService
        def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
          findInternal(factoryClass, propertyMap, None)
        }

        private def findInternal[T](
            factoryClass: Class[T],
            propertyMap: JMap[String, String],
            classLoader: Option[ClassLoader])
          : T = {
          Preconditions.checkNotNull(factoryClass)
          Preconditions.checkNotNull(propertyMap)
          val properties = propertyMap.asScala.toMap

          // load all TableFactory implementations via reflection (SPI)
          val foundFactories = discoverFactories(classLoader)

          // filter out factories that do not implement StreamTableSourceFactory
          val classFactories = filterByFactoryClass(
            factoryClass,
            properties,
            foundFactories)

          // match the given properties against each factory's requiredContext
          // (e.g. the connector.type and update-mode parameters) and keep the
          // factories that match
          val contextFactories = filterByContext(
            factoryClass,
            properties,
            foundFactories,
            classFactories)

          // match the given properties against each factory's supportedProperties
          // (e.g. connector.topic, schema.#.type) and return the matching factory
          filterBySupportedProperties(
            factoryClass,
            properties,
            foundFactories,
            contextFactories)
        }

        private def filterByContext[T](
            factoryClass: Class[T],
            properties: Map[String, String],
            foundFactories: Seq[TableFactory],
            classFactories: Seq[TableFactory])
          : Seq[TableFactory] = {
          val matchingFactories = classFactories.filter { factory =>
            // get the factory's requiredContext with keys normalized to lower
            // case (TableFactory is analyzed in detail below)
            val requestedContext = normalizeContext(factory)
            val plainContext = mutable.Map[String, String]()
            plainContext ++= requestedContext
            // drop the version parameters
            plainContext.remove(CONNECTOR_PROPERTY_VERSION)
            plainContext.remove(FORMAT_PROPERTY_VERSION)
            plainContext.remove(METADATA_PROPERTY_VERSION)
            plainContext.remove(STATISTICS_PROPERTY_VERSION)
            plainContext.remove(CATALOG_PROPERTY_VERSION)
            // every key of plainContext must appear in the given properties with
            // an equal value; e.g. if connector.type='kafka' was given, the
            // factory only matches if its requiredContext also contains
            // connector.type='kafka'
            plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
          }
          if (matchingFactories.isEmpty) {
            // no factory matched: throw an exception
          }
          matchingFactories
        }

        private def filterBySupportedProperties[T](
            factoryClass: Class[T],
            properties: Map[String, String],
            foundFactories: Seq[TableFactory],
            classFactories: Seq[TableFactory])
          : T = {
          val plainGivenKeys = mutable.ArrayBuffer[String]()
          properties.keys.foreach { k =>
            // replace '.<digits>' (e.g. '.1') with '.#'
            val key = k.replaceAll(".\\d+", ".#")
            // collect all keys into plainGivenKeys, deduplicated
            if (!plainGivenKeys.contains(key)) {
              plainGivenKeys += key
            }
          }

          // remember the last checked key (for error reporting)
          var lastKey: Option[String] = None
          val supportedFactories = classFactories.filter { factory =>
            // all keys of the factory's requiredContext
            val requiredContextKeys = normalizeContext(factory).keySet
            // all supportedProperties keys, plus the set of keys ending in '*'
            // with the '*' stripped (wildcards)
            val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
            // remove the context keys from plainGivenKeys to get givenContextFreeKeys
            val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
            // perform factory specific filtering of keys
            val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
              factory,
              givenContextFreeKeys)
            givenFilteredKeys.forall { k =>
              lastKey = Option(k)
              supportedKeys.contains(k) || wildcards.exists(k.startsWith)
            }
          }
          // throw an exception if no factory is left
          supportedFactories.head.asInstanceOf[T]
        }
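
    The key normalization at the start of filterBySupportedProperties is easy to miss. The following standalone sketch (plain Scala, no Flink dependencies) shows what the '.<digits>' to '.#' rewrite does to a set of given keys:

        val given = Seq("schema.0.name", "schema.0.type", "schema.1.name", "connector.topic")

        // indexed keys such as schema.0.name and schema.1.name collapse to the
        // single pattern schema.#.name, which is the form supportedProperties() lists
        val plain = given.map(_.replaceAll(".\\d+", ".#")).distinct

        println(plain) // List(schema.#.name, schema.#.type, connector.topic)

    This is why a factory can declare schema.#.name once and still accept any number of schema fields.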

    Descriptor

    Descriptor is the interface for describing a connector, a schema, or a format. As we have seen, creating a source requires exactly these three ingredients: connector, format, and schema. So a Descriptor is, in effect, the interface for describing a table.
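
    As used by findAndCreateTableSource above, a Descriptor is flattened into a plain key-value map via toProperties. A minimal sketch (the key names in the comment illustrate Flink's connector.* key space and are not an authoritative dump):

        import org.apache.flink.table.descriptors.Kafka

        val kafka = new Kafka()
          .version("0.10")
          .topic("Test")

        // toProperties flattens the builder state into keys such as
        // connector.type=kafka, connector.version=0.10, connector.topic=Test
        // (plus connector.property-version for backwards compatibility)
        println(kafka.toProperties)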

    TableFactory

    TableFactory is an interface: the factory used to create source or sink tables.

        public interface TableFactory {

          // defines the required context parameters, e.g.
          // - connector.property-version
          // - format.property-version
          Map<String, String> requiredContext();

          // defines the supported properties, e.g.
          // - schema.#.type
          // - schema.#.name
          // - connector.topic
          // - format.line-delimiter
          // - format.ignore-parse-errors
          // - format.fields.#.type
          // - format.fields.#.name
          List<String> supportedProperties();
        }
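
    To make the two methods concrete, here is a hedged sketch of a custom source factory. The class name MySourceFactory, the connector.type value 'my-connector', and the non-version property keys are all hypothetical. For Flink's SPI discovery to find it, the fully qualified class name also has to be listed in a META-INF/services/org.apache.flink.table.factories.TableFactory file on the classpath:

        import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

        import org.apache.flink.table.factories.StreamTableSourceFactory
        import org.apache.flink.table.sources.StreamTableSource
        import org.apache.flink.types.Row

        class MySourceFactory extends StreamTableSourceFactory[Row] {

          // checked by filterByContext: this factory is only considered when the
          // given properties contain connector.type=my-connector
          override def requiredContext(): JMap[String, String] = {
            val context = new JHashMap[String, String]()
            context.put("connector.type", "my-connector")
            context.put("connector.property-version", "1")
            context
          }

          // checked by filterBySupportedProperties: every given non-context key
          // must appear here, either literally or via a '#' index / '*' wildcard
          override def supportedProperties(): JList[String] = {
            val properties = new JArrayList[String]()
            properties.add("connector.path")
            properties.add("schema.#.name")
            properties.add("schema.#.type")
            properties.add("format.*")
            properties
          }

          override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
            // build the actual source from the flat property map here
            ???
          }
        }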

    KafkaTableSourceSinkFactoryBase

        public Map<String, String> requiredContext() {
          Map<String, String> context = new HashMap<>();
          context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); // append mode
          context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA); // kafka
          context.put(CONNECTOR_VERSION, kafkaVersion()); // version
          context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
          return context;
        }

        public List<String> supportedProperties() {
          List<String> properties = new ArrayList<>();

          // kafka
          properties.add(CONNECTOR_TOPIC);
          properties.add(CONNECTOR_PROPERTIES);
          properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
          properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
          properties.add(CONNECTOR_STARTUP_MODE);
          properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
          properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
          properties.add(CONNECTOR_SINK_PARTITIONER);
          properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);

          // schema
          properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
          properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
          properties.add(SCHEMA() + ".#." + SCHEMA_FROM());

          // time attributes
          properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
          properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
          properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
          properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
          properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
          properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
          properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
          properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
          properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());

          // format wildcard
          properties.add(FORMAT + ".*");

          return properties;
        }
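
    To tie this back to the first example: the Kafka source descriptor flattens to roughly the following map (an illustrative excerpt, not the complete dump). The first four keys are what requiredContext() above matches on; every remaining key must be covered by supportedProperties():

        val givenProperties = Map(
          // matched by requiredContext()
          "update-mode"                  -> "append",
          "connector.type"               -> "kafka",
          "connector.version"            -> "0.10",
          "connector.property-version"   -> "1",
          // must be covered by supportedProperties()
          "connector.topic"              -> "Test",
          "connector.properties.0.key"   -> "zookeeper.connect",
          "connector.properties.0.value" -> "node00:2181",
          "format.type"                  -> "json",    // covered by the format.* wildcard
          "schema.0.name"                -> "api",     // normalized to schema.#.name
          "schema.0.type"                -> "VARCHAR"  // normalized to schema.#.type
        )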

    TableDescriptor

    TableDescriptor, as the name suggests, describes a table. It consists of three sub-descriptors: a Connector, describing where the data comes from, e.g. Kafka or ES; a Format, describing how the data is encoded, e.g. CSV, JSON, or Avro; and a Schema, describing each field's name and type. TableDescriptor has two basic implementations: ConnectTableDescriptor describes internal tables, i.e. tables created programmatically, while ExternalCatalogTable describes external tables.

    Given a TableDescriptor, a TableFactory is then needed to instantiate the Table from the description. Different descriptions require different TableFactory implementations, so how does Flink find the matching one? To keep the framework extensible, Flink uses the Java SPI mechanism to load all declared TableFactory implementations and iterates over them to find the one that matches the TableDescriptor. Before being handed to a TableFactory, the TableDescriptor is converted into a map, so that all description information is expressed as key-value pairs. TableFactory defines two methods for this matching: requiredContext() checks whether the values of certain specific keys match, e.g. whether connector.type is kafka; supportedProperties() checks whether every key is recognized, since an unrecognized key means the factory cannot match.

    Once the right TableFactory is matched, the actual Table is created and then registered through the TableEnvironment. Only a successfully registered Table can be referenced in SQL.

    If tables created from external metadata can also be converted into the map that TableFactory understands, they can be registered into the TableEnvironment seamlessly. Every table created through our metadata center stores its metadata in MySQL: one table records a Table's basic information, and three further tables record the Connector, Format, and Schema descriptions after conversion into key-value form. They are split into three tables so that each of the three kinds of description can be updated independently. On top of this sits a custom ExternalCatalog implementation that reads these four MySQL tables and converts them back into the map structure.
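
    A minimal sketch of that flattening step (all table and column names are hypothetical, since the original MySQL schema is not shown): the three description tables are assumed to store one key-value pair per row, keyed by a table id, and are merged back into the single map a TableFactory expects:

        import java.sql.DriverManager
        import scala.collection.mutable

        def loadDescription(jdbcUrl: String, tableId: Long): Map[String, String] = {
          val conn = DriverManager.getConnection(jdbcUrl)
          try {
            val merged = mutable.Map[String, String]()
            // hypothetical tables, one key-value row per description entry
            for (table <- Seq("meta_connector", "meta_format", "meta_schema")) {
              val stmt = conn.prepareStatement(
                s"SELECT prop_key, prop_value FROM $table WHERE table_id = ?")
              stmt.setLong(1, tableId)
              val rs = stmt.executeQuery()
              while (rs.next()) {
                merged += rs.getString("prop_key") -> rs.getString("prop_value")
              }
            }
            merged.toMap // the flat properties map handed to TableFactoryService.find
          } finally {
            conn.close()
          }
        }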
