使用自定义流水线导入元数据

本文档介绍了如何使用元数据导入 API 方法和您自己的流水线,将元数据从第三方系统导入 Dataplex Universal Catalog。Dataplex Universal Catalog 元数据由条目及其切面组成。

如果您想使用 Google Cloud管理的编排流水线来提取和导入元数据,建议使用托管式连接流水线。借助托管式连接流水线,您可以自带连接器,该连接器会提取元数据,并以可用作元数据导入 API 方法(元数据导入文件)的输入的格式生成输出。然后,您可以使用 Workflows 来编排流水线任务。

您可以运行以下类型的元数据导入作业:

  • 完全同步条目,并以增量方式导入条目的切面。支持自定义条目。
  • 仅增量导入切面。支持属于自定义条目和系统条目的切面。对于自定义条目,您可以修改可选切面和必需的切面。对于系统条目,您可以修改可选切面。

简要步骤

如需使用元数据导入 API 导入元数据,请按照以下简要步骤操作:

  1. 确定作业范围。

    此外,还需要了解 Dataplex Universal Catalog 如何为条目和切面应用比较逻辑和同步模式。

  2. 创建一个或多个元数据导入文件,以定义要导入的数据。

  3. 将元数据导入文件保存在 Cloud Storage 存储桶中。

  4. 运行元数据导入作业。

本页面中的步骤假设您熟悉 Dataplex Universal Catalog 元数据概念,包括条目组、条目类型和切面类型。如需了解详情,请参阅 Dataplex Universal Catalog 中的数据目录管理简介

准备工作

在导入元数据之前,请完成本部分中的任务。

所需的角色

如需确保 Dataplex Universal Catalog 服务账号拥有访问 Cloud Storage 存储桶所需的权限,请让您的管理员为 Dataplex Universal Catalog 服务账号授予该存储桶的 Storage Object Viewer (roles/storage.objectViewer) IAM 角色和 storage.buckets.get 权限。

如需获得管理元数据导入作业所需的权限,请让您的管理员为您授予以下 IAM 角色:

  • 在完整条目同步元数据作业中修改条目及其切面:
  • 在仅切面的元数据作业中修改必需的切面:
  • 在仅切面的元数据作业中修改可选切面:针对切面类型或在其中定义切面类型的项目的 Dataplex Aspect Type User (roles/dataplex.aspectTypeUser)。请注意,在仅切面的元数据的作业中修改可选切面时,您无需拥有关联的条目类型的权限。
  • 创建元数据导入作业:
  • 查看元数据作业:针对项目的 Dataplex Metadata Job Viewer (roles/dataplex.metadataJobViewer)
  • 创建、查看和取消元数据作业:针对项目的 Dataplex Metadata Job Owner (roles/dataplex.metadataJobOwner)

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

创建 Google Cloud 资源

准备以下 Google Cloud 资源:

  1. 为要导入的条目创建条目组
  2. 为要导入的切面创建切面类型
  3. 为要导入的条目创建条目类型
  4. 如果您要运行仅切面的元数据作业,请为要导入的切面创建条目
  5. 创建一个 Cloud Storage 存储桶以存储元数据导入文件。

元数据导入作业的组成部分

导入元数据时,请考虑元数据作业的以下组成部分:

  • 作业范围:要包含在作业中的条目组、条目类型和切面类型。
  • 同步模式:如何更新作业中的条目和切面。
  • 元数据导入文件:用于定义要为作业中的条目和切面设置的值的文件。您可以在同一元数据作业中提供多个元数据导入文件。您将文件保存在 Cloud Storage 中。
  • 比较逻辑:Dataplex Universal Catalog 如何确定要修改哪些条目和切面。

作业范围

作业范围用于定义您要包含在元数据导入作业中的条目组、条目类型和切面类型。导入元数据时,您需要修改属于作业范围内的资源的条目和切面。

如需定义作业范围,请遵循以下准则:

  • 条目组:指定要包含在作业中的单个条目组。作业仅会修改属于此条目组的条目和切面。条目组和作业必须位于同一区域。

  • 条目类型:指定要包含在作业中的一个或多个条目类型。作业仅会修改属于这些条目类型的条目和切面。条目类型的位置必须与作业位置匹配,或者条目类型必须是全球性的。

  • 切面类型:指定要在作业中包含的一个或多个切面类型。作业仅会修改属于这些切面类型的切面。切面类型的位置必须与作业位置匹配,或者切面类型必须是全球性的。

