Sqoop - A closer look


This article is for you if you are planning to move data from an RDBMS to Hadoop.

The 'R' in RDBMS stands for 'relational', a word that is hardly mentioned in the Big Data dictionary. An RDBMS designer spends hours normalizing data to third normal form only to be told by a big data guru that it is not needed. 3NF enables efficient operations on data by removing redundancy, and a well-designed database takes the form of tables with references amongst them.

Hadoop in its most basic form comprises two services: one to store data and another to use it. They go by the names HDFS and MapReduce, respectively. So in essence you are moving data from the RDBMS to HDFS (and not Hadoop). The reason you still need to care about MapReduce is that it drives how that data gets used.

Big Data natively lacks support for many things DBAs have come to expect: there is no schema (schema-on-read), sometimes no consistency (eventual consistency in Cassandra), no transactions, no rollback, no locking (locking in a distributed system requires an additional service), and no secondary keys. There are workarounds for most of these shortcomings, though, so the question is not so much whether you should move but when. The competition is catching up.

A popular tool to ingest data is Sqoop, a component in the data pipeline. Below is a list of things to consider before you move, along with my recommendations:

  1. Null – representation
  2. Join – map-side
  3. File format
  4. Load – incremental
  5. Compression – splittable
  6. Mappers

1. Null – representation

'null' is represented differently on different systems. Correct representation of null allows queries to use constructs like "field is not null" rather than "field != ''", but more importantly, if null is not handled correctly, left outer joins and right outer joins will not produce the desired result.

In HDFS, null is represented as a blank. A Hive table sits on HDFS, but its representation for null is '\N'.

When importing data from an RDBMS into Hive, one should specify the null handling flags as
--null-string '\\N' --null-non-string '\\N'
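
Putting it together, a minimal Hive import with explicit null handling might look like the sketch below; the JDBC URL, credentials and table name are illustrative, not from any real setup.

sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --table transaction \
  --hive-import \
  --null-string '\\N' \
  --null-non-string '\\N'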

In a columnar file format, null values are not stored, so they carry no space overhead. Using null instead of empty strings improves query performance.

2. Join – map-side

Joins don't come as naturally to Hadoop as they do to an RDBMS. A join in Hadoop is an expensive operation, and nested joins can be very inefficient. For a join to work efficiently, one of the two participating tables should fit in memory so that a map-side join can be used; with big data this may not always be possible, and the query optimizer then cannot use a map-side join to improve performance.

To port data from an RDBMS to Hadoop, it is thus desirable to de-normalize the data even if it leads to redundancy. Tables from the RDBMS shouldn't be ported to Hadoop 'as is'; wherever possible, references should be replaced with the actual data.

As a toy example, let's say we have a transaction table with the fields (Payee_ID, Payer_ID, Amount, Date) and a master table with the payee details (Payee_ID, Name). One should join the two tables at the source using a free-form query import to extract one table out of the two (use --query instead of the --table argument).

--query 'select transaction.*, payee.Name from transaction join payee on transaction.Payee_ID = payee.Payee_ID where $CONDITIONS'
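
Sqoop's free-form query import requires the literal token $CONDITIONS in the where clause (Sqoop substitutes a range predicate per mapper), a --split-by column when more than one mapper is used, and a --target-dir. A full command might look like the sketch below; the connection details and target directory are illustrative:

sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --query 'select t.Payee_ID, t.Payer_ID, t.Amount, t.Date, p.Name from transaction t join payee p on t.Payee_ID = p.Payee_ID where $CONDITIONS' \
  --split-by t.Payee_ID \
  --target-dir /data/sales/transaction_denorm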

3. File format

File format affects query performance. Columnar formats have some advantages:
  • Columns compress better than rows because of their lower cardinality, so more data fits in a block and fewer blocks need to be opened for I/O.
  • Vertical partitioning on top of existing horizontal partitions provides another dimension for optimization.
  • An entire column can be serialized into a byte buffer in memory, which may not be possible otherwise.

Hadoop was initially used to process log files, which are in text format. A binary format called SequenceFile was supported later, but Avro became more popular with non-Java programmers.
Parquet is the file format of choice when using Hive. Sqoop import accepts the argument --as-parquetfile. If you do not want to be locked into Hive, then your choices are Text, SequenceFile & Avro.
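
For example, importing straight into Parquet is a one-flag change (swap in --as-avrodatafile or --as-sequencefile to stay Hive-agnostic); the connection details and target directory below are illustrative:

sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --table transaction \
  --as-parquetfile \
  --target-dir /data/sales/transaction_parquet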

4. Load – incremental

On subsequent imports you should fetch only the data that has changed, which is what an incremental load accomplishes. A Sqoop job makes this easier by saving the parameters and, more importantly, by internally keeping track of the most recently imported row to assist the next run. Sqoop2, which is offered as a service, takes this further by also providing a REST API.

There are two types of incremental load. If new data is only being appended to existing data, the use case is "append"; otherwise it is "lastmodified" (read: update). In either case you need to identify a column that is indicative of the change, appropriately called the "check column".

--check-column transaction_id --incremental append --last-value 100

If you run Sqoop as a job, you need not specify --last-value, as it is internally retained in the saved job. The database password can be stored in the Sqoop metastore, but this is not recommended.
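
As a sketch (job, table and column names are illustrative), the saved job is created once and then executed on every run; Sqoop updates the stored last value after each execution:

sqoop job --create transaction_incremental -- import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --table transaction \
  --incremental append \
  --check-column transaction_id \
  --last-value 0

sqoop job --exec transaction_incremental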

5. Compression - splittable

File format is different from compression. Even though some file formats store data in a form that takes less space, you should still enable compression. The only time you would not use compression is when the data is already compressed.
Compression is useful for data both at rest and in motion. For data in motion, compression uses less network bandwidth to transfer the data. For data at rest, compressed data occupies fewer blocks on disk, saving the cost incurred during disk I/O.

Compression can be enabled using the --compress flag. Splittable compression is preferred, as such data can be distributed across the Hadoop cluster. Snappy & LZO are popular even though they compress less than the default codec, gzip. Use --compression-codec to specify a different codec.
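
For example, enabling Snappy on an import takes two flags; the codec class ships with Hadoop, and the connection details are illustrative. Keep in mind that Snappy output is only splittable when held inside a container format such as SequenceFile, Avro or Parquet.

sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --table transaction \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec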

Though more compression is generally desirable, it comes at a cost: compressing and uncompressing consume CPU and memory. Import during off-peak hours.

6. Mappers 

If the data import takes a long time, you should experiment with the number of mappers to increase parallelism. Note that to achieve true parallelism (as with Hadoop distributed copy, distcp), both ends should be tuned to support more concurrent processes.
--num-mappers tunes the Hadoop side, not the RDBMS side.

Increase this value from its default of 4 to at least the number of DataNodes in the Hadoop cluster, and keep going until you see no further improvement.
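
For instance (connection details and split column illustrative), doubling the default parallelism looks like this:

sqoop import \
  --connect jdbc:mysql://dbhost/sales \
  --username dbuser -P \
  --table transaction \
  --split-by transaction_id \
  --num-mappers 8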