Athena CTAS

Athena也可以用于做ETL。大数据场景经常会有这样的情况,数据湖里新来的数据是txt或csv格式,并不适合做列式查询。Athena可以使用Create Table As Select ( CTAS )将其转成parquet格式,并且按分区保存到S3

默认情况下,CTAS的结果保存为parquet格式,当然也可以指定成 ORC, AVRO, JSON, 或 TEXTFILE

默认情况下,CTAS的结果做GZIP压缩(ORC格式的做ZLIB压缩);

CTAS使用场景还包括:

  • 如果原来的表很大,每次都要查询所有数据,可以使用CTAS选择一部分数据出来到新表,对新表查询,减少查询的数据量。

CTAS的文件保存位置

  • 如果workgroup里指定了query result location,则保存在s3://<workgroup-query-results-location>/tables/<query-id>/
  • 如果workgroup里没指定query result location,则可以使用 WITH (external_location ='s3://location/prefix/') 语法来指定保存到哪个s3。注意这个目录一定要为空,如果不为空则CTAS不会往里面写数据。
  • 如果workgroup里没指定query result location,又没使用external_location,则写到s3://<client-query-results-location>/<Unsaved-or-query-name>/<year>/<month/<date>/tables/<query-id>/

在本实验中,我们将使用Create Table As Select ( CTAS )从现有 csv 格式表创建新的 parquet 格式表,并进行压缩和分区。

从 Athena 查询编辑器中选择“Saved queries”,然后单击名为Athena_ctas_customer_sales的查询:

img

看到它的SQL内容如下:

CREATE TABLE "customer_sales_ctas" WITH (
format = 'parquet',
external_location = 's3://athena-workshop-145197526627/basics/parquet/sales_ctas/',
partitioned_by = ARRAY [ 'year','month','day' ]
) AS
SELECT 
  c.customer_id,
  c.card_id,
  c.firstname,
  c.lastname,
  c.email,
  c.birthday,
  c.country,
  price , 
  product_id , 
  timestamp ,
  year(cast( timestamp as TIMESTAMP(3)))  as "year" ,
month(cast( timestamp as TIMESTAMP(3))) as "month" ,
day(cast( timestamp as TIMESTAMP(3)))  as "day" 
FROM "sales_csv" s join "customers_csv" c on s.customer_id = c.customer_id;

Select 
    firstname,
    lastname,
    sum(cast(price as decimal(6,2))) as total_sales
from "customer_sales_ctas"
where 
    month in (12,1)
group by firstname,lastname
order by 1,2;

有2条查询语句。第一个查询将创建一个名为customer_sales_ctas的新表:

  • 格式设置为 parquet
  • 我们将文件存储在外部位置, s3 存储桶位于 /basics/parquet/sales_ctas/前缀中。
  • 数据将按年、月、日分区
  • 我们使用 SELECT 查询从现有表中选择数据来填充新表。

选中第一个查询并单击 “Run” 按钮以创建新表。

选中第二个查询并单击 “Run” 按钮,这将查询新创建的表:

image-20231130100819735

所以Athena可以作为数据湖的一员,使用Create Table As Select (CTAS)查询向其中填充数据。