作业范围必须包含您在元数据导入文件中指定的所有条目类型和切面类型。

您可以在创建元数据作业时指定作业范围。

同步模式

同步模式用于指定如何更新元数据导入作业中的条目和切面。您需要为条目和切面都提供同步模式。根据您要导入的资源,系统支持以下同步模式组合。

目标 条目同步模式 切面同步模式 结果
导入条目及其切面 FULL INCREMENTAL

作业范围内的所有条目都会被修改。

如果 Dataplex Universal Catalog 中存在某个条目,但该条目未包含在元数据导入文件中,则在您运行元数据作业时,该条目会被删除。

只有当元数据导入文件在 updateMask 字段和 aspectKeys 字段中包含对相应切面的引用时,才会修改该切面。请参阅导入项的结构

仅导入切面 NONE INCREMENTAL

如果切面属于作业的范围,并且元数据导入文件在 aspectKeys 字段中包含对切面的引用,则会修改这些切面。请参阅导入项的结构

属于作业范围内的条目的其他元数据不会被修改。

您可以在创建元数据作业时指定同步模式。

元数据导入文件

元数据导入文件是您要修改的条目和切面的集合。它定义了要为属于这些条目和切面的所有字段设置的值。您需要在运行元数据导入作业之前准备好该文件。

请遵循以下一般准则:

  • 您可以在同一元数据作业中提供多个元数据导入文件。
  • 运行完整条目同步元数据作业时,您在文件中提供的条目会完全替换作业范围内的所有资源的所有现有条目。这意味着您必须为作业中的所有条目都提供值,而不仅仅是您要添加或更新的值。如需获取项目中的当前条目列表以用作起点,请使用 entries.list API 方法

  • 您必须在元数据作业中提供元数据导入文件。如果您想删除作业范围内的条目的所有现有数据,请提供一个空的元数据导入文件。

  • 您在文件中包含的所有条目和切面都必须属于您在作业范围内定义的条目组、条目类型和切面类型。

请按照以下各部分中的详细准则创建元数据导入文件。

文件的结构

元数据导入文件中的每一行都包含一个与一个导入项对应的 JSON 对象。导入项是一个对象,用于描述要针对条目及其附加切面修改的值。

您可以在单个元数据导入文件中提供多个导入项。不过,请勿在元数据作业中多次提供相同的导入项。使用换行符 (0x0a) 分隔每个导入项。

每个导入项之间都有换行符的元数据导入文件如以下示例所示:

