Monday, October 30, 2017

How to modify HBase Thrift client code when the HBase Thrift Service enables framed transport and compact protocol

Goal:

How to modify HBase Thrift client code when the HBase Thrift Service enables framed transport and compact protocol.
Background:
To avoid the Thrift service crash described in HBASE-11052, enable framed transport and compact protocol in hbase-site.xml, then restart the HBase Thrift Service:
<property> 
  <name>hbase.regionserver.thrift.framed</name> 
  <value>true</value> 
</property> 
<property> 
  <name>hbase.regionserver.thrift.framed.max_frame_size_in_mb</name> 
  <value>2</value> 
</property> 
<property> 
  <name>hbase.regionserver.thrift.compact</name> 
  <value>true</value> 
</property>
Once the service has been restarted with these settings, existing HBase Thrift client code needs to be modified; otherwise it fails with the error below:
thrift.transport.TTransport.TTransportException: TSocket read 0 bytes
This article explains what to change in the HBase Thrift client code to make it compatible with framed transport and compact protocol.
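Since the client has to match whatever the server is configured with, it can help to read the two switches from hbase-site.xml rather than hard-coding them. The following is a minimal sketch using only the standard library; read_thrift_settings is a hypothetical helper name, the sample XML is a trimmed copy of the fragment above wrapped in a <configuration> root, and treating a missing property as disabled is an assumption of this sketch.

```python
import xml.etree.ElementTree as ET

# Property names taken from the hbase-site.xml fragment above.
FRAMED_KEY = "hbase.regionserver.thrift.framed"
COMPACT_KEY = "hbase.regionserver.thrift.compact"


def read_thrift_settings(xml_text):
    """Return (framed, compact) booleans parsed from hbase-site.xml text."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.iter("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None and value is not None:
            props[name.strip()] = value.strip().lower()
    # Assumption: a property that is absent is treated as disabled.
    return props.get(FRAMED_KEY) == "true", props.get(COMPACT_KEY) == "true"


sample = """<configuration>
  <property>
    <name>hbase.regionserver.thrift.framed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.regionserver.thrift.compact</name>
    <value>true</value>
  </property>
</configuration>"""

framed, compact = read_thrift_settings(sample)
print("framed transport: %s" % framed)
print("compact protocol: %s" % compact)
```

The two booleans can then drive the choice between the buffered and framed transport, and between the binary and compact protocol, described in the Solution section.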

Env:

HBase 1.1.1
MapR 5.2

Solution:

1. If framed transport is enabled

We need to modify FROM:
from thrift.transport import TTransport
transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
TO:
from thrift.transport.TTransport import TFramedTransport
transport = TFramedTransport(TSocket.TSocket(host, port))
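The reason a buffered-transport client cannot talk to a framed server is that framed transport prefixes every message with its length as a 4-byte big-endian integer, and the server expects that prefix. The sketch below only illustrates this wire layout with the standard library; frame and unframe are hypothetical helper names, not part of the Thrift library.

```python
import struct


def frame(payload):
    """Prefix payload with its length as a 4-byte big-endian integer."""
    return struct.pack("!i", len(payload)) + payload


def unframe(data):
    """Split one frame off the front of data; return (payload, rest)."""
    (size,) = struct.unpack("!i", data[:4])
    return data[4:4 + size], data[4 + size:]


message = b"hello"
wire = frame(message)          # 4-byte length header followed by the body
payload, rest = unframe(wire)  # recovers the body, nothing left over
```

A client that sends the body without this header leaves the server interpreting the first four bytes of the message as a frame length, so the two sides cannot understand each other.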

2. If compact protocol is enabled

We need to modify FROM:
from thrift.protocol import TBinaryProtocol
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
TO:
from thrift.protocol import TCompactProtocol
protocol = TCompactProtocol.TCompactProtocol(transport)
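Binary and compact protocol are not interchangeable because they encode the same values differently. As an illustration only, the sketch below encodes a 32-bit integer the way the compact protocol does (zigzag followed by a variable-length integer) and compares it with the fixed 4-byte big-endian form used by the binary protocol. The function names are hypothetical and this is not the Thrift library's own code.

```python
import struct


def zigzag32(n):
    """Map a signed 32-bit integer to an unsigned one (0, -1, 1, -2 -> 0, 1, 2, 3)."""
    return ((n << 1) ^ (n >> 31)) & 0xFFFFFFFF


def varint(n):
    """Encode a non-negative integer 7 bits at a time, low-order group first."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more groups follow
        else:
            out.append(byte)         # last group
            return bytes(out)


def compact_i32(n):
    """Compact-protocol style encoding of a 32-bit integer."""
    return varint(zigzag32(n))


def binary_i32(n):
    """Binary-protocol style encoding of a 32-bit integer."""
    return struct.pack("!i", n)


compact_bytes = compact_i32(300)
binary_bytes = binary_i32(300)
```

Here the same value takes two bytes in the compact form and four in the binary form, so a client using one protocol cannot parse data written with the other.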

A complete Python example is shown below:
from thrift.transport import TSocket
#from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
#from thrift.transport import TTransport
from thrift.transport.TTransport import TFramedTransport
from hbase import Hbase

host = "localhost"
port = 9090
tablename = "/user/mapr/some_table"
numRows = 100
columnName = "cf1:col1"

# Connect to HBase Thrift server
#transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
transport = TFramedTransport(TSocket.TSocket(host, port))

#protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
protocol = TCompactProtocol.TCompactProtocol(transport)

# Create and open the client connection
client = Hbase.Client(protocol)
transport.open()

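# Scan the MapR-DB table
scan = Hbase.TScan(startRow="111", stopRow="222")
scannerId = client.scannerOpenWithScan(tablename, scan, None)
# Fetch rows in batches of numRows; an empty list means the scan is finished.
# (A separate scannerGet call here would consume the first row without printing it.)
rowList = client.scannerGetList(scannerId, numRows)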

while rowList:
    for row in rowList:
        message = row.columns.get(columnName).value
        rowKey = row.row
        print("rowKey = %s, columnValue = %s" % (rowKey, message))
    # Fetch the next batch
    rowList = client.scannerGetList(scannerId, numRows)

client.scannerClose(scannerId)

transport.close()
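In the example above, transport.close() is skipped if any scan call raises an exception. One way to guard against that, sketched here under the assumption that the transport only needs open() and close(), is a small context manager. opened is a hypothetical helper name, and FakeTransport is a stand-in used so the sketch runs without a Thrift server.

```python
from contextlib import contextmanager


@contextmanager
def opened(transport):
    """Open transport, yield it, and always close it afterwards."""
    transport.open()
    try:
        yield transport
    finally:
        transport.close()


class FakeTransport(object):
    """Stand-in for a Thrift transport; records the calls made on it."""

    def __init__(self):
        self.calls = []

    def open(self):
        self.calls.append("open")

    def close(self):
        self.calls.append("close")


fake = FakeTransport()
try:
    with opened(fake):
        # Simulate a failure in the middle of a scan
        raise RuntimeError("scan failed")
except RuntimeError:
    pass
```

With the real client, the scan loop would go inside "with opened(transport):" in place of the explicit transport.open() and transport.close() calls, so the connection is closed even when the scan fails.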

