## Parameters

In [None]:
# Default parameters
# Product categories to process; each entry gets its own external table and
# its own query result.
CATEGORIES = [
    "Apparel",
    "Baby",
]
# Inclusive review_date window for the query (compared with >= and <=).
FROM_DATE = "2015-08-28"
TO_DATE = "2015-08-30"
# S3 prefix under which each category's query result is written as CSV.
OUTPUT_LOCATION = "s3://airflow-my-test-bucket/query_output/"

## Create an external table for each input category

In [None]:
from pyspark.sql import HiveContext

# SQL entry point reused by the later cells.
# NOTE(review): `sc` is not defined in this notebook; presumably the PySpark
# kernel provides the SparkContext — confirm.
hc = HiveContext(sc)

# Register one external Parquet table per category. IF NOT EXISTS lets the
# cell be re-run without failing on tables created by an earlier run.
for CATEGORY in CATEGORIES:
    # Every fragment ends with a space so the concatenated statement keeps
    # its clauses separated (table name, column list, storage clause).
    sql = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {CATEGORY} "
        "(review_id STRING, product_id STRING, product_title STRING, "
        "star_rating INT, verified_purchase STRING, review_date DATE) "
        f"STORED AS PARQUET LOCATION 's3://amazon-reviews-pds/parquet/product_category={CATEGORY}/'"
    )
    hc.sql(sql)

## Show available tables

In [None]:
# List the tables in the current database.
# No trailing ";": sql() takes a single statement, and older Spark SQL
# parsers reject a statement terminator.
hc.sql("SHOW TABLES").show()

## Perform query and save result to S3 for each category

In [None]:
# For each category: pick the top 20 products among verified-purchase reviews
# in the [FROM_DATE, TO_DATE] window and write the result to S3 as CSV.
for CATEGORY in CATEGORIES:
    # NOTE(review): rows are ranked by SUM(star_rating) (total stars), not by
    # the AVG(star_rating) that is selected — confirm this is intended.
    sql = (
        f"SELECT product_title, AVG(star_rating), count(review_id) AS review_count FROM {CATEGORY} "
        f"WHERE review_date >= '{FROM_DATE}' AND review_date <= '{TO_DATE}' AND verified_purchase='Y' "
        "GROUP BY product_title "
        "ORDER BY SUM(star_rating) desc "
        "limit 20"
    )

    df = hc.sql(sql)
    # Drop any trailing "/" from the prefix before joining; otherwise the path
    # contains "//", i.e. an empty key segment in S3.
    output_folder = f"{OUTPUT_LOCATION.rstrip('/')}/{CATEGORY}_{FROM_DATE}_{TO_DATE}"
    # mode="overwrite" replaces the output of a previous run for the same
    # category and date window.
    df.write.csv(output_folder, mode="overwrite")
    print(f"Saved query result to {output_folder}")