-
Notifications
You must be signed in to change notification settings - Fork 114
ElasticDL TF Transform Explore
Data preprocess is an important part before model training in the end-to-end ML pipeline. Consistency between offline and online is the key point. User can write the preprocess code once, run the same logic in batch mode in offline environment and real time mode in online environment. It can avoid the training/serving skew.
Both TensorFlow Transform and Feature Column can ensure the consistency.
TF Transform is a standalone stage before model training stage. The key preprocess logic is a user defined function: preprocessing_fn(inputs). The function will be traced into a TF Graph. The graph may contain some analyze node such as min(x), mean(y). These nodes will be convert to placeholder tensors at the first step. After analyzing the entire dataset and calculating the result of all the analyze node, then TF Transform will replace the placeholder tensor with the analyze result as a constant tensor. In the next step, We can use this TF Graph to transform the data records one by one. Finally we can get the transformed data set and transform graph in SavedModel format. Please check the official tutorial.
Feature columns are the bridge between input data and DNN model. They contain transform logic such as embedding, one-hot, cross and so on. Feature column list is wrapped with tf.keras.layers.DenseFeatures in the DNN model. We can export both the feature column and NN structure in the SavedModel.
Analyzer
Let's take scaling one column of dense data to [0, 1) for example.
import tensorflow as tf
def _scale_age_to_0_1(input_tensor):
min_age = 1
max_age = 100
return (input_tensor - min_age) / (max_age - min_age)
tf.feature_column.numeric_column('age', normalizer_fn=_scale_age_to_0_1)
We need define the min and max value of column age as constants in the code. Min and Max are the statistical value after scanning the entire dataset. It's common that we will refit the model using the latest data everyday. The statistic value varies in the data of different days. It's impractical to update these value in the code everyday for the daily job.
With TF Transform, we only use just one Api tft.scale_to_0_1. TF Transform will analyze the whole dataset at first, calculate min and max value. After getting the analysis result, it then transform the data.
import tensorflow_transform as tft
outputs['age'] = tft.scale_to_0_1(inputs['age'])
Inter Columns Calculation
From feature column Api doc, except cross_column, all the other columns execute the transform on only one column of data. We can't implement inter columns calculation using Feature Column Api just as follows:
column_new = column_a * column_b
Inside preprocess_fn(input), user can write any transform logic inside this function. Operations between two or more columns are naturally supported if it can be traced to TF Graph.
Execution Frequency
TF Transform is the preprocess stage before the training stage. The data will be preprocessed once and stored in a temporary storage (such as Odps table). And it will be reused by multiple training epoches and multipe training runs with different hyperparameters.
Feature Column is defined together with the DNN model. Its logic is executed inside the model training process. On each epoch, each hyperparameter combination and each minibatch, it will be executed.
The entire TF Transform process is a data processing pineline. TF Transform is integrated closely with Apache Beam and use it to describe the pipeline as a DAG. Apache Beam pipeline is runtime engine neutral and can be translated to the execution plan of different data process engines (such as DataFlow, Flink, Spark and so on).
- We need a data process engine maturally supporting Apache Beam.
- We can run python scripts to process the data in parallel on this engine.
Apache Beam and Flink Walkthrough Issues
Let's check the typical SQL expression for model training as follows. SELECT * From iris.train means retriving data from the data source. It's mapped to SQL query in database or Odps SQL in Odps table. COLUMN sepal_length, sepal_width is mapped to a feature_column array.
SELECT *
FROM iris.train
TO TRAIN DNNClassifier
WITH model.n_classes = 3, model.hidden_units = [10, 20]
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class
INTO sqlflow_models.my_dnn_model;
TF Transform process is a standalone stage between retrieving data and training model with feature columns. We need extend the SQLFlow synatx to fully express the TF Transform logic. The key of the transform process is a user defined function preprocess_fn. It's very flexible. We can define it in a python file and refer it using the TRANSFORM keyword in the SQL expression. Just as the expression below, iris_transformer is the name of the python file and preprocess is the user defined function.
SELECT *
FROM iris.train
TO TRAIN DNNClassifier
WITH model.n_classes = 3, model.hidden_units = [10, 20]
TRANSFORM iris_transformer.preprocess
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class
INTO sqlflow_models.my_dnn_model;
Q: Where is this python file defining preprocess function stored? A git repo?
From official tutorial example, TF Transform is integrated with Estimator. After training, we are calling estimator.export_saved_model(exported_model_dir, serving_input_fn) to export the trained model to SavedModel. Estimator will invoke serving_input_fn to reconstruct the transform graph and combine it with the model graph to build the entire inference graph. Please check the code snippet.
For TF2.0, we defines the model using keras and exports the model by tf.saved.saved_model. The inference graph only contains the feature column and DNN definition. tf.saved.saved_model Api doesn't have serving_input_fn parameter to combine the transform logic in the entire inference graph.
Please check the filed issue in TF Transform.
- The output columns from TF Transform are defined in the python code. How do we map them to the COLUMN expression in SQLFlow?