This notebook demonstrates accessing Redshift datasets defined in the AWS Glue Data Catalog from a SageMaker notebook.
Access occurs via Apache Spark running on Amazon EMR, with SparkMagic and Apache Livy connecting the notebook to the cluster.
In order to set this up, follow the instructions in the accompanying blog post, here [[TO DO - INSERT LINK TO BLOG POST ]]
This Jupyter notebook is written to run on a SageMaker notebook instance. It uses SparkMagic (PySpark) to access Apache Spark, running on Amazon EMR.
The EMR cluster runs Spark and Apache Livy, and must be set up to use the AWS Glue Data Catalog as its Hive metastore.
In addition, the SageMaker notebook instance must be configured to access Livy.
This configuration is established in the accompanying blog post [[UPDATE WITH NAME AND LINK]].
Now, print out the list of Livy magic commands.
%%help
The following cell prints out info about the current Spark sessions.
%%info
There may be "no active sessions" so far. If so, that's ok, you'll start one below. If you or someone else has run other sessions recently, you may receive a list of existing sessions.
If you receive an error such as "Error sending http request and maximum retry encountered", try restarting the kernel, then re-executing these cells.
If you continue to see an error, check the configuration:
- Use a terminal to check that .sparkmagic/config.json on this SageMaker notebook instance has the correct IP address and port for the EMR cluster.
- Check that the EMR cluster has started correctly.
- Check that the EMR cluster's Livy port (default 8998) is open and accessible to the SageMaker instance; accessing your EMR cluster on that port should show the Livy welcome page.
- Remember to restart the kernel after every change!
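As an optional check, the following cell (run locally) prints the Livy endpoint that SparkMagic is configured to use. It assumes the default config location and the standard 'kernel_python_credentials' layout; adjust the path or key if your setup differs.
%%local
import json
import os
# Print the Livy URL that SparkMagic will use (assumes the default config layout)
cfg_path = os.path.expanduser('~/.sparkmagic/config.json')
with open(cfg_path) as f:
    cfg = json.load(f)
print(cfg.get('kernel_python_credentials', {}).get('url'))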
Once the %%info cell executes correctly, move on!
The next cell should start a Spark session, and an application if needed.
It'll also print out the hostname of the EMR cluster's master node (which contains its private IP address), and the current setting of the SPARK_HOME environment variable.
import os
import platform
# Print some characteristics of the remote system
print(platform.node())
print(platform.platform(aliased=0, terse=0))
print("Spark home currently set to", os.environ.get('SPARK_HOME', None))
After running the previous cell (it may take a minute or three to complete), you should see a message "SparkSession available as 'spark'."
If you don't: restart the Kernel for this notebook, and try running the above cells again.
Now, run a basic Spark function. This cell shows how you can execute Spark parallelized functions on your EMR cluster.
# Run a basic Spark function, to show that's working
# NOTE: This cell will likely throw an error the first time you run it ('TypeError: object of type 'NoneType' has no len()').
# This is due to a bug that showed up after 2.3.1 -> 2.3.2 upgrade of Spark.
# It's also reported here: http://mail-archives.apache.org/mod_mbox/spark-issues/201811.mbox/%3CJIRA.13197858.1542041679000.391200.1542675600612@Atlassian.JIRA%3E
# and here: http://mail-archives.apache.org/mod_mbox/livy-issues/201811.mbox/<JIRA.13199283.1542622344000.385221.1542622380848@Atlassian.JIRA>
print("Spark version is: " + sc.version)
sc.parallelize(range(1000)).count()
# Run a basic Spark function, to show that's working (and this time, it really should!)
print("Spark version is: " + sc.version)
sc.parallelize(range(1000)).count()
Now, you can see the logs from this Spark task. This capability lets you debug your Spark tasks without leaving the notebook environment.
%%logs
Let's switch to the topic at hand.
The following examples run against the Glue 'glueredsage' database (or the Glue database name you gave when executing the CloudFormation template). If you have not already done so, follow the instructions in the Glue tutorial under "Crawling the Sample Data Used in the Tutorials" (https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-prerequisites.html#dev-endpoint-tutorial-prerequisites-crawl-data), as outlined in the blog post instructions. You can stop once you've created the crawler and can see the tables it created in your Data Catalog, containing the metadata the crawler retrieved.
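If you'd like to confirm from within the notebook that the crawler exists, a quick check with the AWS CLI (assuming it's configured for the same account and region) is shown below; look for the crawler you created for this walkthrough.
%%local
# List the names of the Glue crawlers in this account/region
!aws glue get-crawlers --query 'Crawlers[].Name' --output text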
First, list the databases in the Glue Data Catalog. Then, list the tables, and the columns for the table you're interested in.
%%local
######################################################################################################
# Update/Check the following variables with values for your environment
# -- should have been correctly updated by the CF stack during deployment
######################################################################################################
glueDatabase = "glueredsage"
region = "us-west-2"
%%local
!aws glue get-databases --output text | grep glueredsage
Set up a couple of helper functions that will call the Glue Data Catalog, and format the most relevant parts of the response for this task.
%%local
import boto3
# Helper functions, to retrieve Glue Data Catalog information
glue = boto3.client('glue', region_name=region)
# List the tables in the given Glue database
def get_glue_tables(gluedatabase):
    # From Glue/Hive metastore, get the table info.
    tbls = glue.get_tables(DatabaseName=gluedatabase)
    i = 0
    print('{:5s} {:20s} {:30s} {:20s}'.format('', 'Database', 'TableName', 'TableType'))
    for tbl in tbls['TableList']:
        print('{:5s} {:20s} {:30s} {:20s}'.format(str(i), gluedatabase, tbl['Name'], tbl['TableType']))
        i += 1

# List the columns of the named Glue table
def get_glue_table(gluedatabase, gluetblnm):
    # From Glue/Hive metastore, get the table info.
    tbldef = glue.get_table(DatabaseName=gluedatabase, Name=gluetblnm)
    print('Table: ' + gluedatabase + '.' + gluetblnm + ': ' + tbldef['Table']['TableType'])
    if 'classification' in tbldef['Table']['Parameters'] and 'connectionName' in tbldef['Table']['Parameters']:
        print('Classification: ' + tbldef['Table']['Parameters']['classification'] + '; connectionName: ' + tbldef['Table']['Parameters']['connectionName'])
    else:
        print('Classification / connection information not available.')
    i = 0
    print('{:5s} {:20s} {:20s} {:50s}'.format('', 'Column Name', 'Type', 'Comment'))
    for col in tbldef['Table']['StorageDescriptor']['Columns']:
        comment = ''
        if 'Comment' in col:
            comment = col['Comment']
        print('{:5s} {:20s} {:20s} {:50s}'.format(str(i), col['Name'], col['Type'], comment))
        i += 1
Call the first helper function, to list the tables in the Glue database you're interested in.
%%local
get_glue_tables(glueDatabase)
If you do not receive a list of the Redshift tables from the above command, check that you've run the Glue-Redshift crawler. Use the AWS Glue console to check that there are, in fact, tables in that database.
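As an alternative to the console, you can also cross-check with the AWS CLI from the notebook (this assumes the CLI's default region matches the one used above):
%%local
# List the tables the crawler created in the 'glueredsage' database
!aws glue get-tables --database-name glueredsage --query 'TableList[].Name' --output text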
For a chosen table, list the columns.
%%local
tblnme = 'dev_public_category'
get_glue_table(glueDatabase, tblnme)
You can see that this table has a classification of "redshift", and its connection (GlueRedshiftConnection). So, to access the data, you'll need to read it from Redshift. The GlueRedshiftConnection contains the information you need to connect to Redshift.
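To peek at that connection's JDBC endpoint without leaving the notebook, you can look it up directly; the connection name below is assumed to be the one created by the CloudFormation stack, and the password is deliberately not printed.
%%local
# Inspect the Glue connection referenced by the table (connection name assumed from the stack)
conn = glue.get_connection(Name='GlueRedshiftConnection')
props = conn['Connection']['ConnectionProperties']
print(props['JDBC_CONNECTION_URL'])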
Now, you wish to retrieve a table (or part of a table, or the results of a SQL query) for further specialized processing. Generally speaking, you want to do as much as you can with the data in place, and only retrieve the subset of data you need for actions that cannot be performed in SQL.
If the data is small, you can retrieve it directly into your notebook. But since this data is likely large, you'll retrieve it into a data frame in Spark on EMR so you can process it further there.
First, set up the variables on the remote cluster. Note that these variables are separate from those used in your local environment, and that the variables are not communicated between the two environments.
######################################################################################################
# Update/Check the following variables with values for your environment
# -- should have been correctly updated by the CF stack during deployment
######################################################################################################
# Redshift IAM Copy Role, attached to the Redshift cluster you're trying to access.
iamcopyrole = 'arn:aws:iam::121969496650:role/glueredsage-RedshiftStack-LNHY-RedshiftIamCopyRole-T6TD448EYHR9'
glueDatabase = "glueredsage"
# tblname here is the Glue table name
# NOTE: table names can't contain "-"; that's OK for Glue, but not for Hive
tblname = "dev_public_category"
region = "us-west-2"
s3Bucket = "glueredsage-rs3bucket-ncmb8x3vtvly"
# Temp directory; should have lifecycle policy to delete temp files "frequently".
# Remember to use the format: "s3a://<bucket>/<prefix>/"
tempS3Dir = "s3a://" + s3Bucket + "/temp/"
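If the bucket doesn't already have a lifecycle policy for the temp prefix, a minimal sketch of adding one with boto3 is shown below. It runs locally, assumes you have permission to modify the bucket, and note that this call replaces any existing lifecycle configuration on the bucket.
%%local
import boto3
# Expire objects under temp/ after one day
# NOTE: the bucket name is repeated here because local and remote variables are separate
# NOTE: this call replaces any existing lifecycle configuration on the bucket
s3 = boto3.client('s3', region_name=region)
s3.put_bucket_lifecycle_configuration(
    Bucket='glueredsage-rs3bucket-ncmb8x3vtvly',
    LifecycleConfiguration={'Rules': [{
        'ID': 'expire-temp',
        'Filter': {'Prefix': 'temp/'},
        'Status': 'Enabled',
        'Expiration': {'Days': 1}
    }]}
)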
Next, set up helper functions to retrieve Redshift connection information from the Glue Data Catalog, and to read the table from Redshift into EMR.
import boto3
from pyspark.sql import SQLContext

sql_context = SQLContext(sc)

def get_redshift_connection_info(gluedatabase, gluetblname, awsregion):
    # From Glue/Hive metastore, get the table info.
    client = boto3.client('glue', region_name=awsregion)
    tbl = client.get_table(DatabaseName=gluedatabase, Name=gluetblname)
    parms = tbl['Table']['StorageDescriptor']['Parameters']
    classn = parms['classification']
    conn = parms['connectionName']
    location = tbl['Table']['StorageDescriptor']['Location']
    if classn != "redshift":
        print("This table is not a Redshift table; it is of type " + str(classn))
        return None
    # THEN: Get connection data.
    response = client.get_connection(Name=conn)
    connp = response["Connection"]["ConnectionProperties"]
    url = connp["JDBC_CONNECTION_URL"] + "?user=" + connp["USERNAME"] + "&password=" + connp["PASSWORD"]
    print("Connection info:", location, connp["JDBC_CONNECTION_URL"])
    return (url, location)

def get_redshift_data(gluedatabase, gluetblname, awsregion, tempS3Dir, iamcopyrole):
    (url, location) = get_redshift_connection_info(gluedatabase, gluetblname, awsregion)
    tblbits = location.split('.')
    rstblname = tblbits[1] + '.' + tblbits[2]
    print("Getting dataframe:", gluetblname, "-->", rstblname)
    # THEN finally, get the data.
    df = sql_context.read.format("com.databricks.spark.redshift") \
        .option("aws_iam_role", iamcopyrole) \
        .option("tempdir", tempS3Dir) \
        .option("url", url) \
        .option("dbtable", rstblname) \
        .load()
    return df
Next, this code retrieves the table from Redshift to S3, then into EMR, and finally into a data frame that you can access locally. This is all done for you by the Spark Redshift driver you're using here.
The "-o" parameter gives the data frame name. You can take a subset or sample of the rows, using the Livy -r, -m and -n options described in the Help above. As it's a large dataset, here you'll first retrieve a small subset to review. (It may take a few minutes to retrieve the data.)
%%spark -o category -n 25
# This next command gets the table into the dataframe 'category', on the Spark cluster
category = get_redshift_data(glueDatabase, tblname, region, tempS3Dir, iamcopyrole)
category.show(truncate=False)
You can also use a SQL query to specify a subset of the data, or to join, aggregate and filter data on the Redshift cluster. The helper function and small example below show you how.
def get_redshift_query(url, tempS3Dir, sqlquery):
    df = sql_context.read.format("com.databricks.spark.redshift") \
        .option("tempdir", tempS3Dir) \
        .option("url", url) \
        .option("query", sqlquery) \
        .load()
    print("Getting dataframe:", sqlquery)
    return df
Alternatively, you can add SQL statements to join, filter or aggregate the Redshift data prior to unloading it.
%%spark -o userevents
# First, get the connection information for Redshift, using a table that's stored in that cluster
(url, tblbits) = get_redshift_connection_info(glueDatabase, tblname, region)
sqlquery = """SELECT distinct u.userid, u.city, u.state,
NVL(u.likebroadway, false) as likebroadway, NVL(u.likeclassical, false) as likeclassical, NVL(u.likeconcerts, false) as likeconcerts,
NVL(u.likejazz, false) as likejazz, NVL(u.likemusicals, false) as likemusicals, NVL(u.likeopera, false) as likeopera, NVL(u.likerock, false) as likerock,
NVL(u.likesports, false) as likesports, NVL(u.liketheatre, false) as liketheatre, NVL(u.likevegas, false) as likevegas,
d.caldate, d.day, d.month, d.year, d.week, d.holiday,
s.pricepaid, s.qtysold, -- s.salesid, s.listid, s.saletime, s.sellerid, s.commission
e.eventname, -- e.venueid, e.catid, e.eventid,
c.catgroup, c.catname,
v.venuecity, v.venuename, v.venuestate, v.venueseats
FROM users u, sales s, event e, venue v, date d, category c
WHERE u.userid = s.buyerid and s.dateid = e.dateid and s.eventid = e.eventid and e.venueid = v.venueid
and e.dateid = d.dateid and e.catid = c.catid
"""
# Execute the query and retrieve data into the dataframe 'userevents', on the Spark cluster
userevents = get_redshift_query(url, tempS3Dir, sqlquery)
userevents.show(5)
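As a further illustration of pushing work down to Redshift before unloading, the cell below runs a hypothetical aggregation (sales totals by category) on the Redshift side, so only a handful of rows reach the Spark cluster:
%%spark
# Aggregate on Redshift; only the small grouped result is unloaded via S3
aggquery = """SELECT c.catname, SUM(s.qtysold) AS total_qty, SUM(s.pricepaid) AS total_paid
FROM sales s, event e, category c
WHERE s.eventid = e.eventid AND e.catid = c.catid
GROUP BY c.catname"""
sales_by_cat = get_redshift_query(url, tempS3Dir, aggquery)
sales_by_cat.show()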
Next, let's check the characteristics of the local system. Check the hostname printed below; it should be different from that of the EMR master node shown above.
%%local
# Print some characteristics of the local system
import platform
print(platform.machine())
print(platform.node())
print(platform.platform(aliased=0, terse=0))
# Print some local data from the OS: current path, and files in the current directory
!pwd
!ls
Next, show basic information about the query results by displaying them on the local system. Note that it's now a pandas dataframe.
%%local
userevents.info()
# show the top few rows
display(userevents.head())
# describe the data object
display(userevents.describe())
# Summarize the categorical field 'city'
display(userevents.city.value_counts())
%%local
userevents.sample(5)
You can mix and match between using local and remote. The next cell transforms the dataframe on Spark, converting a categorical field into a one-hot vector for use by Spark ML libraries. Below, this field will be used to look at correlations between the user "likes" and the types of events they attended.
%%spark
import pyspark
from pyspark.sql import SparkSession
#from pyspark.ml import Pipeline
#from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
#from pyspark.sql.functions import *
import pandas as pd
# city, state, day, eventname, venuecity, venuename, venuestate, catname are all text, categorical fields.
# Convert needed text fields to one-hot vectors as needed for downstream calcs
stringIndexer = StringIndexer(inputCol="catname", outputCol="catIndex")
model = stringIndexer.fit(userevents)
indexed = model.transform(userevents)
encoder = OneHotEncoder(inputCol="catIndex", outputCol="catVec", dropLast=False)
encoded = encoder.transform(indexed)
encoded.show()
%%spark
# Get/Save the mapping of categorical values used in the one-hot vector encoding for (what will become) columns
meta = [
f.metadata for f in indexed.schema.fields if f.name == "catIndex"
]
theCorrCols = meta[0]["ml_attr"]["vals"]
colValues = dict(enumerate(meta[0]["ml_attr"]["vals"]))
colValues
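Before moving on, it can help to confirm which categories actually appear in the merged data compared with the full category table. A quick check on the cluster, using the 'userevents' and 'category' dataframes loaded earlier, is:
%%spark
# Categories present in the joined event data vs. all categories in the Redshift table
userevents.select('catname').distinct().show()
print("Total categories in the category table:", category.select('catname').distinct().count())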
Note that although there were more categories available, only 4 are represented in the merged event data. Perhaps no events of the other types were attended? Something to research in the data! But for now, go ahead with just the available data. Extract the "likes" columns and the one-hot vector just built and transform into a feature vector.
%%spark
# Only take the relevant columns for this analysis
theLikesCols = ['likebroadway', 'likeclassical', 'likeconcerts', 'likejazz', 'likemusicals', 'likeopera', 'likerock',
'likesports', 'liketheatre', 'likevegas']
theLikesColsCat = ['likebroadway', 'likeclassical', 'likeconcerts', 'likejazz', 'likemusicals', 'likeopera', 'likerock',
'likesports', 'liketheatre', 'likevegas','catVec']
theLikes = encoded.select(theLikesColsCat)
assembler = VectorAssembler(
inputCols= theLikesColsCat,
outputCol='features')
output = assembler.transform(theLikes)
output.show(truncate=False)
Now, calculate correlations between the features.
%%spark
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
# Grab just the features; and calc the Pearson correlation matrix
features = output.select('features')
r1 = Correlation.corr(features, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))
%%spark -o corrmat
# Pull out the subsection of the correlation array of interest: Likes, vs Categories
corrArr = r1[0].toArray()
print(corrArr.shape)
corrs = corrArr[0:len(theLikesCols), len(theLikesCols):]
print(corrs)
corrs.shape
# Convert to dataframe, for use by Local
pdf = pd.DataFrame(corrs, columns=theCorrCols)
corrmat = spark.createDataFrame(pdf)
%%local
# Install a pip package - Seaborn - in the current Jupyter kernel, for use in plotting
import sys
!{sys.executable} -m pip install seaborn
Now, in the local notebook, retrieve the correlations (which is a small dataset), and plot the correlations as a heatmap.
%%local
import seaborn as sns
%matplotlib inline
# Must redefine this variable here, since Local and Spark do not share vars
theLikesCols = ['likebroadway', 'likeclassical', 'likeconcerts', 'likejazz', 'likemusicals', 'likeopera', 'likerock',
'likesports', 'liketheatre', 'likevegas']
#theCorrCols = ['catVec0', 'catVec1','catVec2']
# plot the heatmap
sns.heatmap(corrmat, annot=True, fmt="g", cmap='viridis',
yticklabels=theLikesCols)
# xticklabels=corr.columns)
You can see that, based on these ticket purchases and event attendances, the likes and event categories are only very weakly correlated (the maximum correlation is about 0.02). Even so, the relative differences between the correlations are still visible in the heatmap.
Now, you can move on to other analyses, such as looking at user locations versus event locations, holidays and weekends versus weekdays, and other correlations. These kinds of correlations can help you understand the user base and aid in developing a recommendation engine.
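For example, a quick look at ticket volume on holidays versus non-holidays, using the 'userevents' dataframe already on the cluster, might start like this:
%%spark
# Compare ticket volume on holidays vs. non-holidays as a starting point for further analysis
userevents.groupBy('holiday').sum('qtysold').show()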
Lastly, clean up all sessions from this notebook if you're done. (This can also be useful if other sessions are hanging around using up resources.)
%%cleanup -f