Applying TensorFlow Pipeline to Spark DataFrame (Distributed Data Processing)
Overview
In this tutorial, we will apply a TensorFlow pipeline to distributed Big Data:
Use Spark DataFrame for distributed data processing.
Optionally create an Orca TF Dataset (i.e., a distributed TensorFlow Dataset) from the Spark DataFrame.
Run distributed TensorFlow training on the distributed data (Spark DataFrame or Orca TF Dataset).
This tutorial is based on the TensorFlow Recommenders basic ranking example.
Key Concepts
Orca TF Dataset
An Orca TF Dataset is a distributed TensorFlow tf.data.Dataset.
Each element of the Orca TF Dataset is a tf.data.Dataset.
An Orca TF Dataset has a collection of elements partitioned across the cluster nodes that can be operated on in parallel.
An Orca TF Dataset can be created from an Orca xShards, a Friesian FeatureTable or a Spark DataFrame.
After the Orca TF Dataset is created, map functions can be applied to the Dataset in parallel.
The Orca TF estimator can do model training, validation, and inference using the created Orca TF Dataset.
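The partition-and-map idea described above can be sketched in plain Python. This is only an illustration of the concept, not Orca's implementation: the class `PartitionedDataset` and its methods are hypothetical names, each partition is an ordinary list rather than a tf.data.Dataset, and threads stand in for cluster nodes.

```python
from concurrent.futures import ThreadPoolExecutor


class PartitionedDataset:
    """Hypothetical stand-in for a distributed dataset (not the Orca API)."""

    def __init__(self, partitions):
        # Each partition is a list of records; in Orca each element would be
        # a tf.data.Dataset living on some cluster node.
        self.partitions = [list(p) for p in partitions]

    @classmethod
    def from_records(cls, records, num_partitions):
        # Round-robin split of the records into num_partitions shards.
        records = list(records)
        return cls([records[i::num_partitions] for i in range(num_partitions)])

    def map(self, fn):
        # Apply fn to every record, using one worker per partition.
        def apply(partition):
            return [fn(record) for record in partition]

        with ThreadPoolExecutor(max_workers=len(self.partitions)) as pool:
            return PartitionedDataset(pool.map(apply, self.partitions))

    def collect(self):
        # Gather all records back to the caller, partition by partition.
        return [record for partition in self.partitions for record in partition]


ds_sketch = PartitionedDataset.from_records(range(6), num_partitions=2)
doubled = ds_sketch.map(lambda x: x * 2)
print(doubled.partitions)  # → [[0, 4, 8], [2, 6, 10]]
```

Because the map function only ever sees one record at a time, no partition needs to know about any other, which is what makes the operation easy to run in parallel.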
Orca Estimator
Orca Estimators encapsulate distributed model training, evaluation and inference.
To perform distributed training and inference, the user can first create an Orca Estimator from any standard (single-node) TensorFlow, Keras or PyTorch model, and then call the Estimator.fit or Estimator.predict methods (using the data-parallel processing pipeline as input).
Under the hood, the Orca Estimator will replicate the model on each node in the cluster, feed the data partition (generated by the data-parallel processing pipeline) on each node to the local model replica, and synchronize model parameters using various backend technologies (such as Horovod, tf.distribute.MirroredStrategy, torch.distributed, or the parameter sync layer in BigDL).
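As a rough illustration of the replicate, feed and synchronize loop described above, here is a minimal pure-Python sketch of synchronous data-parallel gradient descent on a toy linear model. It makes several assumptions: the function names are hypothetical, the "nodes" are just list slices processed in turn, and a weighted average stands in for whatever synchronization backend is actually used. It is not how Orca is implemented.

```python
def local_gradient(w, b, partition):
    """Mean-squared-error gradient of y ~ w*x + b on one data partition."""
    n = len(partition)
    grad_w = sum(2 * (w * x + b - y) * x for x, y in partition) / n
    grad_b = sum(2 * (w * x + b - y) for x, y in partition) / n
    return grad_w, grad_b


def synchronous_step(w, b, partitions, lr):
    """One data-parallel step: local gradients, then one averaged update."""
    # Every "node" holds the same (w, b) and computes a gradient on its shard.
    grads = [local_gradient(w, b, p) for p in partitions]
    total = sum(len(p) for p in partitions)
    # Weight each replica's gradient by its shard size (all-reduce stand-in).
    avg_w = sum(g[0] * len(p) for g, p in zip(grads, partitions)) / total
    avg_b = sum(g[1] * len(p) for g, p in zip(grads, partitions)) / total
    # All replicas apply the same update, so they stay identical.
    return w - lr * avg_w, b - lr * avg_b


# Toy data generated from y = 3x + 1, split across two "nodes".
data = [(x, 3.0 * x + 1.0) for x in range(8)]
partitions = [data[:4], data[4:]]

w, b = 0.0, 0.0
for _ in range(2000):
    w, b = synchronous_step(w, b, partitions, lr=0.01)
print(round(w, 2), round(b, 2))
```

With size-weighted averaging, one synchronized step matches (up to floating-point rounding) the step a single node would take on the full data, which is why the replicas can be treated as one logical model.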
init_orca_context
First, initialize an Orca context and get the Spark session:
from bigdl.orca import init_orca_context, OrcaContext
# Init an orca context
init_orca_context("local", cores=4, memory="4g", init_ray_on_spark=True)
spark = OrcaContext.get_spark_session()
Create TensorFlow Model
Here we build a custom Keras model whose input is a dict mapping input names to the corresponding tensors.
import tensorflow as tf


class SampleRankingModel(tf.keras.Model):
    def __init__(self, unique_user_ids, unique_movie_titles):
        super().__init__()
        embedding_dim = 32

        self.user_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_user_ids, mask_token=None),
            tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dim)])

        self.movie_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_movie_titles, mask_token=None),
            tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dim)])

        self.ratings = tf.keras.Sequential([
            # Learn multiple dense layers.
            tf.keras.layers.Dense(256, activation="relu"),
            tf.keras.layers.Dense(64, activation="relu"),
            # Make rating predictions in the final layer.
            tf.keras.layers.Dense(1)
        ])

    def call(self, features):
        embeddings = tf.concat([self.user_embedding(features["user_id"]),
                                self.movie_embedding(features["movie_title"]),
                                tf.reshape(features["timestamp"], (-1, 1))], axis=1)
        return self.ratings(embeddings)

    def train_step(self, data):
        y = data["user_rating"]

        with tf.GradientTape() as tape:
            y_pred = self(data, training=True)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        y = data["user_rating"]

        y_pred = self(data, training=False)
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}
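The `+ 1` in each Embedding size above leaves room for one out-of-vocabulary index: with mask_token=None and the default single OOV bucket, StringLookup maps unknown strings to index 0 and vocabulary entries to indices starting at 1. The following plain-Python sketch mimics that mapping; `build_lookup` and `lookup` are hypothetical helpers and the user ids are made up for illustration.

```python
def build_lookup(vocabulary, num_oov_indices=1):
    """Map each vocabulary string to an integer id; ids below
    num_oov_indices are reserved for out-of-vocabulary strings."""
    return {token: i + num_oov_indices for i, token in enumerate(vocabulary)}


def lookup(table, token, oov_index=0):
    # Unknown strings fall back to the reserved OOV index.
    return table.get(token, oov_index)


user_vocab = ["1", "2", "3"]           # hypothetical user ids
table = build_lookup(user_vocab)
embedding_rows = len(user_vocab) + 1   # vocabulary entries + one OOV row

print([lookup(table, u) for u in ["1", "3", "42"]])  # → [1, 3, 0]
```

Every index the lookup can produce is smaller than `embedding_rows`, so the embedding table always has a row for it, including for ids that never appeared in the training data.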
Using Spark DataFrame for Distributed Data Processing
import math
from pyspark.sql.functions import col, mean, stddev
# Read the input csv files
df = spark.read.options(header=True, inferSchema=True, delimiter=":").csv("/path/to/input_file")
df = df.withColumn("rating", col("rating").cast("float"))
df = df.withColumn("userid", col("userid").cast("string"))
df.show(5, False)
# +-------+------+------+---------+--------------------------------------+----------------------------+
# |movieid|userid|rating|timestamp|title |genres |
# +-------+------+------+---------+--------------------------------------+----------------------------+
# |1193 |1 |5.0 |978300760|One Flew Over the Cuckoo's Nest (1975)|Drama |
# |661 |1 |3.0 |978302109|James and the Giant Peach (1996) |Animation|Children's|Musical|
# |914 |1 |3.0 |978301968|My Fair Lady (1964) |Musical|Romance |
# |3408 |1 |4.0 |978300275|Erin Brockovich (2000) |Drama |
# |2355 |1 |5.0 |978824291|Bug's Life, A (1998) |Animation|Children's|Comedy |
# +-------+------+------+---------+--------------------------------------+----------------------------+
# Generate vocabularies for the StringLookup layers
user_id_vocab = df.select("userid").distinct().rdd.map(lambda row: row["userid"]).collect()
movie_title_vocab = df.select("title").distinct().rdd.map(lambda row: row["title"]).collect()
# Calculate mean and standard deviation for normalization
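df_stats = df.select(
    mean(col('timestamp')).alias('mean'),
    stddev(col('timestamp')).alias('std')
).collect()
# Note: from here on, mean and stddev hold these float statistics and
# shadow the pyspark functions of the same name imported above.
mean = df_stats[0]['mean']
stddev = df_stats[0]['std']
# Number of training steps per epoch for a batch size of 8192
train_count = df.count()
steps = math.ceil(train_count / 8192)
print("train size: ", train_count, ", steps: ", steps)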
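The normalization statistics and the step count computed above can be checked by hand on a small sample. The sketch below uses only the standard library and the five timestamps shown in the df.show output; the row count of 1,000,000 is a hypothetical figure, and the names `ts_mean`, `ts_std` and `steps_sketch` are illustrative. Spark's stddev is the sample standard deviation, which statistics.stdev also computes.

```python
import math
import statistics

# Timestamps from the five rows shown in the df.show output.
timestamps = [978300760, 978302109, 978301968, 978300275, 978824291]

ts_mean = statistics.mean(timestamps)
ts_std = statistics.stdev(timestamps)   # sample standard deviation (n - 1)

# Z-score normalization: zero mean and unit standard deviation afterwards.
normalized = [(t - ts_mean) / ts_std for t in timestamps]

row_count = 1_000_000    # hypothetical row count, for illustration only
batch_size = 8192
# Round up so that the last, partially filled batch still counts as a step.
steps_sketch = math.ceil(row_count / batch_size)
print(steps_sketch)  # → 123
```

These five rows alone give different statistics from the full dataset, so the normalized values here will not match the ones produced later in the tutorial.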
Note: If the model takes the default input type (inputs, targets), we can train directly on the Spark DataFrame with the Orca Estimator. You can follow the ncf_train example.
In this tutorial, however, the model input is a dict mapping input names to the corresponding tensors, so we create an Orca TF Dataset as a bridge between the Spark DataFrame and the Orca Estimator.
Create Orca TF Dataset from Spark DataFrame
from bigdl.orca.data.tf.data import Dataset
ds = Dataset.from_spark_df(df)
Preprocess Orca TF Dataset using Map Function
Once the Orca TF Dataset is created, we can preprocess the data using the map function. Since the model reads input["movie_title"], input["user_id"] and input["user_rating"] in its call, train_step and test_step functions, we rename the keys of the Dataset to match. We also normalize the continuous feature timestamp here.
def preprocess(x):
    return {
        "movie_title": x["title"],
        "user_id": x["userid"],
        "user_rating": x["rating"],
        # Normalize continuous timestamp
        "timestamp": (tf.cast(x["timestamp"], tf.float32) - mean) / stddev
    }
# Preprocess the ds using map function
ds = ds.map(preprocess)
# List 5 elements in ds
# {'movie_title': b"One Flew Over the Cuckoo's Nest (1975)", 'user_id': b'1', 'user_rating': 5.0, 'timestamp': 0.49397522}
# {'movie_title': b'James and the Giant Peach (1996)', 'user_id': b'1', 'user_rating': 3.0, 'timestamp': 0.4940853}
# {'movie_title': b'My Fair Lady (1964)', 'user_id': b'1', 'user_rating': 3.0, 'timestamp': 0.49407482}
# {'movie_title': b'Erin Brockovich (2000)', 'user_id': b'1', 'user_rating': 4.0, 'timestamp': 0.4939385}
# {'movie_title': b"Bug's Life, A (1998)", 'user_id': b'1', 'user_rating': 5.0, 'timestamp': 0.5368723}
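To make the key renaming concrete, here is a plain-Python version of the same preprocessing applied to a single row represented as a dict. It is a sketch under assumptions: `make_preprocess` and `preprocess_row` are hypothetical names, and the mean and standard deviation are made-up round numbers rather than the real dataset statistics.

```python
MODEL_KEYS = {"movie_title", "user_id", "user_rating", "timestamp"}


def make_preprocess(ts_mean, ts_std):
    """Build a row-level preprocess function for the given statistics."""
    def preprocess_row(row):
        return {
            # Rename the DataFrame columns to the keys the model reads.
            "movie_title": row["title"],
            "user_id": row["userid"],
            "user_rating": row["rating"],
            # Normalize the continuous timestamp feature.
            "timestamp": (float(row["timestamp"]) - ts_mean) / ts_std,
        }
    return preprocess_row


# Hypothetical statistics, chosen only to make the arithmetic easy to follow.
preprocess_row = make_preprocess(ts_mean=978300000.0, ts_std=1000.0)

row = {"movieid": 1193, "userid": "1", "rating": 5.0,
       "timestamp": 978300760,
       "title": "One Flew Over the Cuckoo's Nest (1975)",
       "genres": "Drama"}
out = preprocess_row(row)
print(sorted(out))
```

Passing the statistics in through a closure, rather than reading module-level names, keeps the function independent of the `mean` and `stddev` globals used earlier in the tutorial. Columns the model does not read, such as movieid and genres, are simply dropped.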
Using Orca Estimator to Train The TensorFlow Model Distributedly
from bigdl.orca.learn.tf2.estimator import Estimator
# First, create the model_creator function using the SampleRankingModel
# defined in the "Create TensorFlow Model" section
def model_creator(config):
    model = SampleRankingModel(unique_user_ids=user_id_vocab,
                               unique_movie_titles=movie_title_vocab)
    model.compile(loss=tf.keras.losses.MeanSquaredError(),
                  metrics=[tf.keras.metrics.RootMeanSquaredError()],
                  optimizer=tf.keras.optimizers.Adagrad(config["lr"]))
    return model

config = {
    "lr": 0.1
}

# Create the Orca TF estimator
est = Estimator.from_keras(model_creator=model_creator,
                           verbose=True,
                           config=config, backend="ray")

# Train the model for 3 epochs using the Orca TF Dataset.
est.fit(ds, epochs=3, batch_size=8192, steps_per_epoch=steps)