将 Delta 表从 Azure 数据湖存储迁移到 Amazon S3,使用 AWS Glue 大数
从 Azure 数据湖存储迁移 Delta 表到 Amazon S3 的 AWS Glue 解决方案
关键要点
本文介绍了如何将 Microsoft Azure 上的 Delta 表迁移到 Amazon S3,使用 AWS Glue 提供的服务。数据工程师可以利用 Azure Data Lake Storage Connector for AWS Glue 进行数据提取,简化了跨云的数据访问。通过配置 AWS Glue ETL 作业,可以安全高效地将数据迁移到 Amazon S3,并使用 Amazon Athena 进行查询。组织越来越多地采用多云策略来运行其生产工作负载。我们经常收到客户的请求,这些客户在 Microsoft Azure 上建立了数据湖,想要将数据访问扩展到 AWS 服务。客户希望能够利用 AWS 的各种分析、数据、人工智能AI和机器学习ML服务,如 AWS Glue、Amazon Redshift 和 Amazon SageMaker,以构建更加高效、性能更优的数据解决方案,从而利用各个云服务提供商的优势满足其商业需求。
在这种情况下,数据工程师面临着从 Microsoft Azure 存储容器中连接和提取数据的挑战。客户通常将 Azure 数据湖存储 Gen2ADLS Gen2作为其数据湖存储介质,并以 Delta 表等开放表格式存储数据,期望利用 AWS 的分析服务如 AWS Glue读取 Delta 表。AWS Glue 能够使用 Apache Spark 处理数据,并与多种数据源连接,是解决跨多个云环境访问数据挑战的合适方案。
Azure 数据湖存储连接器
Azure 数据湖存储连接器为 AWS Glue 提供了简单的方式,以便提取 ADLS Gen2 中的数据。此连接器使用 Hadoop 的 FileSystem 接口 和 Hadoop 的 ADLS Gen2 连接器。连接器包含的 hadoopazure 模块 允许您在 ADLS 中直接运行 Apache Hadoop 或 Apache Spark 作业。当连接器添加到 AWS Glue 环境时,AWS Glue 会在初始化期间从 Amazon Elastic Container Registry 加载库。当 AWS Glue 有互联网访问时,Spark 作业可以读取和写入 ADLS。
在 AWS Marketplace 上提供 Azure 数据湖存储连接器后,AWS Glue 连接确保您在 AWS Glue 作业中有使用所需的软件包。
在本篇中,我们使用 共享密钥 认证方式。
解决方案概述
在本篇中,我们的目标是将名为 sampledeltatable 的产品表当前位于 ADLS Gen2 中迁移到 Amazon S3。为此,我们使用 AWS Glue、Azure 数据湖存储连接器以及 AWS Secrets Manager 安全存储 Azure 共享密钥。我们配置了一个 AWS Glue 无服务器 ETL 作业,利用此连接器通过公共互联网的共享密钥认证与 ADLS 建立连接。将表迁移到 Amazon S3 后,我们使用 Amazon Athena 查询 Delta Lake 表。
以下架构图展示了 AWS Glue 如何促进 ADLS 的数据摄取。
前提条件
您需要以下前提条件:
在 Microsoft Azure 上一个 存储账户 和您在 ADLS Gen2 中的数据路径。请提前准备存储账户凭证。有关说明,请参阅 创建存储账户共享密钥。一个 AWS 身份和访问管理IAM角色,适用于 AWS Glue 作业,并具有以下策略:AWSGlueServiceRole,允许 AWS Glue 服务角色访问相关服务。AmazonEC2ContainerRegistryReadOnly,提供对 Amazon EC2 Container Registry 存储库的只读访问。这一策略用于使用 AWS Marketplace 连接器库。一项 Secrets Manager 策略,提供对 Secrets Manager 中密钥的读取访问权限。需要加载 ETL 数据的 S3 桶的 S3 桶策略。
在 Secrets Manager 中配置 ADLS Gen2 帐户
以下步骤将帮助您在 Secrets Manager 中创建一个密钥以存储 ADLS 凭证:
在 Secrets Manager 控制台,选择 存储新密钥。对于 密钥类型,选择 其他类型密钥。输入键 accountName,用于 ADLS Gen2 存储账户名称。输入键 accountKey,用于 ADLS Gen2 存储账户密钥。输入键 container,用于 ADLS Gen2 容器。将其余选项保持默认,然后选择 下一步。输入密钥名称例如,adlstoragecredentials。选择 下一步。完成剩余步骤,以存储密钥。订阅 Azure 数据湖存储连接器
Azure 数据湖存储连接器简化了连接 AWS Glue 作业以提取 ADLS Gen2 中数据的过程。此连接器作为 AWS Marketplace 产品提供。
请完成以下步骤以订阅该连接器:
使用必要权限登录您的 AWS 账户。导航到 Azure 数据湖存储连接器的 AWS Marketplace 页面。选择 继续订阅。在阅读用户许可协议EULA后,选择 继续配置。对于 履行选项,选择 Glue 40。对于 软件版本,选择最新的软件版本。选择 继续启动。在 AWS Glue 中创建自定义连接
在您订阅连接器后,请完成以下步骤以基于它创建 AWS Glue 连接。此连接将被添加到 AWS Glue 作业中,以确保连接器可用,并且数据存储连接信息可访问,从而建立网络路径。
要创建 AWS Glue 连接,您需要在 AWS Glue Studio 控制台上启用 Azure 数据湖存储连接器。完成前面的步骤中的 继续启动 后,您将重定向到连接器着陆页面。
在 配置详情 部分,选择 使用说明。选择 从 AWS Glue Studio 激活连接器。AWS Glue Studio 控制台提供选项以立即启用连接器或者同时激活并创建此连接。对于本篇,我们选择第二种选项。
对于 连接器,确认选择 Azure ADLS Connector for AWS Glue 40。对于 名称,输入连接名称例如,AzureADLSStorageGen2Connection。输入可选描述。选择 创建连接并激活连接器。连接现在已准备好使用。连接器和连接信息在 AWS Glue 控制台的 数据连接 页面上可见。
使用 AWS Glue ETL 作业读取 Delta 表
完成以下步骤以创建 AWS Glue 作业并配置 AWS Glue 连接和作业参数选项:
在 AWS Glue 控制台,选择导航窗格中的 ETL 作业。选择 使用脚本编辑器编写代码,并选择 脚本编辑器。选择 创建脚本,然后转到 作业详细信息 部分。更新 名称 和 IAM 角色 设置。在 高级属性 下,添加之前创建的 AWS Glue 连接 AzureADLSStorageGen2Connection。 梯子npv对于 作业参数,添加键 datalakeformats,值设置为 delta。 使用以下脚本从 ADLS 中读取 Delta 表,并提供 Delta 表文件所在的 Azure 存储账户容器路径及写入 S3 的 Delta 文件的 S3 桶。pythonfrom pysparksql import SparkSessionfrom deltatables import import boto3import json
spark = SparkSessionbuildergetOrCreate()
sm = boto3client(secretsmanager)response = smgetsecretvalue(SecretId=adlstoragecredentials)value = jsonloads(response[SecretString])accountnamesparkconfig = ffsazureaccountkey{value[accountName]}dfscorewindowsnetaccountname = value[accountName]accountkey = value[accountKey]containername = value[container]path = fabfss//{containername}@{accountname}dfscorewindowsnet/pathtodeltatablefiles/s3DeltaTablePath = s3//yourdatalakebucketname/deltatablepath/
方法:共享密钥
sparkconfset(accountnamesparkconfig accountkey)
从 ADLS gen2 存储读取 Delta 表
df = sparkreadformat(delta)load(path)
将 Delta 表写入 S3 路径。
if DeltaTableisDeltaTable(spark s3DeltaTablePath) s3deltaTable = DeltaTableforPath(spark s3DeltaTablePath) print(合并到现有的 S3 Delta 表) (s3deltaTablealias(target) merge(dfalias(source) targetproductid = sourceproductid) whenMatchedUpdateAll() whenNotMatchedInsertAll() execute() )else print(创建新的 S3 Delta 表。) dfwriteformat(delta)save(s3DeltaTablePath)
选择 运行 启动作业。在 运行 选项卡中确认作业运行成功。 在 Amazon S3 控制台上验证 S3 桶中的 Delta 文件Delta 表路径。 在 Athena 中创建数据库和表以查询迁移到 Amazon S3 的 Delta 表。您可以使用 AWS Glue 爬虫自动爬取存储在 Amazon S3 中的 Delta 表,并在 AWS Glue 数据目录中创建必要的元数据。然后,Athena 可以使用这些元数据无缝查询和分析 Delta 表。有关更多信息,请参阅 使用 AWS Glue 爬虫爬取 Delta Lake 表。
sqlCREATE DATABASE deltadb
CREATE EXTERNAL TABLE deltadbsampledeltatableLOCATION s3//yourdatalakebucketname/deltatablepath/TBLPROPERTIES (tabletype=DELTA)
查询 Delta 表:sqlSELECT FROM deltadbsampledeltatable LIMIT 10
通过遵循本文中的步骤,您已成功将 Delta 表从 ADLS Gen2 迁移到 Amazon S3,使用 AWS Glue ETL 作业进行处理。
在 AWS Glue 笔记本中读取 Delta 表
如果您想从 AWS Glue 笔记本中读取 ADLS Gen2 的 Delta 表,可以按照以下可选步骤进行操作:
创建一个笔记本,并在第一个单元中运行以下代码以配置 AWS Glue 连接和 datalakeformats:pythonidletimeout 30glueversion 40workertype G1Xnumberofworkers 5connections AzureADLSStorageGen2Connectionconfigure{ datalakeformats delta}
在新单元中运行以下代码以读取存储在 ADLS Gen2 的 Delta 表。提供 Delta 文件所在 Azure 存储账户容器的路径,以及要写入 Amazon S3 的 Delta 文件的 S3 桶。pythonfrom pysparksql import SparkSessionfrom deltatables import import boto3import json
spark = SparkSessionbuildergetOrCreate()
sm = boto3client(secretsmanager)response = smgetsecretvalue(SecretId=adlstoragecredentials)value = jsonloads(response[SecretString])accountnamesparkconfig = ffsazureaccountkey{value[accountName]}dfscorewindowsnetaccountname = value[accountName]accountkey = value[accountKey]containername = value[container]path = fabfss//{containername}@{accountname}dfscorewindowsnet/pathtodeltatablefiles/s3DeltaTablePath = s3//yourdatalakebucketname/deltatablepath/
方法:共享密钥
sparkconfset(accountnamesparkconfig accountkey)
从 ADLS gen2 存储读取 Delta 表
df = sparkreadformat(delta)load(path)
将 Delta 表写入 S3 路径。
if DeltaTableisDeltaTable(spark s3DeltaTablePath) s3deltaTable = DeltaTableforPath(spark s3DeltaTablePath) print(合并到现有的 S3 Delta 表) (s3deltaTablealias(target) merge(dfalias(source) targetproductid = sourceproductid) whenMatchedUpdateAll() whenNotMatchedInsertAll() execute() )else print(创建新的 S3 Delta 表。) dfwriteformat(delta)save(s3DeltaTablePath)
清理
若要清理资源,请完成以下步骤:
删除 AWS Glue 作业、数据库、表和连接:在 AWS