我们将使用第二章实验中使用的customer数据,这是一个包含约 100,000 条客户记录的数据集。
进入Athena的Query Editor, 将查询第 11 行中的 $accountid 替换为自己的 AWS 帐号, 并执行SQL:
CREATE TABLE customers_iceberg(
card_id bigint,
customer_id bigint,
lastname string,
firstname string,
email string,
address string,
birthday string,
country string)
PARTITIONED BY (country)
LOCATION 's3://athena-workshop-$accountid/iceberg/customers/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
)
将数据插入customers_iceberg表中。将下面的 sql 语句复制到 Athena 查询编辑器中,然后单击运行按钮:
insert into "customers_iceberg" select * from "default"."customers_parquet"
验证数据已加载到customers_iceberg表中。将以下语句复制并粘贴到 Athena 查询编辑器中,然后单击运行按钮:
SELECT * FROM "default"."customers_iceberg" limit 10;
进入到以 athena-workshop-
开头的存储桶,然后选择iceberg 文件夹,然后选择customers 文件夹。注意到它包含data文件夹和metadata文件夹:
Iceberg 格式允许在表中使用更新语句。
执行更新和删除是 OLTP 数据库的基本功能,将这些功能扩展到数据湖一直是人们的追求。随着 Iceberg 更新功能的发布,将允许执行行级更新,并保持源系统和目标系统中的数据一致。这使得数据湖具有事务性。
在CDC场景经常会有更新的需求。例如,customer_parquet表的源已对客户 10473 的email字段进行了更新,并且想要更新 S3 中表customers_iceberg相对应的数据。
首先查询记录:
SELECT * FROM "default"."customers_iceberg" where "customer_id" = 10473;
现在执行更新语句:
update "default"."customers_iceberg" set email='newvalue@someemail.com' where "customer_id" = 10473;
验证更新语句:
SELECT * FROM "default"."customers_iceberg" where "customer_id" = 10473;
Iceberg是如何实现Update功能的呢?
当我们更新一张Iceberg table
时,会创建一个新的snapshot,并原子性的更新metadata,让metadata指向新的snapshot。
删除是ACID属性中的重要操作之一,可以在其中实现删除记录以满足监管要求。 安全和隐私法规会禁止公司保留个人身份用户数据;无论数据所在的数据湖有多大,企业都必须能够及时找到并删除这些数据。例如,删除客户 ID 为10473 的特定客户的所有记录。
使用以下查询从 Iceberg 表中删除客户 ID 10473的行:
delete from "default"."customers_iceberg" where "customer_id" = 10473;
验证结果。将下面的查询复制并粘贴到 Athena 查询编辑器中,然后单击运行按钮。请注意,没有返回任何记录表明该客户已从基础表中删除:
SELECT * FROM "default"."customers_iceberg" where "customer_id" = 10473;