Author: Josh Patterson
Date: May 24th, 2022
Other entries in this series:
In the previous article (Part 5) in this series, we validated that the best model did meet the business requirements set by the business unit.
In this article we put our pilot project into production with Snowpark to scalably deliver predicted machine failures to the maintenance teams each day.
Key Takeaways:
Other relevant articles on our blog:
In Part 4 of this series we built an XGBoost model and saved it as PMML. In this article we'll take the saved PMML model and run it inside the Snowflake cloud database to score data in the cloud. We'll then build a quick dashboard that displays the 18 machines most likely to fail each day.
With that out of the way, let's dig into Snowpark.
Snowpark is a DataFrame-based API created by Snowflake that provides its own classes and methods for transferring and working with your data inside the Snowflake database. To do this, the API provides language constructs for building SQL statements that run on the server. For example, rather than writing 'select column_name' as a string to select a column of data, you can simply use the select method provided by the Snowpark API. You can still work with your data through SQL strings if you wish, but by using the constructs Snowpark provides you also get the added benefit of Snowpark's intelligent code completion, which helps when first getting to know Snowpark's syntax.
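To make the comparison concrete, here is a minimal sketch of the two styles side by side (it assumes a session built from the conn.properties file we use later in this article); both produce an equivalent query against our sensor table:

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

val session = Session.builder.configFile("conn.properties").create

// DataFrame style: Snowpark builds the SQL statement for you
val dfApi = session.table("SUMMARY_SENSOR_DATA_TEST")
  .select(col("AIR_TEMPERATURE"), col("TORQUE"))

// SQL-string style: you write the statement yourself
val dfSql = session.sql("select AIR_TEMPERATURE, TORQUE from SUMMARY_SENSOR_DATA_TEST")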
By using the Snowpark API, client applications can process data within the Snowflake database without needing to move their data to the same location as the client application. They can simply connect to the Snowpark API by adding the API libraries to their client code, which is great for those wanting to process large sets of data for machine learning or artificial intelligence projects.
Once you connect, you can construct a Snowpark DataFrame to hold the data you want to use for the project. Snowpark DataFrames are evaluated lazily, so Snowpark only retrieves and operates on the data you defined when you run an action that actually needs the results. Lazy evaluation is a great perk of this service because it saves time and money: you only use, and pay for, Snowflake's resources on the slice of data you need rather than the entire dataset. The DataFrame class also provides a rich set of methods you can use on the created DataFrame, giving you the flexibility to use built-in functions or to create your own user-defined functions (UDFs) to upload to Snowflake, which we will briefly cover below.
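As a rough illustration of lazy evaluation, continuing with the session and imports from the sketch above (the filter threshold here is made up; the table and column names come from our project data):

// nothing is executed yet -- these calls only build up a query definition
val highTorque = session.table("SUMMARY_SENSOR_DATA_TEST")
  .filter(col("TORQUE") > lit(60))
  .select(col("UDI"), col("TORQUE"))

// the SQL is generated and run on the warehouse only when an action is called
highTorque.show()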
If you have a specific task you need to perform on your data that doesn't have a Snowpark equivalent, the Snowpark API also supports UDFs, so you can write the custom functions your project needs in the programming language of your choice. Snowflake explains how to do this from a lambda or a function in Scala in this section of their documentation. You don't need to transfer the data back to your client from Snowflake: simply write the function on your client, push the code to the server through the Snowpark API, and use it to work with your data in Snowflake.
Snowpark supports anonymous UDFs, where you assign the function to a variable and call it through that variable, and named UDFs, which you register and call by a custom name.
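Here is a minimal sketch of both flavors, continuing with the session and imports from the sketches above; the temperature-conversion function and the names used here are hypothetical and only meant to show the mechanics (the sensor columns are from our project table):

// anonymous UDF: created with udf() and called through a variable
val toFahrenheit = udf((kelvin: Double) => kelvin * 9.0 / 5.0 - 459.67)
val dfAnon = session.table("SUMMARY_SENSOR_DATA_TEST")
  .withColumn("AIR_TEMP_F", toFahrenheit(col("AIR_TEMPERATURE")))

// named UDF: registered under a custom name and invoked with callUDF
session.udf.registerTemporary("to_fahrenheit", (kelvin: Double) => kelvin * 9.0 / 5.0 - 459.67)
val dfNamed = session.table("SUMMARY_SENSOR_DATA_TEST")
  .withColumn("AIR_TEMP_F", callUDF("to_fahrenheit", col("AIR_TEMPERATURE")))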
The steps to use our model to score new data in a Snowflake database are:
1. Package the saved PMML model in a jar file so Snowflake can take it as a dependency.
2. Create a Snowpark session and a dataframe for the table SUMMARY_SENSOR_DATA_TEST.
3. Register a Scala UDF that loads the PMML model and scores a single record.
4. Apply the UDF to every row of the dataframe and write the results to a new table, DAILY_SCORED_MACHINES.
If you'd like to try out the code for yourself, check out the following repository:
git clone https://github.com/pattersonconsulting/predictive_maintenance.git
Now let's dig into how to use the Snowpark API to work with data in Snowflake.
Our functional goal is to use the model to make a machine failure prediction based on each row in the table SUMMARY_SENSOR_DATA_TEST. To do this we need to use the Snowpark API: we'll create a Snowpark session and a dataframe for the table SUMMARY_SENSOR_DATA_TEST, then use pmml4s (a PMML implementation for Scala) to load our PMML model file. Once our Snowflake table data is represented by a Snowpark dataframe, we can apply a UDF, functional-programming style, that makes a prediction for each record in the table. The results are stored back in a new table linking the machine ID to the failure score predicted by the model.
Previously we saved the XGBoost model as a PMML file. Let's now take that file and package it in a jar file so that Snowflake can upload it as a dependency.
jar cf pm_pmml_model.jar pmml/pm_sklearn_xgb.pmml
Below we can see the main function of our Scala code, which drives the job against Snowflake via Snowpark:
def main(args: Array[String]): Unit = {

  Console.println("\n=== Creating the session ===\n")
  val session = Session.builder.configFile("conn.properties").create

  val df_raw_device_data_test = session.table("SUMMARY_SENSOR_DATA_TEST") //.select(col())
  df_raw_device_data_test.show()

  val libPath = new java.io.File("").getAbsolutePath
  println(libPath)
  session.addDependency(s"$libPath/lib/pmml4s_2.12-0.9.11.jar")
  session.addDependency(s"$libPath/lib/spray-json_2.12-1.3.5.jar")
  session.addDependency(s"$libPath/lib/scala-xml_2.12-1.2.0.jar")

  // have to wrap pmml in a jar for this to work
  session.addDependency(s"$libPath/lib/pm_pmml_model.jar")

  session.sql("drop function if exists model_PM_UDF_predict(DOUBLE);").show()
  session.udf.registerTemporary("model_PM_UDF_predict", UDFCode.predictFailureScoreUDF)
  session.sql("SHOW USER FUNCTIONS;").show()

  val df_machines_scored = df_raw_device_data_test.withColumn("FAILURE_SCORE",
    callUDF("model_PM_UDF_predict", col("TYPE"), col("AIR_TEMPERATURE"),
      col("PROCESS_TEMPERATURE"), col("ROTATIONAL_SPEED"), col("TORQUE"), col("TOOL_WEAR")))

  df_machines_scored.show()

  df_machines_scored.write.mode(SaveMode.Overwrite).saveAsTable("DAILY_SCORED_MACHINES")

  Console.println("\n=== CLOSING the session ===\n")
  session.close()
}
The above code section represents a "typical" Snowpark API main() entry point to drive your analysis code. In the following line, you can see where we register our specific UDF with the Snowpark API:
session.udf.registerTemporary("model_PM_UDF_predict", UDFCode.predictFailureScoreUDF)
Once we have this UDF registered with Snowpark, we can use it in our Snowpark API dataframe code:
val df_machines_scored = df_raw_device_data_test.withColumn("FAILURE_SCORE",
  callUDF("model_PM_UDF_predict", col("TYPE"), col("AIR_TEMPERATURE"),
    col("PROCESS_TEMPERATURE"), col("ROTATIONAL_SPEED"), col("TORQUE"), col("TOOL_WEAR")))
The callUDF("model_PM_UDF_predict", ...) part of the dataframe API calls our UDF code and passes in the column data. We can see our UDF code below.
object UDFCode extends Serializable {

  lazy val udf_model = {
    import java.io._
    val resourceName = "/pmml/pm_sklearn_xgb.pmml" // this is the path of the pmml in the .jar
    val inputStream = classOf[com.snowflake.snowpark.DataFrame].getResourceAsStream(resourceName)
    Model.fromInputStream(inputStream)
  }

  val predictFailureScoreUDF = (TYPE_col: String, AIR_TEMPERATURE: Double, PROCESS_TEMPERATURE: Double,
                                ROTATIONAL_SPEED: Double, TORQUE: Double, TOOL_WEAR: Double) => {
    udf_model.predict(Map(
      "TYPE" -> TYPE_col,
      "AIR_TEMPERATURE" -> AIR_TEMPERATURE,
      "PROCESS_TEMPERATURE" -> PROCESS_TEMPERATURE,
      "ROTATIONAL_SPEED" -> ROTATIONAL_SPEED,
      "TORQUE" -> TORQUE,
      "TOOL_WEAR" -> TOOL_WEAR
    )).get("probability(1)").get.asInstanceOf[Double]
  }
}
Our UDFCode object needs to extend Serializable, and inside it we only want the UDF to instantiate the PMML model once, so we tag the udf_model value with the lazy keyword. As we can see, this code loads the PMML file from the jar we packaged it in for transport to Snowflake. The method predictFailureScoreUDF is where we call .predict() on our PMML model, passing in the column data from the dataframe and returning the model's output as the output of the UDF.
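If you want to sanity-check the PMML model before pushing anything to Snowflake, pmml4s can also load and score the file locally. A minimal sketch under that assumption (the file path and the feature values below are illustrative, not taken from the project data):

import org.pmml4s.model.Model

// load the same PMML file directly from disk rather than from the jar resource
val localModel = Model.fromFile("pmml/pm_sklearn_xgb.pmml")

// score a single hand-built record; the values are made up for illustration
val prob = localModel.predict(Map(
  "TYPE" -> "M",
  "AIR_TEMPERATURE" -> 298.1,
  "PROCESS_TEMPERATURE" -> 308.6,
  "ROTATIONAL_SPEED" -> 1551.0,
  "TORQUE" -> 42.8,
  "TOOL_WEAR" -> 0.0
)).get("probability(1)").get.asInstanceOf[Double]

println(s"predicted failure probability: $prob")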
Once we have our code the way we want it, we need to connect to Snowflake with the Snowpark API. To connect to Snowflake via the Snowpark API with Scala, we need to create a session:
val session = Session.builder.configFile("conn.properties").create
If we take a look at the contents of the conn.properties file, we see:
URL = https://nna57244.us-east-1.snowflakecomputing.com
USER = your_user_name
PASSWORD = your_password
#
# Note: To use key-pair authentication instead of a password,
# comment out (or remove) PASSWORD. Then, uncomment PRIVATE_KEY_FILE
# and set it to the path to your private key file.
# PRIVATE_KEY_FILE =
#
# If your private key is encrypted, you must also uncomment
# PRIVATE_KEY_FILE_PWD and set it to the passphrase for decrypting the key.
# "PRIVATE_KEY_FILE_PWD" -> "",
#
ROLE = SYSADMIN
WAREHOUSE = ANALYTICS_WH
DB = PREDICTIVE_MAINTENANCE
SCHEMA = PUBLIC
There are multiple ways to run Scala code locally, and one of the most popular is SBT. Snowpark supports multiple languages, but if you are going to use SBT with Scala, install Snowpark as a dependency in build.sbt. Your versions may vary, but below are the dependencies we used with this version of the code.
scalaVersion := "2.12.13"

libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.2"
libraryDependencies += "com.snowflake" % "snowpark" % "0.11.0"
libraryDependencies += "org.pmml4s" %% "pmml4s" % "0.9.11"
Once the above code is compiled by SBT (or your favorite tool) and run locally, Snowpark connects to Snowflake and compiles the code down to SQL that Snowflake understands natively, as shown below.
Note the model_PM_UDF_predict function being referenced inline in the SQL. This is our Scala UDF that was uploaded to a stage for the Snowflake table.
We can see the results of the Snowpark UDF operation in the new table DAILY_SCORED_MACHINES, as shown below.
We'd want to run this table scan at the end of each workday to update the FAILURE_SCORE field for each machine being tracked. This gives the maintenance crew a list of machines to focus on during their nightly maintenance run.
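As a rough sketch of how that nightly list could be pulled from the scored table with Snowpark, reusing the session and imports from the main() code above (and assuming DAILY_SCORED_MACHINES retains the UDI machine identifier column, as the report below suggests):

// minimal sketch: pull the machines most likely to fail, highest risk first
val nightlyList = session.table("DAILY_SCORED_MACHINES")
  .select(col("UDI"), col("FAILURE_SCORE"))
  .sort(col("FAILURE_SCORE").desc)
  .limit(18)

nightlyList.show()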
We should not have to rebuild the model for this population of machines unless new types or brands of machines are added to the manufacturing lines (potentially changing the distribution of the data). However, checking the distributions with some EDA work would not be that hard.
Let's now move on to providing the maintenance team with the report on the 18 machines most likely to fail.
To execute the predictive maintenance program, the maintenance team just needs a simple daily report listing the machine numbers to go service.
The notebook the data science team provides is shown below.
We can see the top 4 rows from the embedded notebook above in the table below:
| UDI  | FAILURE_SCORE |
|------|---------------|
| 4463 | 0.988231      |
| 4418 | 0.985929      |
| 1017 | 0.984222      |
| 4422 | 0.984187      |
Our team can help -- we help companies with Snowflake analytics.
In this series we analyzed the core business case for ACME manufacturing and how predictive modeling affected the financial health of their business under different scenarios.
Further, we analyzed the raw data, built a predictive model, and deployed it to the cloud to provide the line of business with operational intelligence.
If you would like to get a free report on the potential value of a specific machine learning use for your business, please reach out and contact us.