Friday, February 5, 2021

How to generate TPC-DS data and run TPC-DS performance benchmark for Spark

Goal:

This article explains how to use databricks/spark-sql-perf and databricks/tpcds-kit to generate TPC-DS data for Spark and run the TPC-DS performance benchmark.

Env:

Spark 2.4.4 with Scala 2.11.12

MapR 6.1

Solution:

1. Download and build databricks/tpcds-kit from GitHub.

sudo yum install gcc make flex bison byacc git
cd /tmp/
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX

  • Note: tpcds-kit must be installed at the same location on all cluster nodes.

Here we downloaded it to "/tmp/tpcds-kit" on ALL cluster nodes.

2. Download and build databricks/spark-sql-perf from GitHub.

git clone https://github.com/databricks/spark-sql-perf
cd spark-sql-perf

  • Note: Make sure the branch of spark-sql-perf you build matches your Spark and Scala versions.

Here I am using Spark 2.4.4 with Scala 2.11.12, so I have to check out an older branch:

git checkout remotes/origin/newversion

Now the build.sbt contains the entries below, which should be compatible with my env:

scalaVersion := "2.11.8"
sparkVersion := "2.3.0"
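
If you are not sure which Spark and Scala versions your cluster is running, a quick check from spark-shell (a minimal sketch):

// Print the cluster's Spark and Scala versions to compare against build.sbt
println(spark.version)                       // e.g. 2.4.4
println(scala.util.Properties.versionString) // e.g. "version 2.11.12"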

Then build:

sbt +package

  • Note: If you check out a much older branch of spark-sql-perf, say "remotes/origin/branch-0.4" which is based on Spark 2.0.1, you may hit the error below when running the TPC-DS benchmark in step 6. This is because, starting from Spark 2.2, there is no such method getExecutorStorageStatus in class org.apache.spark.SparkContext.
java.lang.NoSuchMethodError: org.apache.spark.SparkContext.getExecutorStorageStatus()[Lorg/apache/spark/storage/StorageStatus;
at com.databricks.spark.sql.perf.Benchmarkable$class.afterBenchmark(Benchmarkable.scala:63)

3. Create gendata.scala

import com.databricks.spark.sql.perf.tpcds.TPCDSTables

// Note: Declare "sqlContext" for Spark 2.x version
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Set:
// Note: Here my env is using MapRFS, so I changed it to "hdfs:///tpcds".
// Note: If you are using HDFS, the format should be like "hdfs://namenode:9000/tpcds"
val rootDir = "hdfs:///tpcds" // root directory of location to create data in.

val databaseName = "tpcds" // name of database to create.
val scaleFactor = "10" // scaleFactor defines the size of the dataset to generate (in GB).
val format = "parquet" // a valid Spark data source format, e.g. "parquet".
// Run:
val tables = new TPCDSTables(sqlContext,
  dsdgenDir = "/tmp/tpcds-kit/tools", // location of dsdgen
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false, // true to replace DecimalType with DoubleType
  useStringForDate = false) // true to replace DateType with StringType

tables.genData(
  location = rootDir,
  format = format,
  overwrite = true, // overwrite the data that is already there
  partitionTables = true, // create the partitioned fact tables
  clusterByPartitionColumns = true, // shuffle to get partitions coalesced into single files.
  filterOutNullPartitionValues = false, // true to filter out the partition with NULL key value
  tableFilter = "", // "" means generate all tables
  numPartitions = 20) // how many dsdgen partitions to run - number of input tasks.

// Create the specified database
sql(s"create database $databaseName")
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(rootDir, "parquet", databaseName, overwrite = true, discoverPartitions = true)
// Or, if you want to create temporary tables
// tables.createTemporaryTables(location, format)

// For CBO only, gather statistics on all columns:
tables.analyzeTables(databaseName, analyzeColumns = true)  
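
// (Optional sanity check, not part of the original script) After analyzeTables finishes,
// the gathered statistics should appear in the detailed table information, e.g.:
// sql(s"describe extended $databaseName.store_sales").show(100, false)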

4. Run gendata.scala using spark-shell

spark-shell --jars ~/hao/spark-sql-perf/target/scala-2.11/spark-sql-perf_2.11-0.5.0-SNAPSHOT.jar \
--master yarn \
--deploy-mode client \
--executor-memory 4G \
--num-executors 4 \
--executor-cores 2 \
-i ~/hao/gendata.scala
  • Note: Tune --executor-memory, --num-executors and --executor-cores to make sure no OOM happens.
  • Note: If we only need to generate 10 GB of data, keep "numPartitions" in the gendata.scala above small (say 20) to reduce the overhead of too many tasks.
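
If you later need to regenerate only a single table instead of the whole dataset, the same genData call from gendata.scala can be reused with a non-empty tableFilter; a sketch (the choice of "store_sales" here is just an example):

// Regenerate only the store_sales table, keeping the other options from gendata.scala
tables.genData(
  location = rootDir,
  format = format,
  overwrite = true, // overwrite previously generated data for this table
  partitionTables = true,
  clusterByPartitionColumns = true,
  filterOutNullPartitionValues = false,
  tableFilter = "store_sales", // only generate this table
  numPartitions = 20)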

5. Confirm the data files and Hive tables are created.

It should create 24 tables.

Check Data:

# hadoop fs -du -s -h /tpcds/*
11.8 K /tpcds/call_center
695.0 K /tpcds/catalog_page
133.8 M /tpcds/catalog_returns
1.1 G /tpcds/catalog_sales
25.4 M /tpcds/customer
4.7 M /tpcds/customer_address
7.5 M /tpcds/customer_demographics
1.8 M /tpcds/date_dim
30.1 K /tpcds/household_demographics
1.1 K /tpcds/income_band
467.9 M /tpcds/inventory
9.4 M /tpcds/item
30.7 K /tpcds/promotion
1.8 K /tpcds/reason
2.3 K /tpcds/ship_mode
18.3 K /tpcds/store
190.4 M /tpcds/store_returns
1.4 G /tpcds/store_sales
1.1 M /tpcds/time_dim
4.3 K /tpcds/warehouse
7.7 K /tpcds/web_page
69.7 M /tpcds/web_returns
516.3 M /tpcds/web_sales
13.1 K /tpcds/web_site

Check Hive tables in the Hive CLI (or spark-sql):

hive> use tpcds;
OK
Time taken: 0.011 seconds

hive> show tables;
OK
call_center
catalog_page
catalog_returns
catalog_sales
customer
customer_address
customer_demographics
date_dim
household_demographics
income_band
inventory
item
promotion
reason
ship_mode
store
store_returns
store_sales
time_dim
warehouse
web_page
web_returns
web_sales
web_site
Time taken: 0.012 seconds, Fetched: 24 row(s)
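
Equivalently, you can confirm the same thing from spark-shell; a minimal check:

// List the tables that were created in the tpcds database
spark.sql("show tables in tpcds").show(30, false)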

6. Run TPC-DS benchmark

After the tables are created, we can run the 99 TPC-DS queries which are located under the folder "./src/main/resources/tpcds_2_4/".

Create runtpcds.scala:

import com.databricks.spark.sql.perf.tpcds.TPCDS

// Note: Declare "sqlContext" for Spark 2.x version
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val tpcds = new TPCDS (sqlContext = sqlContext)
// Set:
val databaseName = "tpcds" // name of database with TPCDS data.
sql(s"use $databaseName")
val resultLocation = "/tmp/tpcds_results" // place to write results
val iterations = 1 // how many iterations of queries to run.
val queries = tpcds.tpcds2_4Queries // queries to run.
val timeout = 24*60*60 // timeout, in seconds.
// Run:
val experiment = tpcds.runExperiment(
  queries,
  iterations = iterations,
  resultLocation = resultLocation,
  forkThread = true)
experiment.waitForFinish(timeout)

Run runtpcds.scala using spark-shell:

spark-shell --jars ~/hao/spark-sql-perf/target/scala-2.11/spark-sql-perf_2.11-0.5.0-SNAPSHOT.jar \
--driver-class-path /home/mapr/.ivy2/cache/com.typesafe.scala-logging/scala-logging-slf4j_2.10/jars/scala-logging-slf4j_2.10-2.1.2.jar:/home/mapr/.ivy2/cache/com.typesafe.scala-logging/scala-logging-api_2.11/jars/scala-logging-api_2.11-2.1.2.jar \
--master yarn \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 4G \
--num-executors 4 \
-i ~/hao/runtpcds.scala
  • Note: We need to include the scala-logging-slf4j and scala-logging-api jars, otherwise a java.lang.ClassNotFoundException will show up for the related classes. Fortunately, you can find those jars in the ivy cache directories after building spark-sql-perf with "sbt +package".
  • Note: "sql(s"use $databaseName")" should be put before declaring "val queries = tpcds.tpcds2_4Queries". Otherwise you will not see the explain plan for each query, because the tables cannot be found in the default database. So the doc on GitHub should be corrected.
  • Note: We need to increase --driver-memory to a large enough value because broadcast joins need a lot of driver memory. Otherwise you may hit the error below when running q10.sql (a workaround sketch follows after the error message):
failure in runBenchmark: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. 
As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
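
If you prefer the workaround of disabling broadcast joins instead of adding driver memory, the threshold can be set in spark-shell before running the experiment; a minimal sketch:

// Disable automatic broadcast joins for this session (workaround for the OOM above)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")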

7. Run a customized query benchmark

If we do not want to run the whole TPC-DS benchmark and only want to test the benchmark result for certain customized queries (e.g. a subset of the TPC-DS queries, or some other ad-hoc queries), we can do so. But before that, we need to understand the source code of this project first.

Code Analysis

The previous example uses "tpcds.tpcds2_4Queries", which is a Seq[com.databricks.spark.sql.perf.Query].
"tpcds" is an instance of class "TPCDS".
Because class "TPCDS" extends trait "Tpcds_2_4_Queries", "tpcds2_4Queries" is actually a member of trait "Tpcds_2_4_Queries":
  val tpcds2_4Queries = queryNames.map { queryName =>
    val queryContent: String = IOUtils.toString(
      getClass().getClassLoader().getResourceAsStream(s"tpcds_2_4/$queryName.sql"))
    Query(queryName + "-v2.4", queryContent, description = "TPCDS 2.4 Query",
      executionMode = CollectResults)
  }
Initially, I mistook this "Query" for "com.databricks.spark.sql.perf.Query", so I failed to construct the Query object many times.
Later on I found that trait "Tpcds_2_4_Queries" actually extends the abstract class "Benchmark", which contains the factory object for benchmark queries as below:
  /** Factory object for benchmark queries. */
  case object Query {
    def apply(
        name: String,
        sqlText: String,
        description: String,
        executionMode: ExecutionMode = ExecutionMode.ForeachResults): Query = {
      new Query(name, sqlContext.sql(sqlText), description, Some(sqlText), executionMode)
    }

    def apply(
        name: String,
        dataFrameBuilder: => DataFrame,
        description: String): Query = {
      new Query(name, dataFrameBuilder, description, None, ExecutionMode.CollectResults)
    }
  }
After understanding this logic, if we want to customize a query, we just need to create a class that extends the abstract class "Benchmark".

Example of customized query

import com.databricks.spark.sql.perf.{Benchmark, ExecutionMode}

// Customized a query
class customized_query extends Benchmark {
  import ExecutionMode._
  private val sqlText = "select * from customer limit 10"
  val q1 = Seq(Query(name = "my customized query", sqlText = sqlText, description = "check some customer info", executionMode = CollectResults))
}
val queries = new customized_query().q1
Everything else is the same as the previous example in step 6.
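
Alternatively, if the customized run is just a subset of the built-in TPC-DS queries, no new Benchmark subclass is needed: the built-in sequence can simply be filtered by query name (a sketch, assuming each Query exposes the same name that shows up in the results, e.g. "q1-v2.4"):

// Keep only a few of the built-in TPC-DS 2.4 queries, selected by name
val queries = tpcds.tpcds2_4Queries.filter(q => Seq("q1-v2.4", "q19-v2.4").contains(q.name))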

8. View Benchmark results

a. If the experiment is still running, use "experiment.getCurrentResults".

experiment.getCurrentResults.createOrReplaceTempView("result") 
spark.sql("select substring(name,1,100) as Name, bround((parsingTime+analysisTime+optimizationTime+planningTime+executionTime)/1000.0,1) as Runtime_sec from result").show()

Sample Output:

+---------+-----------+
| Name|Runtime_sec|
+---------+-----------+
| q1-v2.4| 21.1|
| q2-v2.4| 13.2|
| q3-v2.4| 6.0|
| q4-v2.4| 135.1|
| q5-v2.4| 38.9|
| q6-v2.4| 43.4|
| q7-v2.4| 10.6|
| q8-v2.4| 9.9|
| q9-v2.4| 51.7|
| q10-v2.4| 25.8|
| q11-v2.4| 92.3|
| q12-v2.4| 6.8|
| q13-v2.4| 12.5|
|q14a-v2.4| 130.7|
|q14b-v2.4| 91.3|
| q15-v2.4| 8.8|
| q16-v2.4| 30.8|
| q17-v2.4| 46.6|
| q18-v2.4| 14.2|
| q19-v2.4| 7.9|
+---------+-----------+
only showing top 20 rows

b. If the experiment has ended, read the result JSON file.

  • Note: Since the JSON file contains nested columns, we need to flatten the data using the "explode" function.
import org.apache.spark.sql.functions._
val result = spark.read.json(resultLocation).filter("timestamp = 1612560709933").select(explode($"results").as("r"))
result.createOrReplaceTempView("result")
spark.sql("select substring(r.name,1,100) as Name, bround((r.parsingTime+r.analysisTime+r.optimizationTime+r.planningTime+r.executionTime)/1000.0,1) as Runtime_sec from result").show()

Sample Output:

+---------+-----------+
| Name|Runtime_sec|
+---------+-----------+
| q1-v2.4| 21.1|
| q2-v2.4| 13.2|
| q3-v2.4| 6.0|
| q4-v2.4| 135.1|
| q5-v2.4| 38.9|
| q6-v2.4| 43.4|
| q7-v2.4| 10.6|
| q8-v2.4| 9.9|
| q9-v2.4| 51.7|
| q10-v2.4| 25.8|
| q11-v2.4| 92.3|
| q12-v2.4| 6.8|
| q13-v2.4| 12.5|
|q14a-v2.4| 130.7|
|q14b-v2.4| 91.3|
| q15-v2.4| 8.8|
| q16-v2.4| 30.8|
| q17-v2.4| 46.6|
| q18-v2.4| 14.2|
| q19-v2.4| 7.9|
+---------+-----------+
only showing top 20 rows
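
If the result location holds more than one run, you can list the available timestamps first; the same flattened "result" view can also be aggregated, for example to get the total runtime of a run. A small sketch:

// List all runs stored under resultLocation (each run is identified by its timestamp)
spark.read.json(resultLocation).select("timestamp").distinct().show(false)
// Total runtime of the flattened run in the "result" view above
spark.sql("select count(*) as Queries, bround(sum(r.parsingTime+r.analysisTime+r.optimizationTime+r.planningTime+r.executionTime)/1000.0,1) as Total_sec from result").show()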

