Monday, June 1, 2015

Hive transaction feature in Hive 1.0

Env:

Hive 1.0

Goal:

This article introduces the new feature -- Hive transaction based on the behavior of Hive 1.0.
This feature brings all 4 traits of database transactions -- Atomicity,Consistency,Isolation and Durability at row level, so that one application can add rows while another reads from the same partition without interfering with each other. Up until Hive 0.13, ACID is only at partition level.
If your use case requires row level ACID like:
  1. Streaming ingest of data.
  2. Slow changing dimensions.
  3. Data restatement.
you may want to enable this new feature at the cost of periodical data file compaction.
Take “slow changing dimensions” use case for example, users may need to insert or update one or more rows of some dimension tables, you can do that now with this new feature.

Solution:

1. How to enable Hive transactions

The Hive metastore service is required to use this feature, because it needs to spawn a set of compaction related threads.
The minimum required configurations are:
For Client or HiveServer2:
  • If Hive CLI is used, set the below configuration in hive-site.xml on the client nodes.
  • If HiveServer2 is used to accept connections, set the below configuration in hive-site.xml before starting HiveServer2.
  • If you do not want this feature to be enabled for all queries, set the below configuration at the session level.
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>

<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>

<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>

<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>

For Hive Metastore:
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>

<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
After updating the configuration in the corresponding hive-site.xml, restart the services -- HiveServer2 and Hive Metastore.
More transaction related configurations are here.

2. Only ORC bucketed non-sorted table supports transactions.

2.a Non-ORC table will fail with below error:

create table h1_test_normal(id int, id2 string) 
TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test_normal values(1,'abc');
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table h1_test_normal that does not use an AcidOutputFormat or is not bucketed

2.b ORC non-bucketed table will fail with below error:

create table h1_test(id int, id2 string) 
stored as orc TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test values(1,'abc');
FAILED: SemanticException [Error 10297]: Attempt to do update or delete on table h1_test that does not use an AcidOutputFormat or is not bucketed

2.c ORC bucketed sorted table will fail with below error:

create table h1_test4(id int, id2 string) 
clustered by (id) sorted by (id2) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test4 values(1,'abc');
FAILED: SemanticException [Error 10298]: ACID insert, update, delete not supported on tables that are sorted, table h1_test4
Note: above 3 different error messages will tell why transaction is not supported.

2.d ORC bucketed non-sorted table is the only supported table in current version.

create table h1_test2(id int, id2 string) 
clustered by (id) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true'); 

insert into table h1_test2 values(1,'abc');
delete from h1_test2 where id=1;

3. Minor and Major Compaction

To support transactions, each transaction of Insert/Update/Delete(aka DML) is stored in delta files. Occasionally these changes need to be merged into the base files.
Both base and delta directory names contain the transaction IDs, just like SCN in Oracle.
drwxr--r--  2 mapr mapr  8 May 30 01:31 base_0000028
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000029_0000029
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000030_0000030
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000031_0000031
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000032_0000032
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000033_0000033
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000034_0000034
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000035_0000035
drwxr-xr-x  2 mapr mapr  8 May 30 01:34 delta_0000036_0000036

3.1 The compactions are done by a set of threads of Hive Metastore.

hive.compactor.worker.threads(default is 0 but value required for transactions: > 0 on at least one instance of the Thrift metastore service) controls the number of compactor worker threads that will be spawned by the Hive Metastore service. Besides worker threads, there is one initiator thread and one cleaner thread.
This can be seen by "jstack <pid of metastore>", here hive.compactor.worker.threads is set to 2.
$ jstack 18220  |grep -i compact
 at org.apache.hadoop.hive.ql.txn.compactor.Cleaner.run(Cleaner.java:141)
 at org.apache.hadoop.hive.ql.txn.compactor.Worker.run(Worker.java:83)
 at org.apache.hadoop.hive.ql.txn.compactor.Worker.run(Worker.java:83)
 at org.apache.hadoop.hive.ql.txn.compactor.Initiator.run(Initiator.java:137)
If compactions are in progress, we can use the "show compactions" command to check. For example, below is one major compaction:
hive> show compactions;
OK
Database Table Partition Type State Worker Start Time
default h1_test2 NULL MAJOR working h3.poc.com-25 1432949449000
And also in hive.log of metastore node, below lines show up:
2015-05-30 01:20:49,158 INFO  [Thread-8]: compactor.Initiator (Initiator.java:requestCompaction(282)) - Requesting MAJOR compaction for default.h1_test2
2015-05-30 01:20:49,512 INFO  [h3.poc.com-29]: compactor.Worker (Worker.java:run(139)) - Starting MAJOR compaction for default.h1_test2

