Athena也可以用于做ETL。大数据场景经常会有这样的情况,数据湖里新来的数据是txt或csv格式,并不适合做列式查询。Athena可以使用Create Table As Select ( CTAS )
将其转成parquet格式,并且按分区保存到S3
默认情况下,CTAS的结果保存为parquet格式,当然也可以指定成 ORC
, AVRO
, JSON
, 或 TEXTFILE
;
默认情况下,CTAS的结果做GZIP压缩(ORC格式的做ZLIB压缩);
CTAS使用场景还包括:
query result location
,则保存在s3://<workgroup-query-results-location>/tables/<query-id>/
query result location
,则可以使用 WITH (external_location ='s3://location/prefix/')
语法来指定保存到哪个s3。注意这个目录一定要为空,如果不为空则CTAS不会往里面写数据。query result location
,又没使用external_location
,则写到s3://<client-query-results-location>/<Unsaved-or-query-name>/<year>/<month/<date>/tables/<query-id>/
在本实验中,我们将使用Create Table As Select ( CTAS )
从现有 csv 格式表创建新的 parquet 格式表,并进行压缩和分区。
从 Athena 查询编辑器中选择“Saved queries”,然后单击名为Athena_ctas_customer_sales
的查询:
看到它的SQL内容如下:
CREATE TABLE "customer_sales_ctas" WITH (
format = 'parquet',
external_location = 's3://athena-workshop-145197526627/basics/parquet/sales_ctas/',
partitioned_by = ARRAY [ 'year','month','day' ]
) AS
SELECT
c.customer_id,
c.card_id,
c.firstname,
c.lastname,
c.email,
c.birthday,
c.country,
price ,
product_id ,
timestamp ,
year(cast( timestamp as TIMESTAMP(3))) as "year" ,
month(cast( timestamp as TIMESTAMP(3))) as "month" ,
day(cast( timestamp as TIMESTAMP(3))) as "day"
FROM "sales_csv" s join "customers_csv" c on s.customer_id = c.customer_id;
Select
firstname,
lastname,
sum(cast(price as decimal(6,2))) as total_sales
from "customer_sales_ctas"
where
month in (12,1)
group by firstname,lastname
order by 1,2;
有2条查询语句。第一个查询将创建一个名为customer_sales_ctas的新表:
/basics/parquet/sales_ctas/
前缀中。选中第一个查询并单击 “Run” 按钮以创建新表。
选中第二个查询并单击 “Run” 按钮,这将查询新创建的表:
所以Athena可以作为数据湖的一员,使用Create Table As Select (CTAS)
查询向其中填充数据。