Wednesday, February 24, 2021

Spark on GPU -- Hands on GCP Dataproc to test Spark on GPU using RAPIDS

Goal:

This article shares a quick hands-on experience to test Spark on GPU using RAPIDS in GCP Dataproc.

We will provide step-by-step instructions on how to create a single node Dataproc cluster to test Spark on GPU.

Env:

GCP Dataproc

Ubuntu 18.04.5

NVIDIA Tesla T4 GPU

CUDA 11.2

Spark 3.0.1

rapids-4-spark_2.12-0.3.0.jar

Solution:

Starting from Spark 3, we can run Spark on GPU.

The RAPIDS Accelerator for Apache Spark leverages GPUs to accelerate processing via the RAPIDS libraries.

To get hands-on experience, we can firstly try it out on a single node Dataproc cluster.

1. Open a google cloud account.

Currently GCP provides $300 free trial credit for 90 days.

To use GPU, we have to upgrade "free trial" account to a paid Cloud Billing account.

https://cloud.google.com/free/docs/gcp-free-tier#how-to-upgrade

Note: As per today's information in above link:
Any remaining, unexpired Free Trial credits remain in your Cloud Billing account.

2. Create a GCP project, install GCP SDK on Mac and then login.

gcloud auth login
gcloud config set project sodium-atrium-305323

3. Enable below services in GCP -- Compute, Dataproc and Storage.

gcloud services enable compute.googleapis.com
gcloud services enable dataproc.googleapis.com
gcloud services enable storage-api.googleapis.com

4. Increase the GPU quota for all regions

Go to "IAM & Admin" => "Quotas", Filter "Limit Name: GPUs(all regions)".
Click "EDIT QUOTAS" button to request a quota increase from current 0 to 1 or higher. 

You will receive an email saying that the request is approved(or rejected).
After that, it may take up to 15mins to take effect.
Otherwise you may meet below error when creating Dataproc cluster:

failed: Quota 'GPUS_ALL_REGIONS' exceeded.  Limit: 0.0 globally

5. In GCP, create a bucket in "Storage"

For example, here my bucket name is "mytestbucket123456".

And then upload a sample TPC-DS parquet file for "customer" table under directory "customer" in this bucket.

Please refer to this post: How to generate TPC-DS data and run TPC-DS performance benchmark for Spark.

6. Create a single-node Dataproc cluster with 1 GPU attached.

Note: the supported Dataproc image version is listed here:
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions

But here we are using an image "2.0.0-RC22-ubuntu18" instead.

export IMAGE=2.0.0-RC22-ubuntu18
export REGION=us-west1
export GCS_BUCKET=mytestbucket123456
export CLUSTER_NAME=rapidscluster
export NUM_GPUS=1

gcloud dataproc clusters create $CLUSTER_NAME \
--region $REGION \
--image-version=$IMAGE \
--master-machine-type n1-standard-4 \
--single-node \
--master-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \
--worker-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \
--num-master-local-ssds 1 \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/gpu/install_gpu_driver.sh,gs://goog-dataproc-initialization-actions-${REGION}/rapids/rapids.sh \
--optional-components=JUPYTER,ZEPPELIN \
--metadata gpu-driver-provider="NVIDIA" \
--metadata rapids-runtime=SPARK \
--bucket $GCS_BUCKET \
--enable-component-gateway \
--properties="spark:spark.yarn.unmanagedAM.enabled=false,spark:spark.task.resource.gpu.amount=1,spark:spark.executor.cores=1,spark:spark.task.cpus=1,spark:spark.executor.memory=4G"

This make take 10mins to complete.

7. Double confirm the NVIDIA GPU is attached successfully.

gcloud compute ssh rapidscluster-m

# sudo su - root

# nvidia-smi
Wed Feb 24 21:59:54 2021
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03 Driver Version: 460.32.03 CUDA Version: 11.2 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 Tesla T4 Off | 00000000:00:05.0 Off | 0 |
| N/A 76C P0 34W / 70W | 0MiB / 15109MiB | 0% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
| No running processes found |
+-----------------------------------------------------------------------------+

8. Double confirm needed packages are installed and yarn/Spark related parameters are set correctly.