{ "entry": { "name": "entry 1", #Information about entry 1 }
{ "entry": { "name": "entry 2", #Information about entry 2 }

导入项的结构

元数据导入文件中的每个导入项都可以包含以下字段(请参阅 ImportItem)。以下示例的格式设置中包含换行符,以便于阅读,但保存文件时,请仅在每个导入项后添加换行符。请勿在单个导入项的字段之间添加换行符。

{
  "entry": {
    "name": "ENTRY_NAME",
    "entryType": "ENTRY_TYPE",
    "entrySource": {
      "resource": "RESOURCE",
      "system": "SYSTEM",
      "platform": "PLATFORM",
      "displayName": "DISPLAY_NAME",
      "description": "DESCRIPTION",
      "createTime": "ENTRY_CREATE_TIMESTAMP",
      "updateTime": "ENTRY_UPDATE_TIMESTAMP"
    },
    "aspects": {
      "ASPECT": {
        "data": {
          "KEY": "VALUE"
        },
        "aspectSource": {
          "createTime": "ASPECT_CREATE_TIMESTAMP",
          "updateTime": "ASPECT_UPDATE_TIMESTAMP"
        }
      },
      # Additional aspect maps
    },
    "parentEntry": "PARENT_ENTRY",
    "fullyQualifiedName": "FULLY_QUALIFIED_NAME"
  },
  "updateMask": "UPDATE_MASK_FIELDS",
  "aspectKeys": [
    "ASPECT_KEY",
    # Additional aspect keys
  ],
}

替换以下内容:

  • entry:有关条目及其附加切面的信息。在仅切面的元数据导入作业中,Dataplex Universal Catalog 会忽略条目的所有可选字段,但切面映射除外。

    • ENTRY_NAME:条目的相对资源名称,格式为 projects/PROJECT_ID_OR_NUMBER/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID/entries/ENTRY_ID
    • ENTRY_TYPE:用于创建此条目的条目类型的相对资源名称,格式为 projects/PROJECT_ID_OR_NUMBER/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID
    • entrySource:来源系统中有关相应条目所表示的数据资源的信息:
      • RESOURCE:源系统中资源的名称。
      • SYSTEM:源系统的名称。
      • PLATFORM:包含源系统的平台。
      • DISPLAY_NAME:简单易记的显示名称。
      • DESCRIPTION:条目的说明。
      • ENTRY_CREATE_TIMESTAMP:在源系统中创建条目的时间。
      • ENTRY_UPDATE_TIMESTAMP:在源系统中更新条目的时间。
    • aspects:附加到条目的切面。aspect 对象及其数据称为切面映射。

      • ASPECT:附加到条目的切面。根据切面附加到条目的方式,使用以下格式之一:

        • 如果切面直接附加到条目,请提供其切面类型的相对资源名称,格式为 PROJECT_ID_OR_NUMBER.LOCATION_ID.ASPECT_TYPE_ID
        • 如果切面附加到条目的路径,请提供切面类型的路径,格式为 PROJECT_ID_OR_NUMBER.LOCATION_ID.ASPECT_TYPE_ID@PATH
      • KEYVALUE:切面的内容,具体取决于其切面类型元数据模板。内容必须采用 UTF-8 编码。字段的大小上限为 120 KB。即使 data 字典为空,也必须提供。

      • ASPECT_CREATE_TIMESTAMP:在源系统中创建切面的时间。

      • ASPECT_UPDATE_TIMESTAMP:在源系统中更新切面的时间。

    • PARENT_ENTRY:父条目的资源名称。

    • FULLY_QUALIFIED_NAME:可供外部系统引用的条目的名称。请参阅完全限定名称

  • UPDATE_MASK_FIELDS:要更新的字段,以相对于 Entry 资源的路径表示。请使用英文逗号分隔每个字段。

    在完整条目同步作业中,Dataplex Universal Catalog 会包含可修改的条目的所有字段的路径,包括切面。创建或重新创建条目时,系统会忽略 updateMask 字段。

    在仅切面的元数据作业中,将此值设置为 aspects

  • ASPECT_KEY:要修改的切面。支持以下语法:

    • ASPECT_TYPE_REFERENCE:匹配直接附加到条目的切面的切面类型。
    • ASPECT_TYPE_REFERENCE@PATH:匹配切面类型和指定的路径。
    • ASPECT_TYPE_REFERENCE@*:匹配所有路径的切面类型。
    • *@PATH:匹配指定路径上的所有切面类型。

    ASPECT_TYPE_REFERENCE 替换为对切面类型的引用,格式为 PROJECT_ID_OR_NUMBER.LOCATION_ID.ASPECT_TYPE_ID

    在完整条目同步作业中,如果您将此字段留空,则系统会将其视为准确指定了指定条目中存在的切面。Dataplex Universal Catalog 会隐式添加条目所有必需切面的键。

文件要求

元数据导入文件具有以下要求:

  • 该文件必须采用 JSON 行文件格式,即以换行符分隔的 JSON 文件。使用换行符 (0x0a) 分隔每个导入项。
  • 该文件必须使用 UTF-8 字符编码。
  • 支持的文件扩展名为 .jsonl.json
  • 每个元数据导入文件的大小必须小于 1 GiB。元数据作业中所有数据的总大小上限为 3 GB。这包括与作业关联的所有文件和元数据。
  • 您在该文件中指定的条目类型和切面类型必须属于元数据作业的范围。
  • 该文件必须上传到 Cloud Storage 存储桶。请勿将该文件保存在名为 CLOUD_STORAGE_URI/deletions/ 的文件夹中。

比较逻辑

Dataplex Universal Catalog 会将您在元数据导入文件中提供的值和时间戳与项目中存在的值和时间戳进行比较,以确定要修改哪些条目和切面。

概括来讲,如果元数据导入文件中的至少一项建议的更改会在作业运行时更改项目的状态,且不会引入过时的数据,则 Dataplex Universal Catalog 会更新项目中的值。建议的更改必须在元数据导入文件的更新掩码字段或切面键字段中引用。

比较逻辑会因您运行的元数据导入作业的类型而异。

完整条目同步作业

在完整条目同步元数据作业中,对于属于作业范围的每个条目,Dataplex Universal Catalog 会执行以下操作之一:

  • 创建条目和附加的切面。如果元数据导入文件包含项目中不存在的条目,Dataplex Universal Catalog 会创建该条目和附加的切面。
  • 删除条目和附加的切面。如果项目中存在某个条目,但元数据导入文件不包含该条目,则 Dataplex Universal Catalog 会从项目中删除该条目及其附加的切面。
  • 更新条目和附加的切面。如果元数据导入文件和项目中都存在某个条目,则 Dataplex Universal Catalog 会评估与该条目关联的条目源时间戳和切面源时间戳,以确定要修改哪些值。然后,Dataplex Universal Catalog 会执行以下一项或多项操作:

    • 重新创建条目。如果元数据导入文件中的条目源创建时间戳晚于项目中的对应时间戳,则 Dataplex Universal Catalog 会在项目中重新创建该条目。
    • 更新条目。如果元数据导入文件中的条目源更新时间戳晚于项目中的对应时间戳,则 Dataplex Universal Catalog 会在项目中更新该条目。
    • 创建切面。如果项目中不存在某个切面,但该切面包含在元数据导入文件的切面映射、更新掩码字段和切面键字段中,则 Dataplex Universal Catalog 会创建该切面。
    • 删除切面。如果项目中存在某个切面,并且该切面包含在元数据导入文件的更新掩码字段和切面键字段中,但不包含在切面映射中,则 Dataplex Universal Catalog 会删除该切面。
    • 更新切面。如果项目中存在某个切面,并且该切面包含在元数据导入文件的切面映射、更新掩码字段和切面键字段中,且元数据导入文件中的切面源更新时间戳晚于项目中的对应时间戳,则 Dataplex Universal Catalog 会更新该切面。

      如果元数据导入文件中未提供切面源更新时间戳,但对应条目标记为待更新,则 Dataplex Universal Catalog 也会更新该切面。

      但是,如果元数据导入文件中的至少一个切面的时间戳晚于项目中的对应时间戳,则 Dataplex Universal Catalog 不会更新附加的条目。

仅切面的作业

在仅切面的元数据作业中,对于属于作业范围的每个切面,Dataplex Universal Catalog 会执行以下操作之一:

  • 创建切面。如果项目中不存在某个切面,但该切面包含在元数据导入文件的切面映射、更新掩码字段和切面键字段中,则 Dataplex Universal Catalog 会创建该切面。
  • 删除切面。对于可选切面,如果项目中存在某个切面,并且该切面包含在元数据导入文件的更新掩码字段和切面键字段中,但不包含在切面映射中,则 Dataplex Universal Catalog 会删除该切面。

    无法删除必需的切面。

  • 更新切面。如果项目中存在某个切面,并且该切面包含在元数据导入文件的切面映射、更新掩码字段和切面键字段中,且元数据导入文件中的切面源更新时间戳晚于项目中的对应时间戳,则 Dataplex Universal Catalog 会更新该切面。

    如果元数据导入文件中未提供切面源更新时间戳,则 Dataplex Universal Catalog 也会更新该切面。

    Dataplex Universal Catalog 会根据切面源更新时间戳来更新切面,而不考虑对应条目的条目源更新时间戳。

创建元数据导入文件

在导入元数据之前,请先为作业创建元数据导入文件。请按照以下步骤操作:

  1. 按照本文档前面所述的准则准备元数据导入文件
  2. 将该文件上传到 Cloud Storage 存储桶

您可以在同一元数据作业中提供多个元数据导入文件。如需提供多个文件,请将这些文件保存在同一 Cloud Storage 存储桶中。运行作业时,您需要指定存储桶,而不是特定文件。Dataplex Universal Catalog 会从存储桶中保存的所有文件(包括子文件夹中的文件)导入元数据。

运行元数据导入作业

创建元数据导入文件后,使用 API 运行元数据导入作业。

REST

如需导入元数据,请使用 metadataJobs.create 方法

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT_NUMBER:您的 Google Cloud 项目编号或项目 ID。
  • LOCATION_ID: Google Cloud 位置,例如 us-central1
  • METADATA_JOB_ID:可选。元数据作业 ID。
  • CLOUD_STORAGE_URI:包含元数据导入文件的 Cloud Storage 存储桶或文件夹的 URI。如需详细了解文件要求,请参阅元数据导入文件

  • ENTRY_GROUP:作业范围内的条目组的相对资源名称,格式为 projects/PROJECT_ID_OR_NUMBER/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID。仅提供一个条目组。如需了解详情,请参阅作业范围
  • ENTRY_TYPE:作业范围内的条目类型的相对资源名称,格式为 projects/PROJECT_ID_OR_NUMBER/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID。如需了解详情,请参阅作业范围

  • ASPECT_TYPE:作业范围内的切面类型的相对资源名称,格式为 projects/PROJECT_ID_OR_NUMBER/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID。创建完整条目同步作业时为可选,创建仅切面的作业时为必需。如需了解详情,请参阅作业范围
  • ENTRY_SYNC_MODE:条目同步模式,例如 FULLNONE。如需了解详情,请参阅同步模式
  • LOG_LEVEL:要捕获的日志级别,例如 INFODEBUG。如需了解详情,请参阅查看作业日志并排查问题

HTTP 方法和网址:

POST https://dataplex.googleapis.com/v1/projects/PROJECT_NUMBER/locations/LOCATION_ID/metadataJobs?metadataJobId=METADATA_JOB_ID

请求 JSON 正文:

{
  "type": IMPORT,
  "import_spec": {
    "source_storage_uri": "gs://CLOUD_STORAGE_URI/",
    "scope": {
      "entryGroups": [
        "ENTRY_GROUP"
      ],
      "entry_types": [
        "ENTRY_TYPE"
      ],
      "aspect_types": [
        "ASPECT_TYPE"
      ]
    },
    "entry_sync_mode": ENTRY_SYNC_MODE,
    "aspect_sync_mode": INCREMENTAL,
    "log_level": LOG_LEVEL
  }
}

如需发送您的请求,请展开以下选项之一:

响应会识别长时间运行的操作。

获取有关元数据作业的详细信息

如需获取有关元数据作业的信息,例如作业的状态和已修改的条目数,请按照以下步骤操作。如需详细了解如何排查失败的作业问题,请参阅本文档的查看作业日志并排查问题部分。

REST

如需获取有关元数据作业的信息,请使用 metadataJobs.get 方法

获取元数据作业列表

您可以获取最新的元数据作业列表。系统中会定期删除已达到终止状态的旧作业。

REST

如需获取最新的元数据作业列表,请使用 metadataJobs.list 方法

取消元数据作业

您可以取消不想运行的元数据作业。

REST

如需取消元数据作业,请使用 metadataJobs.cancel 方法

查看作业日志并排查问题

使用 Cloud Logging 查看元数据作业的日志。如需了解详情,请参阅监控 Dataplex Universal Catalog 日志

您可以在创建元数据作业时配置日志级别。以下日志级别可用:

  • INFO:在总体作业级别提供日志。包含有关导入项的汇总日志,但不指定哪个导入项存在错误。
  • DEBUG:为每个导入项提供详细日志。使用调试级日志记录来排查特定导入项的问题。例如,使用调试级日志记录来识别作业范围中缺少的资源、不符合相关条目类型或切面类型的条目或切面,或者元数据导入文件中的其他错误配置。

验证错误

Dataplex Universal Catalog 会根据项目中的当前元数据验证元数据导入文件。如果存在验证问题,作业状态可能会返回以下状态之一:

  • FAILED:当元数据导入文件存在错误时出现此状态。Dataplex Universal Catalog 不会导入任何元数据,并且作业会失败。元数据导入文件中的错误示例包括:
    • 该文件中的某个项无法解析为有效的导入项
    • 该文件中的条目或切面属于作业范围之外的条目组、条目类型或切面类型
    • 作业中多次指定了同一条目名称
    • 在切面映射或切面键中指定的切面类型不使用 PROJECT_ID_OR_NUMBER.LOCATION_ID.ASPECT_TYPE_ID@OPTIONAL_PATH 格式。
    • 必需的切面被标记为待删除
  • SUCCEEDED_WITH_ERRORS:当元数据导入文件可以成功解析,但导入文件中的项导致项目中的条目处于不一致状态时出现此状态。Dataplex Universal Catalog 会忽略此类条目,但会从文件中导入其余的元数据。

使用作业日志排查错误。

后续步骤