3.2 Minor compaction takes a set of existing delta files and rewrites them to a single delta file per bucket.

By default, if the number of delta directories in a table or partition is larger than 10 (hive.compactor.delta.num.threshold), minor compaction will be triggered.
So when the 11th delta directory is created:
drwxr--r--   - 8 2015-06-01 21:08 /user/hive/warehouse/h1_test2/base_0000059
drwxr--r--   - 8 2015-06-01 21:16 /user/hive/warehouse/h1_test2/delta_0000060_0000070
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000071_0000071
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000072_0000072
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000073_0000073
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000074_0000074
drwxr-xr-x   - 8 2015-06-01 21:18 /user/hive/warehouse/h1_test2/delta_0000075_0000075
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000076_0000076
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000077_0000077
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000078_0000078
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000079_0000079
drwxr-xr-x   - 8 2015-06-01 21:19 /user/hive/warehouse/h1_test2/delta_0000080_0000080
After minor compaction:
drwxr--r--   - 8 2015-06-01 21:08 /user/hive/warehouse/h1_test2/base_0000059
drwxr--r--   - 8 2015-06-01 21:21 /user/hive/warehouse/h1_test2/delta_0000060_0000080
So the base directory is still at transaction ID 59, but the aggregated delta directory contains data from transaction ID 60 to 80.

3.3 Major compaction takes one or more delta files and the base file for the bucket and rewrites them into a new base file per bucket.

By default, if total size of delta data reaches 10%(hive.compactor.delta.pct.threshold) of base data, major compaction will be triggered.
Before major compaction:
drwxr--r--   - 3 2015-06-02 00:26 /user/hive/warehouse/h1_test2/base_0000082
drwxr-xr-x   - 1 2015-06-02 01:43 /user/hive/warehouse/h1_test2/delta_0000083_0000083
drwxr-xr-x   - 1 2015-06-02 01:43 /user/hive/warehouse/h1_test2/delta_0000084_0000084
After major compaction:
drwxr--r--   - 2 2015-06-02 01:45 /user/hive/warehouse/h1_test2/base_0000084
So the base directory name will contain the latest transaction ID -- 84.

3.4 Compaction can be initiated manually.

ALTER TABLE table_name [PARTITION (partition_key = 'partition_value' [, ...])]
  COMPACT 'compaction_type';
If you do not want the automatic compaction to affect the system performance in peak time, here is an option to set "NO_AUTO_COMPACTION" table property, eg:
create table test_nocompact(id int) TBLPROPERTIES("NO_AUTO_COMPACTION"="true");
alter table test_nocompact set TBLPROPERTIES("NO_AUTO_COMPACTION"="true");
With automatic compactions disabled compactions can be triggered manually in quiet time window.
Note, this command just triggers the compaction, and it does not wait for its completion.
Sometimes the compaction is enqueued, and you can always use "show compactions" to check the status.
If auto compactions are not happening or not enabled, the tables could have too many small files so that the performance will be hugely impacted. It will be important to regularly run manual compaction if auto compaction is disabled. 

3.5 After a compaction the system waits until all readers of the old files have finished and then removes the old files.

Take above 3.2 minor compaction for example. It firstly generates "delta_0000060_0000080", and waits for all old queries to finish, then it removs old delta directories.
If cleaning starts, the below log message shows up in the Hive metastore log:
[Thread-12]: compactor.Cleaner (Cleaner.java:clean(178)) - Starting cleaning for default.h1_test2
Assume before compaction starts, there is a huge query running for days, then the old delta directories can not be cleaned until the query finishes or fails.

3.6 Cleaner thread is a separate thread to clean directories after compaction.

By default, every 5 seconds(hive.compactor.cleaner.run.interval) the cleaner thread will check if any directories need to be cleaned.
The logic of cleaner thread is well documented in org.apache.hadoop.hive.ql.txn.compactor.Cleaner.java:
// First look for all the compactions that are waiting to be cleaned.  If we have not
// seen an entry before, look for all the locks held on that table or partition and
// record them.  We will then only clean the partition once all of those locks have been
// released.  This way we avoid removing the files while they are in use,
// while at the same time avoiding starving the cleaner as new readers come along.
// This works because we know that any reader who comes along after the worker thread has
// done the compaction will read the more up to date version of the data (either in a
// newer delta or in a newer base).
If you find the cleaner thread is not cleaning some really old delta directories, please run "show locks" to see if any query is holding the lock on that table for a long time.

Reference:

Hive Transactions
LanguageManual DML
==

No comments:

Post a Comment

Popular Posts