在 AWS 上构建一个伪匿名化服务以保护敏感数据:第二部分 大数据博客
在 AWS 上构建一个伪匿名化服务以保护敏感数据:第二部分
关键要点
在这一部分中,我们将讨论如何利用 AWS 上的伪匿名化服务来处理敏感数据,包括 ETL 作业的集成、批处理和流处理方案以及性能评估。
在这两个部分的系列文章中,我们介绍了如何构建一个伪匿名化服务,该服务将明文数据属性转换为伪匿名标识,反之亦然。集中式的伪匿名化服务提供了一种用于生成伪名的统一架构,从而使组织能够在所有平台上实现敏感数据的标准处理流程。这消除了开发团队和分析用户理解和实施各种合规要求所需的复杂性和专业知识,使他们能够专注于商业成果。
采用解耦服务的设计方法意味着组织在解决业务问题时并不偏向于使用任何特定技术。无论各个团队喜欢使用何种技术,都可以调用伪匿名化服务来处理敏感数据。
飞鸟加速器官方版在这篇文章中,我们将聚焦于可以使用伪匿名化服务的常见提取、转换和加载ETL消费模式。我们将讨论如何在 Amazon EMR 的 ETL 作业中使用伪匿名化服务包含Amazon EMR 在 EC2 上的使用,以支持流处理和批处理用例。此外,您还可以在解决方案的 GitHub 仓库 中找到基于 Amazon Athena 和 AWS Glue 的消费模式。
解决方案概述
下图描述了该解决方案的架构:
右侧账户托管伪匿名化服务,您可以根据本系列文章第一部分提供的说明进行部署。
左侧账户则是您在本文中设置的账户,代表基于 Amazon EMR 的 ETL 平台,使用伪匿名化服务。
您可以在同一账户中部署伪匿名化服务和 ETL 平台。
Amazon EMR 可以快速且经济高效地创建、操作和扩展诸如 Apache Spark 的大数据框架。
在此方案中,我们展示了如何在 Amazon EMR 中使用 Apache Spark 消费伪匿名化服务,以适应批处理和流处理用例。批处理应用程序从 Amazon Simple Storage Service (Amazon S3) 存储桶读取数据,而流处理应用程序则从 Amazon Kinesis Data Streams 消费记录。
在批处理和流处理工作中使用的 PySpark 代码
两个应用程序均使用一个通用的实用函数,该函数将 HTTP POST 调用发送到连接到伪匿名化 AWS Lambda 函数的 API Gateway。REST API 调用是针对每个 Spark 分区使用 Spark RDD 的 mapPartitions 函数完成的。POST 请求的主体包含给定输入列的唯一值列表。POST 请求的响应包含对应的伪匿名值。该代码将给定数据集的敏感值与已伪匿名化的值进行替换。结果将保存到 Amazon S3 和 AWS Glue 数据目录中,采用 Apache Iceberg 表格格式。
Iceberg 是支持 ACID 事务、模式演化和时间旅行查询的开放表格格式。您可以利用这些功能使用 SQL 语句或编程接口实现 被遗忘权或数据擦除解决方案。Iceberg 从版本 650 开始支持 Amazon EMR、AWS Glue 和 Athena。批处理和流处理模式使用 Iceberg 作为其目标格式。有关如何使用 Iceberg 构建合规的 ACID 数据湖的概述,请参阅使用 Apache Iceberg 在 Amazon EMR 上构建高性能、ACID 兼容、不断演化的数据湖。
先决条件
您需要具备以下先决条件:
一个 AWS 账户。具有部署 AWS CloudFormation 堆栈和相关资源的权限的 AWS 身份和访问管理 (IAM) 主体。在您将要运行提供的脚本的开发或部署机器上安装的 AWS 命令行接口 (AWS CLI)。在同一账户和 AWS 区域内存储解决方案的 Amazon S3 存储桶。在运行命令的本地机器上安装 Python3。使用 pip 安装 PyYAML。用于在 bash 脚本中运行部署 CloudFormation 堆栈的 bash 终端。另一个 S3 存储桶,其中包含 Parquet 文件格式的输入数据集仅用于批处理应用程序。将示例数据集复制到该 S3 存储桶。使用 git clone 或下载选项获取的本地机器上的 最新代码库。打开新的 bash 终端并导航到克隆仓库的根文件夹。
所提议的模式的源代码可以在克隆的仓库中找到。它使用以下参数:
参数名描述ARTEFACTS3BUCKET用于存储基础设施代码的 S3 存储桶。该存储桶必须在与解决方案相同的账户和区域中创建。AWSREGION解决方案将要部署的区域。AWSPROFILE将应用于 AWS CLI 命令 的命名配置文件。该配置应包含具有权限的 IAM 主体凭据,以部署相关资源的 CloudFormation 堆栈。SUBNETIDEMR 集群启动的子网 ID。该子网为预先存在,演示时我们使用默认 VPC 的默认子网 ID。EPURL伪匿名化服务的端点 URL。从作为本系列 第一部分 部署的解决方案中获取。APISECRET将存储在 AWS Secrets Manager 中的 Amazon API Gateway 密钥。API 密钥从 第一部分 的部署中生成。S3INPUTPATH指向包含 Parquet 文件的输入数据集文件夹的 S3 URI。KINESISDATASTREAMNAMECloudFormation 堆栈中部署的 Kinesis 数据流名称。BATCHSIZE每批数据推送到数据流的记录数。THREADSNUM在本地机器上用于将数据上传到数据流的并行线程数。更多线程对应更高的消息量。EMRCLUSTERID运行代码的 EMR 集群 ID该 EMR 集群由 CloudFormation 堆栈创建。STACKNAMECloudFormation 堆栈的名称,在部署脚本中指定。批处理部署步骤
如前所述的先决条件,在您部署解决方案之前,将 测试数据集 的 Parquet 文件上传到 Amazon S3。然后将包含文件的文件夹的 S3 路径作为参数 ltS3INPUTPATHgt 提供。
我们通过 AWS CloudFormation 创建解决方案资源。您可以通过运行 deploymentscripts 文件夹中的 deploy1sh 脚本来部署解决方案。
满足部署先决条件后,输入以下命令以部署解决方案:
bashsh /deploymentscripts/deploy1sh a ltARTEFACTS3BUCKETgt r ltAWSREGIONgt p ltAWSPROFILEgt s ltSUBNETIDgt e ltEPURLgt x ltAPISECRETgt i ltS3INPUTPATHgt
输出应类似于以下屏幕截图。
deploy1sh 脚本运行结束时,会打印出清理命令所需的参数。请确保记录下这些值。
测试批处理解决方案
在使用 deploy1sh 脚本部署的 CloudFormation 模板中,包含 Spark 批处理应用程序 的 EMR 步骤在 EMR 集群设置的最后添加。
要验证结果,请检查 CloudFormation 堆栈输出中的变量 SparkOutputLocation 所识别的 S3 存储桶。
您还可以使用 Athena 查询 数据库 blogbatchdb 中的表 pseudotable。
清理批处理资源
要销毁作为本练习的一部分创建的资源,
在 bash 终端中,导航到克隆仓库的根文件夹。输入清理命令,此命令已在之前运行的 deploy1sh 脚本的输出中提供:
bashsh /deploymentscripts/cleanup1sh a ltARTEFACTS3BUCKETgt s ltSTACKNAMEgt r ltAWSREGIONgt e ltEMRCLUSTERIDgt
输出应类似于以下屏幕截图。
流处理部署步骤
我们通过 AWS CloudFormation 创建解决方案资源。您可以通过运行 deploymentscripts 文件夹中的 deploy2sh 脚本来部署解决方案。此模式的 CloudFormation 堆栈模板可在 GitHub 仓库 中找到。
满足部署先决条件后,输入以下命令以部署解决方案:
bashsh deploymentscripts/deploy2sh a ltARTEFACTS3BUCKETgt r ltAWSREGIONgt p ltAWSPROFILEgt s ltSUBNETIDgt e ltEPURLgt x ltAPISECRETgt
输出应类似于以下屏幕截图。
deploy2sh 脚本输出结束时打印出清理命令所需的参数。请确保保存这些值以备后用。
测试流处理解决方案
在使用 deploy2sh 脚本部署的 CloudFormation 模板中,包含 Spark 流处理应用程序 的 EMR 步骤在 EMR 集群设置的最后添加。要测试端到端管道,您需要将记录推送到已部署的 Kinesis 数据流。使用以下命令在 bash 终端中,您可以激活一个 Kinesis 生产者,该生产者将持续将记录放入数据流,直到手动停止该过程。您可以通过修改 BATCHSIZE 和 THREADSNUM 变量来控制生产者的消息量。
bashpython3 m pip install kinerpython3 consumptionpatterns/emr/1pysparkstreaming/kinesisproducer/producerpy ltKINESISDATASTREAMNAMEgt ltBATCHSIZEgt ltTHREADSNUMgt
要验证结果,请检查 CloudFormation 堆栈输出中标识的 S3 存储桶,变量为 SparkOutputLocation。
在 Athena 查询编辑器中,通过查询数据库 blogstreamdb 中的表 pseudotable 来检查结果。

清理流处理资源
要销毁作为本练习的一部分创建的资源,请完成以下步骤:
停止在 bash 终端中启动的 Python Kinesis 生产者。输入以下命令:bashsh /deploymentscripts/cleanup2sh a ltARTEFACTS3BUCKETgt s ltSTACKNAMEgt r ltAWSREGIONgt e ltEMRCLUSTERIDgt
输出应类似于以下屏幕截图。
性能细节
用例在数据大小、计算能力和成本方面可能有不同的要求。我们提供了一些基准和可能影响性能的因素;但是,我们强烈建议您在较低的环境中验证该解决方案,以查看它是否满足您的特定需求。
您可以通过最大并行调用伪匿名化服务的数量和每次调用的有效负载大小来影响建议的解决方案性能。就并行调用而言,需要考虑 Secrets Manager 的 GetSecretValue 调用限制每秒 10000 次,硬限制和 Lambda 的默认并发性默认 1000;可以通过配额请求增加。您可以通过调整执行者数量、构成数据集的分区数量,以及集群配置节点数量和类型来控制最大并发性。就每次