# apt list |grep -i spark-core
spark-core/stable,stable,now 3.0.1-1 all [installed]

# cat /etc/spark/conf/spark-defaults.conf|grep -i gpu
spark.task.resource.gpu.amount=1
spark.rapids.sql.concurrentGpuTasks=2
spark.executor.resource.gpu.amount=1
spark.task.resource.gpu.amount=0.5
spark.executor.resource.gpu.discoveryScript=/usr/lib/spark/scripts/gpu/getGpusResources.sh

# cat /etc/spark/conf/spark-defaults.conf|grep -i rapids
spark.yarn.historyServer.address=rapidscluster-m:18080
###### BEGIN : RAPIDS properties for Spark 3.0 ######
spark.rapids.sql.concurrentGpuTasks=2
spark.rapids.memory.pinnedPool.size=2G
spark.executor.extraJavaOptions='-Dai.rapids.cudf.prefer-pinned=true'
###### END : RAPIDS properties for Spark 3.0 ######

9. Make sure resource-types.xml is created with correct content.

As of today, there is one bug in one of the initialization scripts install_gpu_driver.sh.

In function configure_yarn():

set_hadoop_property 'yarn-site.xml' 'yarn.resource-types' 'yarn.io/gpu'

This is wrong because the parameter yarn.resource-types should be set in resource-types.xml. 

So above code should corrected to:

set_hadoop_property 'resource-types.xml' 'yarn.resource-types' 'yarn.io/gpu'

So here to quickly fix this, we should manually create resource-types.xml under /etc/hadoop/conf with below content:

<configuration>
<property>
<name>yarn.resource-types</name>
<value>yarn.io/gpu</value>
</property>
</configuration>

After that, we need to restart both NM and RM manually:

systemctl restart hadoop-yarn-nodemanager.service
systemctl restart hadoop-yarn-resourcemanager.service

Note: Updated on 2021-03-06, I just fixed above issue with this PR

After that, then we should see "yarn.io/gpu: 1" in RM UI. 


Otherwise, we will get below error when submitting spark on yarn jobs:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException): Invalid resource request! Cannot allocate containers as requested resource is greater than maximum allowed allocation. Requested resource type=[yarn.io/gpu], Requested resource=<memory:6144, vCores:2, yarn.io/gpu: 1>, maximum allowed allocation=<memory:12624, vCores:4>, please note that maximum allowed allocation is calculated by scheduler based on maximum resource of registered NodeManagers, which might be less than configured maximum allocation=<memory:12624, vCores:32000, yarn.io/gpu: 9223372036854775807>

And NM log would show the root cause because:

2021-02-24 17:42:48,263 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
2021-02-24 17:42:48,275 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
...
2021-02-24 17:55:42,320 WARN org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl: Got unknown resource type: yarn.io/gpu; skipping

10. Submit a hello-world pyspark job to make sure it works fine.

Create a hello-world.py and upload it on bucket:   

#!/usr/bin/python
import pyspark
sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!'])
words = sorted(rdd.collect())
print(words)

Submit it:

gcloud dataproc jobs submit pyspark \
gs://$GCS_BUCKET/hello-world.py \
--cluster=$CLUSTER_NAME \
--region=$REGION

11. In Hive CLI, create this external table "customer".

CREATE EXTERNAL TABLE customer
(
c_customer_sk integer ,
c_customer_id char(16) ,
c_current_cdemo_sk integer ,
c_current_hdemo_sk integer ,
c_current_addr_sk integer ,
c_first_shipto_date_sk integer ,
c_first_sales_date_sk integer ,
c_salutation char(10) ,
c_first_name char(20) ,
c_last_name char(30) ,
c_preferred_cust_flag char(1) ,
c_birth_day integer ,
c_birth_month integer ,
c_birth_year integer ,
c_birth_country varchar(20) ,
c_login char(13) ,
c_email_address char(50) ,
c_last_review_date char(10)
) STORED AS PARQUET
LOCATION 'gs://mytestbucket123456/customer'
;

12. In spark-sql, show the explain plan for different queries. 

Filters:

spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS;

