Introduction
This project demonstrates the use of Apache HBase (http://hbase.apache.org/) with the Java API. It is intended as a starting point for exploring the capabilities of HBase and to give developers new to HBase an overview on getting started. Updated code for this project can be found at GitHub (https://github.com/ggraham-412/HBaseJavaExample, commit b1fdec) and in the accompanying zip file.
Background
Apache HBase is a column-oriented, key-value NoSQL database modeled after Google's BigTable (http://research.google.com/archive/bigtable.html). HBase is designed to work with the Hadoop Distributed File System (HDFS), and it is designed from the outset for scalability on clusters of commodity hardware. As with other NoSQL database projects, HBase delivers on its scalability promise by giving up some of the features of a traditional RDBMS, such as transactional integrity, referential integrity, and full ACID (https://en.wikipedia.org/wiki/ACID) guarantees. HBase preserves some of these guarantees, but only under certain conditions.
HBase implements a horizontally partitioned key-value map. Every item in HBase is addressable by a row key, a column family, and a column name within the family. Furthermore, every item is versioned by timestamp. HBase will store up to N versions of a data item, where N is configurable per column family. When querying HBase, if the version is not given, then the most recent data is returned.
{row key, column family:column, version} -> {data item}
The row key, column family, and column are represented as byte arrays. Although strings are commonly used for all three, only the column family has the restriction of using printable characters. The version must be a long integer.
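As a concrete illustration, here is a minimal sketch of writing one cell at an explicit version and reading back the newest value through the Java client API. The table, family, and column names here are hypothetical, and config is assumed to be an HBase Configuration object.

byte[] row    = Bytes.toBytes("PFE2015-09-01");  // row key (hypothetical)
byte[] family = Bytes.toBytes("data");           // column family (hypothetical)
byte[] column = Bytes.toBytes("close");          // column within the family (hypothetical)
long version  = System.currentTimeMillis();      // version (timestamp)

try (Connection conn = ConnectionFactory.createConnection(config);
     Table table = conn.getTable(TableName.valueOf("BarData"))) {
    Put put = new Put(row);
    put.addColumn(family, column, version, Bytes.toBytes("32.45"));
    table.put(put);                   // {row key, family:column, version} -> "32.45"

    Get get = new Get(row);           // no version specified, so the
    get.addColumn(family, column);    // most recent cell is returned
    System.out.println(Bytes.toString(table.get(get).value()));
}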
Records are clustered lexicographically by row key. It is the only sortable key, and so it is common practice to make it a munged compound key. Care is required in the design of the row key, and the choice of munging must reflect the expected nature of the queries to be supported. In a fully distributed HBase system, data will be housed on "region servers" based on regions of the row key space.
Using the Code
Installation and Deployment
This project contains example code for accessing HBase from Java. The example code will import daily stock price data from Google Finance into HBase and run simple queries against it. The example was developed with HBase 1.0.1.1 or compatible, Java 8 JDK update 60, and Fedora 22 Linux (4.1.6-200.fc22.x86_64). It should also run on Windows using Cygwin (http://hbase.apache.org/cygwin.html), but I have not tested this.
Unpack the HBase archive and edit the configuration scripts if desired. By default, HBase starts up against the /tmp folder on the local filesystem, not HDFS. To change the folder HBase uses for its store, edit the configuration file conf/hbase-site.xml as follows:
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///data/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/data/zookeeper</value>
  </property>
</configuration>
The foregoing configuration will cause HBase to use the /data folder on the local host. Note that it is not necessary to create the /data/hbase and the /data/zookeeper folders; HBase will do that for you. However, the /data folder should be writable by whatever user is running the HBase daemon.
To start HBase, issue the command:
bin/start-hbase.sh
The example code contained in this archive was compiled under Java version 8 update 60 from http://www.java.com.
Compilation
The Java code must be compiled against a rather large number of jar files that come with HBase. I do not know whether all of the jar files are really needed, but including them all works. There is a small shell script called makeCPATH to help with this. The script must be sourced with the location of the HBase lib folder as its first argument.
. ./makeCPATH.sh /path/to/hbase/lib
echo $CPATH
Afterwards, the variable CPATH should contain a list of all of the jar files in the HBase lib folder. On Windows, you'll need to write an equivalent .bat file to do the same thing. Alternatively, you can import this code into an IDE like Eclipse and set the project build path to include all of the jar files using a dialog box interface.
To compile the Java code, change to the folder containing the Java source code for this example (e.g., TestHBase.java) and execute the command:
javac -cp $CPATH *.java
Running the Example
The location of the configuration folder of HBase should be set in the environment variable HBASE_CONF_DIR. This allows the Java code to find and read the HBase configuration. (The file hbase-site.xml should be in this folder.)
In addition, the environment variable JAVA_HOME should be set to the folder containing the partial path "bin/java" for your Java installation. (NOTE: Make sure this is the installation folder and not a folder containing a symbolic link. For example, it should look like "/usr/java/jdk1.8.0_60/jre".)
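For reference, here is a minimal sketch of one way the Java code could pick up that configuration, assuming HBASE_CONF_DIR is set as described; the example's own startup code may do this differently.

// Sketch: load hbase-site.xml from the folder named by HBASE_CONF_DIR.
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
String confDir = System.getenv("HBASE_CONF_DIR");
if (confDir != null) {
    config.addResource(new org.apache.hadoop.fs.Path(confDir, "hbase-site.xml"));
}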
The example code comes with four stock price datasets from Google Finance, obtained through http://www.quandl.com, for the symbols ABT, BMY, MRK, and PFE. These datasets are contained in the folder FinData. The TestHBase class is defined outside of a package, so you can run it with just:
java -cp $CPATH:. TestHBase
The code will connect to the HBase instance defined in the conf/hbase-site.xml configuration file. Then, it will drop the table (if it already exists from a previous run), (re)create the table, load the four example stock datasets into the table, and run some example queries.
The name of the table is BarData. It will contain daily "candlestick" bars of stock price movements: opening, high, low, and closing prices, and the daily volume. This table can be inspected offline with the hbase shell. (See https://learnhbase.wordpress.com/2013/03/02/hbase-shell-commands/ for more information on the HBase shell.)
Points of Interest
The center of the design is a DAO-inspired class called BarDatabase. The schema is specified by byte-array constants in the class, to avoid the unnecessary overhead of repeatedly converting table, row, and column string names to byte arrays for the Java HBase API. The class avoids the use of intermediate data objects, and instead delegates responsibility to specialized interfaces for reading data from a data source and for processing data returned from queries. More could be done here to use raw streams and to avoid row-oriented lines of text.
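A minimal sketch of what such byte-array constants might look like; the actual constant names and column qualifiers in BarDatabase may differ.

// Hypothetical schema constants; converting once avoids repeated
// String-to-byte[] conversions on every Put/Get.
static final byte[] TABLE_NAME    = Bytes.toBytes("BarData");
static final byte[] COLUMN_FAMILY = Bytes.toBytes("data");
static final byte[] OPEN   = Bytes.toBytes("open");
static final byte[] HIGH   = Bytes.toBytes("high");
static final byte[] LOW    = Bytes.toBytes("low");
static final byte[] CLOSE  = Bytes.toBytes("close");
static final byte[] VOLUME = Bytes.toBytes("volume");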
Creating/Deleting a Table
try (Connection connection = ConnectionFactory.createConnection(config);
     Admin admin = connection.getAdmin()) {
    HTableDescriptor table =
        new HTableDescriptor(TableName.valueOf(TABLE_NAME));
    table.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    if (!admin.tableExists(table.getTableName())) {
        System.out.print("Creating table. ");
        admin.createTable(table);
        System.out.println(" Done.");
    }
}
Every operation in HBase takes place in the context of a connection. We use Java's try-with-resources (AutoCloseable) feature to guarantee that the connection is closed at the end of the try block. A table is then created (or dropped) using an HTableDescriptor and the HBase Admin interface. The only significant difference between creating a new table and deleting an existing one is that an existing table must be disabled before it can be deleted.
if (admin.tableExists(table.getTableName())) {
    System.out.print("Dropping table. ");
    admin.disableTable(table.getTableName());
    admin.deleteTable(table.getTableName());
    System.out.println(" Done.");
}
Column families are administrative scopes for the columns they logically contain. The limit on the number of versions to keep for columns is an example of an administrative parameter that is defined at the column family level.
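For example, the number of versions to keep could be set when the column family is added to the table descriptor. This is a sketch against the HBase 1.x API; the value of 3 is arbitrary.

// Keep up to 3 versions of every cell in this column family.
HColumnDescriptor family = new HColumnDescriptor(COLUMN_FAMILY);
family.setMaxVersions(3);
table.addFamily(family);   // "table" is the HTableDescriptor from the creation example above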
Importing Data
Data can be imported using the Put object. This can be done on a row-by-row basis, or a List of Put objects can be imported at once. (I am not sure whether this is a true bulk operation, but on the local system it is about twice as fast to load data this way rather than importing each Put after reading every row record.)
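The difference between the two approaches is essentially the following sketch, which uses the hypothetical column constants from earlier and the example's makeKey compound-key helper.

// One Put per row key; each Put carries the columns for that bar.
Put put = new Put(makeKey(date, symbol));
put.addColumn(COLUMN_FAMILY, CLOSE, Bytes.toBytes(closePrice));

table.put(put);            // row-by-row: one call per record
// ... versus ...
currentImport.add(put);    // collect Puts into a List<Put> ...
table.put(currentImport);  // ... and send them all in a single call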
In the example code, since the data is resident in CSV files, this is achieved by a class called LineImporter, which is an inner class of BarDatabase that implements a callback interface. After each line is read by a text file LineReader instance, the LineImporter creates a Put object and saves it in a list called currentImport. When the file stream is closed, the LineImporter instance bulk loads the data into HBase.
@Override
public void close() throws Exception {
    if ( currentImport.isEmpty() ) return;
    try (Connection conn = ConnectionFactory.createConnection(config)) {
        Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
        table.put(currentImport);
        table.close();
    }
    finally {
        currentImport.clear();
    }
}
Querying Data
There are two query methods in general: Get and Scan. Get is intended to find single rows (or single cells of single rows), whereas Scan is intended to return row sets. Get is parameterized by a row key, an optional column family, an optional column within the family, and an optional version number. Here is an example query that retrieves only the closing price of a given stock.
public String GetCell(String date, String symbol, byte[] column)
        throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(config)) {
        Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
        Get get = new Get(makeKey(date, symbol));
        get.addColumn(COLUMN_FAMILY, column);
        Result r = table.get(get);
        if ( r.isEmpty() ) return null;
        return new String(r.value());
    }
}
In this case, both a column family and column within the family are specified. If we had specified only the family with an addFamily invocation, then all columns in that family would be returned.
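A sketch of that variant, following the structure of GetCell; the loop over the returned family map is illustrative only.

Get get = new Get(makeKey(date, symbol));
get.addFamily(COLUMN_FAMILY);     // no column specified: fetch the whole family
Result r = table.get(get);
// Walk every column:value pair returned for the family.
for (java.util.Map.Entry<byte[], byte[]> e : r.getFamilyMap(COLUMN_FAMILY).entrySet()) {
    System.out.println(Bytes.toString(e.getKey()) + " = " + Bytes.toString(e.getValue()));
}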
For a scan example, consider the following code. Instead of specifying a particular row key, you specify a starting row key and a limit. This may seem somewhat limited (no pun intended) in terms of query power, but you can also specify row filters via the setFilter() method that execute server-side.
However, a Scan remains true to its name: it will hit every row key in the scan. The row limit is implemented purely in the client-side code. (It is also implemented client-side in the official HBase shell's interactive version of scan.) But you can set the scan's caching limit and use a PageFilter, which is applied server-side, to keep the servers from churning through every row when you only wanted a few.
public void ScanRows(String startDate, String symbol,
                     int limit, DataScanner scanner) throws IOException {
    ResultScanner results = null;
    try (Connection conn = ConnectionFactory.createConnection(config)) {
        Table table = conn.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan();
        scan.setStartRow(makeKey(startDate, symbol));
        scan.setCaching(limit);
        scan.setFilter(new PageFilter(limit));
        results = table.getScanner(scan);
        int count = 0;
        for ( Result r : results ) {
            scanner.ProcessRow(r);
            if ( ++count >= limit ) break;
        }
    }
    finally {
        if ( results != null ) results.close();
    }
}
Also in the above code, we see that the query results are not processed in situ, but are sent directly to a callback interface. Again, this is done to avoid the creation of intermediate objects. In the example case, the DataScanner provided simply dumps the output to stdout.
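For example, a trivial DataScanner that just prints each row key might look like the following sketch. It assumes the DataScanner interface declares the single ProcessRow(Result) method that ScanRows calls, and that barDatabase is an instance of the BarDatabase class.

// Sketch: a DataScanner callback that dumps each row key to stdout.
DataScanner printer = new DataScanner() {
    @Override
    public void ProcessRow(Result r) {
        System.out.println(Bytes.toString(r.getRow()));
    }
};
barDatabase.ScanRows("2015-09-01", "PFE", 10, printer);   // up to 10 rows starting at PFE2015-09-01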
The Row Key
We mentioned above the importance of designing a good row key. In this example, the row key was chosen to be a munge of the stock symbol followed by the date. In a fully distributed system, this means that rows are going to be allocated to servers first based on their stock symbol and then based on their date. This has important benefits. Queries against historical data are likely going to be focused mainly on yesterday's data, followed in popularity by the day before yesterday, and so on. Analysts are rarely going to look at the stock price of, say, IBM in 1987. So if the row key had been designed the other way around, with the date first, recent days would tend to be clustered together, and real query traffic would tend to bombard a small fraction of the HBase region servers in your cluster!
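A sketch of how such a compound key might be assembled; the actual makeKey in the example may differ in details such as date formatting or padding.

// Compound row key: symbol first, then date, so a symbol's rows cluster
// together and current-date traffic is spread across many symbols.
private static byte[] makeKey(String date, String symbol) {
    return Bytes.toBytes(symbol + date);    // e.g. "PFE2015-09-01"
}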
Conclusion
I didn't talk about ZooKeeper. ZooKeeper is a distributed coordination service that synchronizes configuration and coordinates distributed services; in standalone mode, HBase manages its own ZooKeeper instance for you (hence the hbase.zookeeper.property.dataDir setting in the configuration above). Since this was intended to be a simple example running on a single node, there was no need to go into it.
NoSQL databases offer improved scalability for hosting large data stores, in the range of billions of rows and millions of columns. These database systems deliver on that promise by relaxing various constraints imposed by the relational model, and the choice of which constraints to relax is driven by particular use cases. I think the plethora of NoSQL databases (http://nosql-database.org/) stems from the many different use cases and the many different ways and degrees of relaxing the relational model.