In many use cases Machine Learning models are built and applied over data that is stored and managed by Azure Data Explorer (ADX). Most ML models are built and deployed in two steps:
ML Training is a long and iterative process. Commonly, a model is developed by researchers/data scientists: they fetch the training data, clean it, engineer features, try different models, and tune parameters, repeating this cycle until the model meets the required accuracy and robustness.
Once the model is ready, it can be deployed to production for scoring.
ML Scoring is the process of applying the model on new data to get predictions/regressions. Scoring usually needs to be done with minimal latency (near real time) for batches of streamed data.
Azure Data Explorer (ADX) supports running inline Python scripts that are embedded in the KQL query. The Python code runs on the existing compute nodes of ADX, in distributed manner near the data. It can handle Data Frames containing many millions of records, partitioned and processed on multiple nodes. This optimized architecture results in great performance and minimal latency.
Specifically, for ML workloads, ADX can be used for both training and scoring:
Training can be done using the inline python() plugin
Scoring can be done using the predict_fl() library function
Still, in many scenarios training is done on Big Data systems such as Spark/Databricks, for example when the training data sets are huge or already reside in the data lake as part of an existing Spark pipeline.
So we end up with a workflow that uses Spark/Databricks for training and ADX for scoring. The problem is that training on these Spark platforms is mostly done using the Spark ML framework, which is optimized for the Spark architecture but not supported by a plain vanilla Python environment like ADX's Python. So how can we still score in ADX?
We present a solution built from these steps:
1. Train the model in Spark/Databricks using Spark ML
2. Convert the trained model to ONNX, a common ML model exchange format
3. Serialize the ONNX model and export it to an ADX table
4. Score in ADX by running the ONNX model with onnxruntime inside the python() plugin
In the following example we build a logistic regression model to predict room occupancy based on Occupancy Detection data, a public dataset from the UCI Repository. This model is a binary classifier predicting occupied/empty room based on Temperature, Humidity, Light and CO2 sensor measurements. The example contains code snippets from a Databricks notebook showing the full process: retrieving the data from ADX, building the model, converting it to ONNX and pushing it back to ADX. Finally, the KQL scoring query is run using Kusto Explorer.
from pyspark.sql import SparkSession
pyKusto = SparkSession.builder.appName("kustoPySpark").getOrCreate()
cluster = 'https://demo11.westus.kusto.windows.net'
db = 'ML'
query = 'OccupancyDetection'
AppId = '***** Your App Id *****'
AppSecret = '***** Your App Secret *****'
AuthorityId = '***** Your Authority Id *****'
# Read the data from the kusto table with default reading mode
s_df = pyKusto.read. \
format("com.microsoft.kusto.spark.datasource"). \
option("kustoCluster", cluster). \
option("kustoDatabase", db). \
option("kustoQuery", query). \
option("kustoAadAppId", AppId). \
option("kustoAadAppSecret", AppSecret). \
option("kustoAadAuthorityID", AuthorityId). \
load()
s_df.take(4)
Out[37]: [Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.18, Humidity=27.272, Light=426.0, CO2=721.25, HumidityRatio=0.004792988, Occupancy=True, Test=False),
Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 51), Temperature=23.15, Humidity=27.2675, Light=429.5, CO2=714.0, HumidityRatio=0.004783441, Occupancy=True, Test=False),
Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 53), Temperature=23.15, Humidity=27.245, Light=426.0, CO2=713.5, HumidityRatio=0.004779464, Occupancy=True, Test=False),
Row(Timestamp=datetime.datetime(2015, 2, 4, 17, 54), Temperature=23.15, Humidity=27.2, Light=426.0, CO2=708.25, HumidityRatio=0.004771509, Occupancy=True, Test=False)]
s_df.groupBy('Test', 'Occupancy').count().show()
+-----+---------+-----+
| Test|Occupancy|count|
+-----+---------+-----+
| true| false| 9396|
| true| true| 3021|
|false| false| 6414|
|false| true| 1729|
+-----+---------+-----+
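As a sanity check, the class counts above can be totalled in plain Python: the training set (Test == false) has 8,143 rows and the testing set (Test == true) has 12,417 rows, which matches the total number of test samples scored later on.

```python
# Counts copied from the groupBy('Test', 'Occupancy') output above
counts = {
    ('true',  'false'): 9396,
    ('true',  'true'):  3021,
    ('false', 'false'): 6414,
    ('false', 'true'):  1729,
}
train_total = sum(n for (test, _), n in counts.items() if test == 'false')
test_total  = sum(n for (test, _), n in counts.items() if test == 'true')
print(train_total, test_total)  # 8143 12417
```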
# Prepare the input for the model
# Spark Logistic Regression estimator requires integer label so create it from the boolean Occupancy column
s_df = s_df.withColumn('Label', s_df['Occupancy'].cast('int'))
# Split to train & test sets
s_train = s_df.filter(s_df.Test == False)
s_test = s_df.filter(s_df.Test == True)
# Create the Logistic Regression model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# The Logistic Regression estimator expects the features in a single column so create it using VectorAssembler
features = ('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio')
assembler = VectorAssembler(inputCols=features,outputCol='Features')
s_train_features = assembler.transform(s_train)
s_train_features.take(4)
lr = LogisticRegression(labelCol='Label', featuresCol='Features',maxIter=10)
s_clf = lr.fit(s_train_features)
# Predict the training set
s_predict_train = s_clf.transform(s_train_features)
# Predict the testing set
s_test_features = assembler.transform(s_test)
s_predict_test = s_clf.transform(s_test_features)
s_predict_test.select(['Timestamp', 'Features', 'Label', 'prediction']).show(10)
+-------------------+--------------------+-----+----------+
| Timestamp| Features|Label|prediction|
+-------------------+--------------------+-----+----------+
|2015-02-02 14:19:00|[23.7,26.272,585....| 1| 1.0|
|2015-02-02 14:19:00|[23.718,26.29,578...| 1| 1.0|
|2015-02-02 14:21:00|[23.73,26.23,572....| 1| 1.0|
|2015-02-02 14:22:00|[23.7225,26.125,4...| 1| 1.0|
|2015-02-02 14:23:00|[23.754,26.2,488....| 1| 1.0|
|2015-02-02 14:23:00|[23.76,26.26,568....| 1| 1.0|
|2015-02-02 14:25:00|[23.73,26.29,536....| 1| 1.0|
|2015-02-02 14:25:00|[23.754,26.29,509...| 1| 1.0|
|2015-02-02 14:26:00|[23.754,26.35,476...| 1| 1.0|
|2015-02-02 14:28:00|[23.736,26.39,510...| 1| 1.0|
+-------------------+--------------------+-----+----------+
only showing top 10 rows
# Calculate accuracy on the testing set
import pyspark.sql.functions as F
check = s_predict_test.withColumn('correct', F.when(F.col('Label') == F.col('prediction'), 1).otherwise(0))
check.groupby('correct').count().show()
accuracy = check.filter(check['correct'] == 1).count()/check.count()*100
print(f'Accuracy: {accuracy}')
+-------+-----+
|correct|count|
+-------+-----+
| 1|12271|
| 0| 146|
+-------+-----+
Accuracy: 98.8241926391238
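The accuracy figure follows directly from the confusion counts; in plain Python:

```python
# Counts from the 'correct' table above
correct, incorrect = 12271, 146
total = correct + incorrect      # 12417 test samples
accuracy = correct / total * 100
print(accuracy)                  # ~98.824
```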
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType
initial_types = [('Features', FloatTensorType([None, 5]))]
onnx_model = convert_sparkml(s_clf, 'Occupancy detection Pyspark Logistic Regression model', initial_types, spark_session = pyKusto)
onnx_model
{'classlabels_ints': [0, 1],
'coefficients': [0.2995554662269534,
0.08678036676466962,
-0.01768699375517248,
-0.005589950773872156,
19.092004694715197,
-0.2995554662269534,
-0.08678036676466962,
0.01768699375517248,
0.005589950773872156,
-19.092004694715197],
'intercepts': [1.396631045353889, -1.396631045353889],
'multi_class': 1,
'name': 'LinearClassifier',
'post_transform': 'LOGISTIC'}
(full print trimmed here)
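The converted model is a plain LinearClassifier with a LOGISTIC post-transform, so its arithmetic can be sanity-checked by hand. The sketch below (an illustrative re-implementation, not how onnxruntime actually evaluates the graph) applies the printed coefficients and intercepts to the first sample row shown earlier:

```python
import math

# Coefficients and intercepts copied from the ONNX dump above.
# The first 5 coefficients score class 0 (empty); the next 5 (their
# negations) score class 1 (occupied).
coef0 = [0.2995554662269534, 0.08678036676466962, -0.01768699375517248,
         -0.005589950773872156, 19.092004694715197]
coef1 = [-c for c in coef0]
intercept0, intercept1 = 1.396631045353889, -1.396631045353889

# First sample row from the s_df.take(4) output (Occupancy=True):
# Temperature, Humidity, Light, CO2, HumidityRatio
x = [23.18, 27.272, 426.0, 721.25, 0.004792988]

z0 = sum(c * v for c, v in zip(coef0, x)) + intercept0  # class-0 score
z1 = sum(c * v for c, v in zip(coef1, x)) + intercept1  # class-1 score
p1 = 1.0 / (1.0 + math.exp(-z1))  # the LOGISTIC post_transform
pred = 1 if z1 > z0 else 0
print(pred, round(p1, 3))  # predicts occupied (1) with probability ~0.68
```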
import datetime
import pandas as pd
smodel = onnx_model.SerializeToString().hex()
models_tbl = 'Databricks_Models'
model_name = 'Occupancy_Detection_LR'
# Create a DataFrame containing a single row with model name, training time and
# the serialized model, to be appended to the models table
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
sdfm = pyKusto.createDataFrame(dfm)
sdfm.show()
+--------------------+--------------------+--------------------+
| name| timestamp| model|
+--------------------+--------------------+--------------------+
|Occupancy_Detecti...|2021-01-26 19:02:...|0807120b4f6e6e784...|
+--------------------+--------------------+--------------------+
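The model is stored as a hex string (SerializeToString().hex()), which is why the model column above starts with 0807120b... (the hex of the ONNX protobuf header bytes). On the scoring side, predict_onnx_fl() reverses this with binascii.unhexlify(). A minimal round-trip sketch, using a stand-in byte string rather than a real serialized model:

```python
import binascii

# Stand-in for onnx_model.SerializeToString(); real ONNX models begin
# with the same 08 07 12 0b protobuf header bytes
model_bytes = b'\x08\x07\x12\x0bonnxmltools'
smodel = model_bytes.hex()             # hex string, storable in a Kusto string column
restored = binascii.unhexlify(smodel)  # what the scoring function does before loading
print(smodel[:8])                      # 0807120b
```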
# Write the model to Kusto
sdfm.write.format("com.microsoft.kusto.spark.datasource"). \
option("kustoCluster", cluster). \
option("kustoDatabase", db). \
option("kustoAadAppId", AppId). \
option("kustoAadAppSecret", AppSecret). \
option("kustoAadAuthorityID", AuthorityId). \
option("kustoTable", models_tbl). \
mode("Append"). \
save()
Scoring is done by calling predict_onnx_fl(). You can either install this function in your database, or call it in an ad-hoc manner:
let predict_onnx_fl=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
let code =
'\n'
'import binascii\n'
'\n'
'smodel = kargs["smodel"]\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'bmodel = binascii.unhexlify(smodel)\n'
'\n'
'\n'
'import onnxruntime as rt\n'
'sess = rt.InferenceSession(bmodel)\n'
'input_name = sess.get_inputs()[0].name\n'
'label_name = sess.get_outputs()[0].name\n'
'df1 = df[features_cols]\n'
'predictions = sess.run([label_name], {input_name: df1.values.astype(np.float32)})[0]\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs)
};
//
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=int(null)
| invoke predict_onnx_fl(Databricks_Models, 'Occupancy_Detection_LR', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize correct = countif(Occupancy == pred_Occupancy), incorrect = countif(Occupancy != pred_Occupancy), total = count()
| extend accuracy = 100.0*correct/total
correct incorrect total accuracy
12271 146 12417 98.8241926391238
In this blog we presented how to train an ML model in Azure Databricks and use it for scoring in ADX. This is done by converting the trained model from Spark ML to ONNX, a common ML model exchange format, enabling it to be consumed for scoring by ADX's python() plugin.
This workflow is common for ADX customers who build Machine Learning models by batch training on Spark/Databricks over big data stored in the data lake. The option to use such a model for scoring directly in ADX is very appealing, as it's fast, simple and free.