spark-sql> explain cost select c_customer_sk from customer where c_customer_sk < 500;
== Optimized Logical Plan ==
Project [c_customer_sk#170], Statistics(sizeInBytes=5.7 MiB, rowCount=5.00E+5)
+- Filter (isnotnull(c_customer_sk#170) AND (c_customer_sk#170 < 500)), Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#170,c_customer_id#171,c_current_cdemo_sk#172,c_current_hdemo_sk#173,c_current_addr_sk#174,c_first_shipto_date_sk#175,c_first_sales_date_sk#176,c_salutation#177,c_first_name#178,c_last_name#179,c_preferred_cust_flag#180,c_birth_day#181,c_birth_month#182,c_birth_year#183,c_birth_country#184,c_login#185,c_email_address#186,c_last_review_date#187] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [c_customer_sk#170]
+- GpuCoalesceBatches TargetSize(2147483647)
+- GpuFilter (gpuisnotnull(c_customer_sk#170) AND (c_customer_sk#170 < 500))
+- GpuFileGpuScan parquet default.customer[c_customer_sk#170] Batched: true, DataFilters: [isnotnull(c_customer_sk#170), (c_customer_sk#170 < 500)], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk), LessThan(c_customer_sk,500)], ReadSchema: struct<c_customer_sk:int>

Note: "GpuFileGpuScan", "GpuFilter" , "GpuCoalesceBatches" and "GpuProject" are the key words which show the differences.

Sort:

spark-sql> explain cost select a.* from customer a order by c_customer_sk;
== Optimized Logical Plan ==
Sort [c_customer_sk#790 ASC NULLS FIRST], true, Statistics(sizeInBytes=106.8 MiB)
+- Relation[c_customer_sk#790,c_customer_id#791,c_current_cdemo_sk#792,c_current_hdemo_sk#793,c_current_addr_sk#794,c_first_shipto_date_sk#795,c_first_sales_date_sk#796,c_salutation#797,c_first_name#798,c_last_name#799,c_preferred_cust_flag#800,c_birth_day#801,c_birth_month#802,c_birth_year#803,c_birth_country#804,c_login#805,c_email_address#806,c_last_review_date#807] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
GpuColumnarToRow false
+- GpuSort [c_customer_sk#790 ASC NULLS FIRST], true, RequireSingleBatch, 0
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpurangepartitioning(c_customer_sk#790 ASC NULLS FIRST, 48, StructField(c_customer_sk,IntegerType,true)), true, [id=#770]
+- GpuFileGpuScan parquet default.customer[c_customer_sk#790,c_customer_id#791,c_current_cdemo_sk#792,c_current_hdemo_sk#793,c_current_addr_sk#794,c_first_shipto_date_sk#795,c_first_sales_date_sk#796,c_salutation#797,c_first_name#798,c_last_name#799,c_preferred_cust_flag#800,c_birth_day#801,c_birth_month#802,c_birth_year#803,c_birth_country#804,c_login#805,c_email_address#806,c_last_review_date#807] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
Note: "GpuFileGpuScan", "GpuColumnarExchange" , "GpuShuffleCoalesce" , "GpuCoalesceBatches" and "GpuSort" are the key words which show the differences.

Group-by:

spark-sql> explain cost select c_customer_sk,count(*) from customer group by c_customer_sk;
== Optimized Logical Plan ==
Aggregate [c_customer_sk#246], [c_customer_sk#246, count(1) AS count(1)#264L], Statistics(sizeInBytes=9.5 MiB)
+- Project [c_customer_sk#246], Statistics(sizeInBytes=5.7 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#246,c_customer_id#247,c_current_cdemo_sk#248,c_current_hdemo_sk#249,c_current_addr_sk#250,c_first_shipto_date_sk#251,c_first_sales_date_sk#252,c_salutation#253,c_first_name#254,c_last_name#255,c_preferred_cust_flag#256,c_birth_day#257,c_birth_month#258,c_birth_year#259,c_birth_country#260,c_login#261,c_email_address#262,c_last_review_date#263] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[c_customer_sk#246], functions=[count(1)], output=[c_customer_sk#246, count(1)#264L])
+- Exchange hashpartitioning(c_customer_sk#246, 48), true, [id=#136]
+- HashAggregate(keys=[c_customer_sk#246], functions=[partial_count(1)], output=[c_customer_sk#246, count#266L])
+- FileScan parquet default.customer[c_customer_sk#246] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>

Above Group-by did not show expected GPU related operators due to AQE is on by default. 

This is due to this FAQ shared:

"When running an explain() on a query where AQE is on, it is possible that AQE has not finalized the plan. In this case a message stating AdaptiveSparkPlan isFinalPlan=false will be printed at the top of the physical plan, and the explain output will show the query plan with CPU operators. As the query runs, the plan on the UI will update and show operations running on the GPU. This can happen for any AdaptiveSparkPlan where isFinalPlan=false."

We can try to disable it and try it again:

spark-sql> set spark.sql.adaptive.enabled=false;
spark.sql.adaptive.enabled false

spark-sql> explain cost select c_customer_sk,count(*) from customer group by c_customer_sk;
== Optimized Logical Plan ==
Aggregate [c_customer_sk#415], [c_customer_sk#415, count(1) AS count(1)#433L], Statistics(sizeInBytes=9.5 MiB)
+- Project [c_customer_sk#415], Statistics(sizeInBytes=5.7 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#415,c_customer_id#416,c_current_cdemo_sk#417,c_current_hdemo_sk#418,c_current_addr_sk#419,c_first_shipto_date_sk#420,c_first_sales_date_sk#421,c_salutation#422,c_first_name#423,c_last_name#424,c_preferred_cust_flag#425,c_birth_day#426,c_birth_month#427,c_birth_year#428,c_birth_country#429,c_login#430,c_email_address#431,c_last_review_date#432] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
GpuColumnarToRow false
+- GpuHashAggregate(keys=[c_customer_sk#415], functions=[gpucount(1)], output=[c_customer_sk#415, count(1)#433L])
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(c_customer_sk#415, 48), true, [id=#300]
+- GpuHashAggregate(keys=[c_customer_sk#415], functions=[partial_gpucount(1)], output=[c_customer_sk#415, count#435L])
+- GpuFileGpuScan parquet default.customer[c_customer_sk#415] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int>

Note: "GpuHashAggregate", "GpuColumnarExchange" , "GpuShuffleCoalesce" and "GpuHashAggregate" are the key words which show the differences.

Inner-Join(Broadcast Hash Join):

spark-sql> explain cost select a.* from customer a, customer b where a.c_customer_sk=b.c_current_cdemo_sk;
== Optimized Logical Plan ==
Project [c_customer_sk#373, c_customer_id#374, c_current_cdemo_sk#375, c_current_hdemo_sk#376, c_current_addr_sk#377, c_first_shipto_date_sk#378, c_first_sales_date_sk#379, c_salutation#380, c_first_name#381, c_last_name#382, c_preferred_cust_flag#383, c_birth_day#384, c_birth_month#385, c_birth_year#386, c_birth_country#387, c_login#388, c_email_address#389, c_last_review_date#390], Statistics(sizeInBytes=50.9 TiB, rowCount=2.50E+11)
+- Join Inner, (c_customer_sk#373 = c_current_cdemo_sk#393), Statistics(sizeInBytes=51.8 TiB, rowCount=2.50E+11)
:- Filter isnotnull(c_customer_sk#373), Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
: +- Relation[c_customer_sk#373,c_customer_id#374,c_current_cdemo_sk#375,c_current_hdemo_sk#376,c_current_addr_sk#377,c_first_shipto_date_sk#378,c_first_sales_date_sk#379,c_salutation#380,c_first_name#381,c_last_name#382,c_preferred_cust_flag#383,c_birth_day#384,c_birth_month#385,c_birth_year#386,c_birth_country#387,c_login#388,c_email_address#389,c_last_review_date#390] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Project [c_current_cdemo_sk#393], Statistics(sizeInBytes=5.7 MiB, rowCount=5.00E+5)
+- Filter isnotnull(c_current_cdemo_sk#393), Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#391,c_customer_id#392,c_current_cdemo_sk#393,c_current_hdemo_sk#394,c_current_addr_sk#395,c_first_shipto_date_sk#396,c_first_sales_date_sk#397,c_salutation#398,c_first_name#399,c_last_name#400,c_preferred_cust_flag#401,c_birth_day#402,c_birth_month#403,c_birth_year#404,c_birth_country#405,c_login#406,c_email_address#407,c_last_review_date#408] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [c_customer_sk#373, c_customer_id#374, c_current_cdemo_sk#375, c_current_hdemo_sk#376, c_current_addr_sk#377, c_first_shipto_date_sk#378, c_first_sales_date_sk#379, c_salutation#380, c_first_name#381, c_last_name#382, c_preferred_cust_flag#383, c_birth_day#384, c_birth_month#385, c_birth_year#386, c_birth_country#387, c_login#388, c_email_address#389, c_last_review_date#390]
+- GpuBroadcastHashJoin [c_customer_sk#373], [c_current_cdemo_sk#393], Inner, BuildRight
:- GpuProject [c_customer_sk#373, c_customer_id#374, c_current_cdemo_sk#375, c_current_hdemo_sk#376, c_current_addr_sk#377, c_first_shipto_date_sk#378, c_first_sales_date_sk#379, c_salutation#380, c_first_name#381, c_last_name#382, c_preferred_cust_flag#383, c_birth_day#384, c_birth_month#385, c_birth_year#386, c_birth_country#387, c_login#388, c_email_address#389, c_last_review_date#390]
: +- GpuCoalesceBatches TargetSize(2147483647)
: +- GpuFilter gpuisnotnull(c_customer_sk#373)
: +- GpuFileGpuScan parquet default.customer[c_customer_sk#373,c_customer_id#374,c_current_cdemo_sk#375,c_current_hdemo_sk#376,c_current_addr_sk#377,c_first_shipto_date_sk#378,c_first_sales_date_sk#379,c_salutation#380,c_first_name#381,c_last_name#382,c_preferred_cust_flag#383,c_birth_day#384,c_birth_month#385,c_birth_year#386,c_birth_country#387,c_login#388,c_email_address#389,c_last_review_date#390] Batched: true, DataFilters: [isnotnull(c_customer_sk#373)], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#274]
+- GpuProject [c_current_cdemo_sk#393]
+- GpuCoalesceBatches TargetSize(2147483647)
+- GpuFilter gpuisnotnull(c_current_cdemo_sk#393)
+- GpuFileGpuScan parquet default.customer[c_current_cdemo_sk#393] Batched: true, DataFilters: [isnotnull(c_current_cdemo_sk#393)], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_current_cdemo_sk)], ReadSchema: struct<c_current_cdemo_sk:int>

Note: "GpuBroadcastExchange", "GpuBroadcastHashJoin" are the key words which show the differences.

Inner-Join(Shuffle Hash Join):

spark-sql> set spark.sql.autoBroadcastJoinThreshold=1;
spark.sql.autoBroadcastJoinThreshold 1
Time taken: 0.019 seconds, Fetched 1 row(s)

spark-sql> explain cost select a.* from customer a, customer b where a.c_customer_sk=b.c_current_cdemo_sk;
== Optimized Logical Plan ==
Project [c_customer_sk#453, c_customer_id#454, c_current_cdemo_sk#455, c_current_hdemo_sk#456, c_current_addr_sk#457, c_first_shipto_date_sk#458, c_first_sales_date_sk#459, c_salutation#460, c_first_name#461, c_last_name#462, c_preferred_cust_flag#463, c_birth_day#464, c_birth_month#465, c_birth_year#466, c_birth_country#467, c_login#468, c_email_address#469, c_last_review_date#470], Statistics(sizeInBytes=50.9 TiB, rowCount=2.50E+11)
+- Join Inner, (c_customer_sk#453 = c_current_cdemo_sk#473), Statistics(sizeInBytes=51.8 TiB, rowCount=2.50E+11)
:- Filter isnotnull(c_customer_sk#453), Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
: +- Relation[c_customer_sk#453,c_customer_id#454,c_current_cdemo_sk#455,c_current_hdemo_sk#456,c_current_addr_sk#457,c_first_shipto_date_sk#458,c_first_sales_date_sk#459,c_salutation#460,c_first_name#461,c_last_name#462,c_preferred_cust_flag#463,c_birth_day#464,c_birth_month#465,c_birth_year#466,c_birth_country#467,c_login#468,c_email_address#469,c_last_review_date#470] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Project [c_current_cdemo_sk#473], Statistics(sizeInBytes=5.7 MiB, rowCount=5.00E+5)
+- Filter isnotnull(c_current_cdemo_sk#473), Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#471,c_customer_id#472,c_current_cdemo_sk#473,c_current_hdemo_sk#474,c_current_addr_sk#475,c_first_shipto_date_sk#476,c_first_sales_date_sk#477,c_salutation#478,c_first_name#479,c_last_name#480,c_preferred_cust_flag#481,c_birth_day#482,c_birth_month#483,c_birth_year#484,c_birth_country#485,c_login#486,c_email_address#487,c_last_review_date#488] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
GpuColumnarToRow false
+- GpuProject [c_customer_sk#453, c_customer_id#454, c_current_cdemo_sk#455, c_current_hdemo_sk#456, c_current_addr_sk#457, c_first_shipto_date_sk#458, c_first_sales_date_sk#459, c_salutation#460, c_first_name#461, c_last_name#462, c_preferred_cust_flag#463, c_birth_day#464, c_birth_month#465, c_birth_year#466, c_birth_country#467, c_login#468, c_email_address#469, c_last_review_date#470]
+- GpuShuffledHashJoin [c_customer_sk#453], [c_current_cdemo_sk#473], Inner, BuildRight, false
:- GpuShuffleCoalesce 2147483647
: +- GpuColumnarExchange gpuhashpartitioning(c_customer_sk#453, 48), true, [id=#367]
: +- GpuProject [c_customer_sk#453, c_customer_id#454, c_current_cdemo_sk#455, c_current_hdemo_sk#456, c_current_addr_sk#457, c_first_shipto_date_sk#458, c_first_sales_date_sk#459, c_salutation#460, c_first_name#461, c_last_name#462, c_preferred_cust_flag#463, c_birth_day#464, c_birth_month#465, c_birth_year#466, c_birth_country#467, c_login#468, c_email_address#469, c_last_review_date#470]
: +- GpuCoalesceBatches TargetSize(2147483647)
: +- GpuFilter gpuisnotnull(c_customer_sk#453)
: +- GpuFileGpuScan parquet default.customer[c_customer_sk#453,c_customer_id#454,c_current_cdemo_sk#455,c_current_hdemo_sk#456,c_current_addr_sk#457,c_first_shipto_date_sk#458,c_first_sales_date_sk#459,c_salutation#460,c_first_name#461,c_last_name#462,c_preferred_cust_flag#463,c_birth_day#464,c_birth_month#465,c_birth_year#466,c_birth_country#467,c_login#468,c_email_address#469,c_last_review_date#470] Batched: true, DataFilters: [isnotnull(c_customer_sk#453)], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- GpuCoalesceBatches RequireSingleBatch
+- GpuShuffleCoalesce 2147483647
+- GpuColumnarExchange gpuhashpartitioning(c_current_cdemo_sk#473, 48), true, [id=#370]
+- GpuProject [c_current_cdemo_sk#473]
+- GpuCoalesceBatches TargetSize(2147483647)
+- GpuFilter gpuisnotnull(c_current_cdemo_sk#473)
+- GpuFileGpuScan parquet default.customer[c_current_cdemo_sk#473] Batched: true, DataFilters: [isnotnull(c_current_cdemo_sk#473)], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_current_cdemo_sk)], ReadSchema: struct<c_current_cdemo_sk:int>

Note: "GpuColumnarExchange", "GpuShuffleCoalesce" and "GpuShuffledHashJoin" are the key words which show the differences.

Cartesian Product:

spark-sql> explain cost select a.* from customer a, customer b;
== Optimized Logical Plan ==
Join Inner, Statistics(sizeInBytes=50.9 TiB, rowCount=2.50E+11)
:- Relation[c_customer_sk#698,c_customer_id#699,c_current_cdemo_sk#700,c_current_hdemo_sk#701,c_current_addr_sk#702,c_first_shipto_date_sk#703,c_first_sales_date_sk#704,c_salutation#705,c_first_name#706,c_last_name#707,c_preferred_cust_flag#708,c_birth_day#709,c_birth_month#710,c_birth_year#711,c_birth_country#712,c_login#713,c_email_address#714,c_last_review_date#715] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Project, Statistics(sizeInBytes=3.8 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#716,c_customer_id#717,c_current_cdemo_sk#718,c_current_hdemo_sk#719,c_current_addr_sk#720,c_first_shipto_date_sk#721,c_first_sales_date_sk#722,c_salutation#723,c_first_name#724,c_last_name#725,c_preferred_cust_flag#726,c_birth_day#727,c_birth_month#728,c_birth_year#729,c_birth_country#730,c_login#731,c_email_address#732,c_last_review_date#733] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
CartesianProduct
:- GpuColumnarToRow false
: +- GpuFileGpuScan parquet default.customer[c_customer_sk#698,c_customer_id#699,c_current_cdemo_sk#700,c_current_hdemo_sk#701,c_current_addr_sk#702,c_first_shipto_date_sk#703,c_first_sales_date_sk#704,c_salutation#705,c_first_name#706,c_last_name#707,c_preferred_cust_flag#708,c_birth_day#709,c_birth_month#710,c_birth_year#711,c_birth_country#712,c_login#713,c_email_address#714,c_last_review_date#715] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet default.customer[] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Note: "GpuFileGpuScan" and "GpuColumnarToRow false" are the key words which show the differences.

Broadcast Nested Loop Join:

spark-sql> set spark.sql.autoBroadcastJoinThreshold=1000000000;
spark.sql.autoBroadcastJoinThreshold 1000000000

Time taken: 0.018 seconds, Fetched 1 row(s)
spark-sql> explain cost select a.* from customer a, customer b;
== Optimized Logical Plan ==
Join Inner, Statistics(sizeInBytes=50.9 TiB, rowCount=2.50E+11)
:- Relation[c_customer_sk#749,c_customer_id#750,c_current_cdemo_sk#751,c_current_hdemo_sk#752,c_current_addr_sk#753,c_first_shipto_date_sk#754,c_first_sales_date_sk#755,c_salutation#756,c_first_name#757,c_last_name#758,c_preferred_cust_flag#759,c_birth_day#760,c_birth_month#761,c_birth_year#762,c_birth_country#763,c_login#764,c_email_address#765,c_last_review_date#766] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)
+- Project, Statistics(sizeInBytes=3.8 MiB, rowCount=5.00E+5)
+- Relation[c_customer_sk#767,c_customer_id#768,c_current_cdemo_sk#769,c_current_hdemo_sk#770,c_current_addr_sk#771,c_first_shipto_date_sk#772,c_first_sales_date_sk#773,c_salutation#774,c_first_name#775,c_last_name#776,c_preferred_cust_flag#777,c_birth_day#778,c_birth_month#779,c_birth_year#780,c_birth_country#781,c_login#782,c_email_address#783,c_last_review_date#784] parquet, Statistics(sizeInBytes=106.8 MiB, rowCount=5.00E+5)

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Inner
:- GpuColumnarToRow false
: +- GpuFileGpuScan parquet default.customer[c_customer_sk#749,c_customer_id#750,c_current_cdemo_sk#751,c_current_hdemo_sk#752,c_current_addr_sk#753,c_first_shipto_date_sk#754,c_first_sales_date_sk#755,c_salutation#756,c_first_name#757,c_last_name#758,c_preferred_cust_flag#759,c_birth_day#760,c_birth_month#761,c_birth_year#762,c_birth_country#763,c_login#764,c_email_address#765,c_last_review_date#766] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c_customer_sk:int,c_customer_id:string,c_current_cdemo_sk:int,c_current_hdemo_sk:int,c_cur...
+- BroadcastExchange IdentityBroadcastMode, [id=#752]
+- GpuColumnarToRow false
+- GpuFileGpuScan parquet default.customer[] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[gs://mytestbucket123456/customer], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Note: "GpuFileGpuScan" and "GpuColumnarToRow false" are the key words which show the differences.

13. Delete the dataproc cluster immediately to save $ once we have completed our project.

gcloud dataproc clusters delete $CLUSTER_NAME --region=$REGION

Note: remember to clean the unused storage buckets.

Reference:

https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/rapids
https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-gcp.html
https://cloud.google.com/dataproc/docs/concepts/compute/gpus 

install_gpu_driver.sh

rapids.sh



No comments:

Post a Comment

Popular Posts