Goal:
How to modify HBase Thrift client code when the HBase Thrift Service enables framed transport and the compact protocol.
The background is:
To avoid the Thrift service crash described in HBASE-11052, we need to enable framed transport and the compact protocol in hbase-site.xml and restart the HBase Thrift Service, as below:
<property>
  <name>hbase.regionserver.thrift.framed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.regionserver.thrift.framed.max_frame_size_in_mb</name>
  <value>2</value>
</property>
<property>
  <name>hbase.regionserver.thrift.compact</name>
  <value>true</value>
</property>
After that, existing HBase Thrift client code needs to be modified; otherwise it fails with the error below:
thrift.transport.TTransport.TTransportException: TSocket read 0 bytes
This article explains what to change in the HBase Thrift client code to make it compatible with framed transport and the compact protocol.
Env:
HBase 1.1.1
MapR 5.2
Solution:
1. If framed transport is enabled
We need to modify FROM:
from thrift.transport import TTransport
transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
TO:
from thrift.transport.TTransport import TFramedTransport
transport = TFramedTransport(TSocket.TSocket(host, port))
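To see why a buffered client and a framed server cannot talk to each other, it may help to look at what framing does on the wire: a framed transport prefixes every serialized message with its length as a 4-byte big-endian integer. The sketch below imitates that layout with the standard library only; the frame and unframe helpers and the sample payload are illustrative names, not part of the Thrift API.

```python
import struct


def frame(payload):
    """Prefix a serialized message with its length as a 4-byte big-endian integer."""
    return struct.pack("!i", len(payload)) + payload


def unframe(data):
    """Split one framed message off the front of data; return (payload, rest)."""
    (length,) = struct.unpack("!i", data[:4])
    return data[4:4 + length], data[4 + length:]


# Stand-in for the bytes of a serialized Thrift call (hypothetical content).
message = b"example-bytes"

framed = frame(message)
payload, rest = unframe(framed)
```

A client using TBufferedTransport sends the message without the length prefix, so a server expecting frames misreads the first four bytes as a frame size and the exchange breaks down.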
2. If compact protocol is enabled
We need to modify FROM:
from thrift.protocol import TBinaryProtocol
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
TO:
from thrift.protocol import TCompactProtocol
protocol = TCompactProtocol.TCompactProtocol(transport)
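The two protocols are not interchangeable because they encode the same values differently. As one illustration, the binary protocol writes a 32-bit integer as four fixed big-endian bytes, while the compact protocol zigzag-encodes it and writes it as a variable-length integer. The sketch below reproduces only that integer encoding with the standard library; zigzag32 and varint are illustrative helper names, not Thrift functions.

```python
import struct


def zigzag32(n):
    """Map a signed 32-bit integer to an unsigned one so small magnitudes stay small."""
    return ((n << 1) ^ (n >> 31)) & 0xFFFFFFFF


def varint(n):
    """Encode an unsigned integer 7 bits at a time, least significant group first."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def compact_i32(n):
    """Integer encoding in the style of the compact protocol."""
    return varint(zigzag32(n))


def binary_i32(n):
    """Integer encoding in the style of the binary protocol."""
    return struct.pack("!i", n)


compact_bytes = compact_i32(1)
binary_bytes = binary_i32(1)
```

The same value yields different byte sequences under the two schemes, which is why the client and the server must agree on the protocol.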
A complete example in Python is below:
from thrift.transport import TSocket
#from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
#from thrift.transport import TTransport
from thrift.transport.TTransport import TFramedTransport
from hbase import Hbase
host = "localhost"
port = 9090
tablename = "/user/mapr/some_table"
numRows = 100
columnName = "cf1:col1"
# Connect to HBase Thrift server
#transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
transport = TFramedTransport(TSocket.TSocket(host, port))
#protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
protocol = TCompactProtocol.TCompactProtocol(transport)
# Create and open the client connection
client = Hbase.Client(protocol)
transport.open()
# Scan the MapR-DB table
scan = Hbase.TScan(startRow="111", stopRow="222")
scannerId = client.scannerOpenWithScan(tablename, scan, None)
# Fetch rows in batches of numRows until the scanner returns an empty list
rowList = client.scannerGetList(scannerId, numRows)
while rowList:
    for row in rowList:
        message = row.columns.get(columnName).value
        rowKey = row.row
        print("rowKey = " + rowKey + ", columnValue = " + message)
    rowList = client.scannerGetList(scannerId, numRows)
client.scannerClose(scannerId)
transport.close()
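One way to keep a client in step with the server is to derive the transport and protocol choice from the same hbase-site.xml properties shown earlier. The following is a minimal sketch under stated assumptions: read_thrift_settings, build_protocol, and SAMPLE are hypothetical names introduced here, a missing property is assumed to mean false, and the Thrift imports are done inside build_protocol so that the parsing part runs even where the thrift package is not installed.

```python
import xml.etree.ElementTree as ET


def read_thrift_settings(xml_text):
    """Return the framed/compact flags found in hbase-site.xml content."""
    props = {}
    for prop in ET.fromstring(xml_text).iter("property"):
        name = prop.findtext("name", default="").strip()
        value = prop.findtext("value", default="").strip()
        props[name] = value
    # Assumption: a property that is absent is treated as "false".
    return {
        "framed": props.get("hbase.regionserver.thrift.framed", "false").lower() == "true",
        "compact": props.get("hbase.regionserver.thrift.compact", "false").lower() == "true",
    }


def build_protocol(host, port, settings):
    """Pick the transport and protocol classes that match the server settings."""
    # Imported lazily so the parsing helper above works without thrift installed.
    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TBinaryProtocol, TCompactProtocol

    sock = TSocket.TSocket(host, port)
    if settings["framed"]:
        transport = TTransport.TFramedTransport(sock)
    else:
        transport = TTransport.TBufferedTransport(sock)
    if settings["compact"]:
        protocol = TCompactProtocol.TCompactProtocol(transport)
    else:
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
    return transport, protocol


# Sample content mirroring the properties from this article.
SAMPLE = """<configuration>
  <property>
    <name>hbase.regionserver.thrift.framed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.regionserver.thrift.compact</name>
    <value>true</value>
  </property>
</configuration>"""

settings = read_thrift_settings(SAMPLE)
```

With the settings in hand, the client would call build_protocol(host, port, settings), pass the returned protocol to Hbase.Client, and open the returned transport as in the example above.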