ElasticDL TF Transform Explore
Data preprocessing is an important step before model training in an end-to-end ML pipeline. The key requirement is consistency between offline and online processing: users write the preprocessing code once and run the same logic in batch mode offline and in real-time mode online. This avoids training/serving skew.
Both TensorFlow Transform and Feature Column can ensure this consistency.
TF Transform is a standalone stage that runs before the model training stage. The core preprocessing logic is a user-defined function, preprocessing_fn(inputs), which is traced into a TF graph. The graph may contain analyzer nodes such as min(x) or mean(y), which need a full pass over the data. In the first step, these nodes are represented as placeholder tensors. After TF Transform has analyzed the entire dataset and computed the results of all analyzer nodes, it replaces each placeholder tensor with its result as a constant tensor. In the next step, the resulting graph transforms the data records one by one. Finally, we get the transformed dataset and export the transform graph in the SavedModel format. Please check the official tutorial.
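The two-phase flow described above can be illustrated with a small pure-Python sketch. It is not TF Transform's implementation or API: `Analyzer`, `min_of`, `max_of`, `preprocessing_fn`, and `analyze_and_transform` are hypothetical names. The function first declares analyzers as placeholders, a full pass over the dataset then fills in their values, and only afterwards is each record transformed with those now-constant values.

```python
class Analyzer:
    """Hypothetical stand-in for an analyzer node: a placeholder for a full-pass statistic."""

    def __init__(self, column, reduce_fn):
        self.column = column
        self.reduce_fn = reduce_fn
        self.value = None  # filled in by the analyze phase


def min_of(column):
    return Analyzer(column, min)


def max_of(column):
    return Analyzer(column, max)


def preprocessing_fn():
    """Declare analyzers, then return a per-record transform that reads their values later."""
    age_min, age_max = min_of("age"), max_of("age")

    def transform(record):
        # By the time this runs, age_min.value and age_max.value are constants.
        # Assumes the dataset has max > min.
        return {"age": (record["age"] - age_min.value) / (age_max.value - age_min.value)}

    return [age_min, age_max], transform


def analyze_and_transform(dataset):
    analyzers, transform = preprocessing_fn()
    # Analyze phase: a full pass over the data resolves every placeholder.
    for analyzer in analyzers:
        analyzer.value = analyzer.reduce_fn(r[analyzer.column] for r in dataset)
    # Transform phase: records are processed one by one with fixed constants.
    return [transform(r) for r in dataset]


print(analyze_and_transform([{"age": 10}, {"age": 20}, {"age": 30}]))
# [{'age': 0.0}, {'age': 0.5}, {'age': 1.0}]
```

The separation matters because the statistics cannot be known while the function is being defined; in TF Transform the same role is played by the placeholder tensors that are later replaced by constants.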
Feature columns are the bridge between the input data and the DNN model. They contain transform logic such as embedding, one-hot encoding, crossing, and so on. The list of feature columns is wrapped in tf.keras.layers.DenseFeatures, which serves as the first layer of the DNN model. We can export both the feature columns and the NN structure in the SavedModel for inference.
There are some differences between the two, and we compare them in the following three aspects.
Let's take scaling one column of dense data to [0, 1] as an example.
With Feature Column, we need to define the min and max values of the column age as constants in the code. The min and max are statistics obtained by scanning the full dataset.
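import tensorflow as tf

# min_age and max_age are hard-coded statistics that must come from a
# prior scan of the full dataset.
def _scale_age_to_0_1_fn(input_tensor):
    min_age = 1
    max_age = 100
    return (input_tensor - min_age) / (max_age - min_age)

age_column = tf.feature_column.numeric_column('age', normalizer_fn=_scale_age_to_0_1_fn)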
It is common to refit the model on the latest data every day. The statistics vary from day to day, so it is impractical to update these values in the code for every daily job.
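One workaround that stays within Feature Column is to query the statistics at the start of each daily job and build the normalizer from them instead of editing constants by hand. The sketch below assumes Python's built-in sqlite3 as a stand-in for the real data source (in practice a database or an Odps table), and `fetch_age_stats`, `make_scale_fn`, and the table `train_day1` are hypothetical. The returned closure has the same shape as `_scale_age_to_0_1_fn`.

```python
import sqlite3


def fetch_age_stats(conn, table):
    """Return (min, max) of the age column of the given table."""
    # The table name is interpolated into SQL, so it must come from trusted config.
    row = conn.execute(f"SELECT MIN(age), MAX(age) FROM {table}").fetchone()
    return row[0], row[1]


def make_scale_fn(min_age, max_age):
    """Build a normalizer mapping [min_age, max_age] onto [0, 1]; assumes max_age > min_age."""
    span = max_age - min_age

    def scale(x):
        return (x - min_age) / span

    return scale


# In-memory table standing in for one day's training data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE train_day1 (age REAL)")
conn.executemany("INSERT INTO train_day1 VALUES (?)", [(18,), (30,), (68,)])

scale = make_scale_fn(*fetch_age_stats(conn, "train_day1"))
print(scale(18), scale(43), scale(68))  # 0.0 0.5 1.0
```

Because the closure only uses arithmetic operators, it should also accept a tensor, so it could plausibly be passed as `normalizer_fn`. The cost is an extra query per job, and only statistics expressible in SQL are covered.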
With TF Transform, we only need a single API, tft.scale_to_0_1. TF Transform first analyzes the whole dataset to calculate the min and max values, and then uses the analysis result to transform the data.
import tensorflow_transform as tft
def preprocessing_fn(inputs):
    return {'age': tft.scale_to_0_1(inputs['age'])}  # min/max are computed in the analyze phase
According to the Feature Column API documentation, all columns except crossed_column transform only a single column of data. We cannot implement an inter-column calculation such as the following with the Feature Column API:
column_new = column_a * column_b
Inside preprocessing_fn, users can write any transform logic. Operations between two or more columns are naturally supported as long as they can be traced into a TF graph.
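To make the contrast concrete, here is a pure-Python sketch (not the TF Transform API) of a `preprocessing_fn`-style function. Because it receives all columns together as a dict, a product of two columns is an ordinary expression, whereas a per-column `normalizer_fn` only ever sees one column. The column names `a` and `b` and the function name are hypothetical.

```python
def preprocessing_fn_style(inputs):
    """Receive every column at once and return a dict of output columns."""
    outputs = dict(inputs)  # pass the original columns through unchanged
    # Inter-column calculation: element-wise product of columns a and b.
    outputs["a_times_b"] = [x * y for x, y in zip(inputs["a"], inputs["b"])]
    return outputs


batch = {"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]}
print(preprocessing_fn_style(batch)["a_times_b"])  # [4.0, 10.0, 18.0]
```

In real TF Transform code the inputs are tensors, so the same output would be written as `inputs['a'] * inputs['b']`.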
TF Transform is a preprocessing stage that runs before the training stage. The data is preprocessed once and stored in temporary storage (such as a new Odps table). It can then be reused by multiple training epochs and by multiple training runs with different hyperparameters.
Feature Column is defined together with the DNN model, and its logic is executed inside the model training process. It is executed again for every epoch, every hyperparameter combination, and every minibatch.
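A back-of-the-envelope count shows why this difference matters. The sketch below counts per-record transform executions under the two approaches; the workload numbers are made up for illustration, and caching or other runtime optimizations are ignored.

```python
def transform_executions(num_records, epochs, hyperparameter_runs):
    """Return (tf_transform, feature_column) counts of per-record transform executions."""
    # TF Transform: each record is transformed once (the analyze pass only computes
    # statistics) and the stored result is reused by all epochs and runs.
    tf_transform = num_records
    # Feature Column: the transform runs again in every epoch of every training run.
    feature_column = num_records * epochs * hyperparameter_runs
    return tf_transform, feature_column


print(transform_executions(num_records=1_000_000, epochs=10, hyperparameter_runs=5))
# (1000000, 50000000)
```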
The entire TF Transform process is a data processing pipeline. TF Transform is tightly integrated with Apache Beam and uses it to describe the pipeline as a DAG. An Apache Beam pipeline is runtime-engine neutral and can be translated into the execution plan of different data processing engines (such as Dataflow, Flink, Spark, and so on). This brings two requirements:
- We need a data processing engine with mature Apache Beam support.
- We need to be able to run Python scripts on this engine to process the data in parallel.
Apache Beam and Flink Walkthrough Issues
Let's look at a typical SQLFlow expression for model training. SELECT * FROM iris.train retrieves data from the data source; it maps to a SQL query for a database or an Odps SQL query for an Odps table. COLUMN sepal_length, sepal_width, ... maps to an array of feature columns.
SELECT *
FROM iris.train
TO TRAIN DNNClassifier
WITH model.n_classes = 3, model.hidden_units = [10, 20]
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class
INTO sqlflow_models.my_dnn_model;
The TF Transform process is a standalone stage between retrieving the data and training the model with feature columns. We need to extend the SQLFlow syntax to fully express the TF Transform logic. The key of the transform process is the user-defined function preprocessing_fn, which is very flexible. We can define it in a Python file and refer to it with the TRANSFORM keyword in the SQL expression. In the expression below, iris_transformer is the name of the Python file and preprocess is the user-defined function.
SELECT *
FROM iris.train
TO TRAIN DNNClassifier
WITH model.n_classes = 3, model.hidden_units = [10, 20]
TRANSFORM iris_transformer.preprocess
COLUMN sepal_length, sepal_width, petal_length, petal_width
LABEL class
INTO sqlflow_models.my_dnn_model;
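How SQLFlow would locate this function is still open. One possible mechanism, assuming the Python file is importable as a module on the Python path, is to split the TRANSFORM value at the last dot and load it with importlib. `resolve_transform` is a hypothetical helper, not part of SQLFlow.

```python
import importlib


def resolve_transform(spec):
    """Resolve a 'module.function' string such as 'iris_transformer.preprocess'."""
    module_name, sep, func_name = spec.rpartition(".")
    if not sep or not module_name:
        raise ValueError(f"expected 'module.function', got {spec!r}")
    module = importlib.import_module(module_name)  # ImportError if the file is not found
    fn = getattr(module, func_name)  # AttributeError if the function is missing
    if not callable(fn):
        raise TypeError(f"{spec!r} is not callable")
    return fn


# Demonstrated with a standard-library function, since iris_transformer is not available here.
sqrt = resolve_transform("math.sqrt")
print(sqrt(16.0))  # 4.0
```

Using rpartition keeps package paths working, e.g. a hypothetical `my_pkg.iris_transformer.preprocess` resolves to module `my_pkg.iris_transformer`.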
Q: Where is the Python file that defines the preprocess function stored? In a Git repo?
It is not user-friendly to require users to write the preprocessing logic in a separate Python file.
In the official tutorial example, TF Transform is integrated with Estimator. After training, we call estimator.export_saved_model(exported_model_dir, serving_input_fn) to export the trained model as a SavedModel. The Estimator invokes serving_input_fn to reconstruct the transform graph and combines it with the model graph to build the entire inference graph. Please check the code snippet.
In TF 2.0, we define the model with Keras and export it with tf.saved_model.save. The inference graph then contains only the feature columns and the DNN definition. The tf.saved_model.save API has no serving_input_fn parameter for combining the transform logic into the entire inference graph.
Please check the issue filed in TF Transform.
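Conceptually, what the Estimator path provides and the plain Keras export lacks is a single serving function equal to model(transform(raw)). The pure-Python sketch below only illustrates that composition; `make_serving_fn`, `transform_fn`, `model_fn`, and the constants are hypothetical, and a real fix would need to perform the same fusion inside the exported TF graph.

```python
from typing import Callable, Dict, List

Record = Dict[str, float]


def make_serving_fn(transform_fn: Callable[[Record], Record],
                    model_fn: Callable[[Record], float]) -> Callable[[List[Record]], List[float]]:
    """Fuse transform and model so serving applies exactly the training-time transform."""
    def serve(raw_batch: List[Record]) -> List[float]:
        return [model_fn(transform_fn(record)) for record in raw_batch]
    return serve


# Hypothetical transform whose constants were frozen by the analysis pass.
MIN_AGE, MAX_AGE = 18.0, 68.0


def transform_fn(raw: Record) -> Record:
    return {"age": (raw["age"] - MIN_AGE) / (MAX_AGE - MIN_AGE)}


# Hypothetical toy model operating on transformed features only.
def model_fn(features: Record) -> float:
    return 2.0 * features["age"] + 1.0


serve = make_serving_fn(transform_fn, model_fn)
print(serve([{"age": 18.0}, {"age": 43.0}]))  # [1.0, 2.0]
```

Clients send raw records and never apply the transform themselves, which is what removes the training/serving skew.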
- If inter-column calculation is uncommon in data preprocessing, can we analyze the full dataset using SQL and then pass the analysis results into the feature columns to finish the transform process?
- The output columns of TF Transform are defined in the Python code. Users have to read the transform code and make sure that the column names in the COLUMN clause of SQLFlow match the outputs of preprocessing_fn in TF Transform. Is there a better way to check this, such as static program analysis?
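As a sketch of the static-analysis idea, Python's ast module can collect the literal string keys a preprocessing function assigns into its output dict and compare them with the names in the COLUMN clause. This assumes Python 3.9+ and that outputs are written as `outputs['name'] = ...` with literal keys; dynamically built keys are missed, so it is a lint-style check rather than a proof. `output_keys` and the sample source are hypothetical.

```python
import ast


def output_keys(source, dict_name="outputs"):
    """Return literal string keys assigned as dict_name['key'] = ... anywhere in source."""
    keys = set()
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if (isinstance(target, ast.Subscript)
                    and isinstance(target.value, ast.Name)
                    and target.value.id == dict_name
                    and isinstance(target.slice, ast.Constant)
                    and isinstance(target.slice.value, str)):
                keys.add(target.slice.value)
    return keys


# Hypothetical contents of a transform file such as iris_transformer.py.
PREPROCESS_SRC = '''
def preprocess(inputs):
    outputs = {}
    outputs["sepal_length"] = inputs["sepal_length"] / 10.0
    outputs["petal_area"] = inputs["petal_length"] * inputs["petal_width"]
    return outputs
'''

column_clause = {"sepal_length", "sepal_width", "petal_area"}
produced = output_keys(PREPROCESS_SRC)
print(sorted(column_clause - produced))  # ['sepal_width']
```

A SQLFlow front end could run such a check before submitting the job and report COLUMN names that the transform never produces.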