Migrating a data warehouse to big data technology

Many organizations have massive amounts of data in their data warehouses, and it keeps growing every year. Business use of this data has also increased, with analysis and actionable insights becoming more relevant. However, analyzing this data takes increasingly more time as it grows. Data analysis has also become a specialty, with analytical firms in each field accumulating data from multiple organizations and then deriving meaning from it.

As more data is added, processing time has become prohibitive. In one of the organizations I am working with, processing 10 TB of insurance customer data takes more than 48 hours to produce the intermediate tables used by the data scientists. This limits the number of processing iterations and makes it frustrating for analysts to get even small changes into the data. With the advent of big data technology, these organizations are now moving this massive processing to big data tools like Hadoop and Spark. In this blog, I will look at how some common data processing paradigms can be done faster with big data technology.

Data Storage

Data warehouses store massive amounts of data in tables that are terabytes or more in size. Columnar storage is preferred over row-based storage in this case, as there is a lot of repeated data within each column of a table. Spark with Hive provides the Parquet format, so data can be stored in columnar form. Data can also be compressed with Snappy or gzip so that it occupies less space. The goal is not so much to save disk space (hard disks have become cheap) but to reduce the time spent writing data to disk, reading it back and transferring it across the network. For example, a Spark statement like spark.sql("create table customerData stored as parquet tblproperties('parquet.compression'='SNAPPY') as select ….. ") can be used to create a table stored in Parquet format with Snappy compression.
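A minimal sketch of such a statement, assuming a SparkSession named spark with Hive support and a hypothetical staging table rawCustomerData as the source (the exact compression property name can vary across Hive and Spark versions):

// Create a Parquet table with Snappy compression from an existing staging table.
// rawCustomerData is a hypothetical source table name.
spark.sql("""
  CREATE TABLE customerData
  STORED AS PARQUET
  TBLPROPERTIES ('parquet.compression' = 'SNAPPY')
  AS SELECT * FROM rawCustomerData
""")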

Procedures, functions and SQL

In a data warehouse, data is organized as relational tables and a lot of data processing is pushed to the server using SQL stored procedures and stored functions. Many analysis systems have a massive amount of SQL code in these procedures. Moving them to big data technology has become easier, with Hive SQL supporting most of the SQL features found in popular RDBMSs like Oracle and MySQL.

Spark, Yarn and Hive

Apache Spark makes it easier to develop distributed data processing applications, and its support for Hive SQL makes it easier to migrate SQL from popular databases like Oracle or SQL Server to big data technology. With Spark also supporting scheduling through Yarn, memory and task management have become more robust. Yarn takes care of assigning cluster resources such as memory and processor cores to the Spark tasks, while the Spark driver, running inside Yarn's application master, controls the distribution of tasks during data processing.
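The SQL snippets in this blog assume a SparkSession with Hive support, created along the lines of the sketch below (the application name is a hypothetical choice); when the job is submitted to a Yarn cluster via spark-submit, Yarn handles the resource allocation described above.

import org.apache.spark.sql.SparkSession

// Entry point used by all the spark.sql(...) examples that follow.
val spark = SparkSession.builder()
    .appName("warehouse-migration")
    .enableHiveSupport()
    .getOrCreate()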

Distributed processing

With Spark and Yarn, massive parallelism can be achieved through horizontal scaling. For example, with a cluster of 10 machines, each with 40 CPU cores and each core executing 2 tasks in parallel, we get 800 tasks processing data at the same time. How does this translate into performance? With each task processing about 20 MB of data per second (in our benchmarking), that is roughly 16 GB per second across the cluster, so 10 TB of data can be processed in around 10 minutes, comfortably under 15 minutes!

Let us look at some of the common SQL tasks and how they can be done with Spark and Hive.

Filtering

In a relational database, filtering normally gives sub-second response when the column is indexed. When the column is not indexed, the table has to be scanned, which creates a performance bottleneck. In a data warehouse situation, you may need to filter on any field, indexed or not, and with multiple conditions, for example records where the user comment includes 'awesome' and the user rating is less than 3. With Hive and Spark, you can use a similar SQL query; the filtering is distributed to each task and, using data locality, terabytes of data can be filtered in a reasonable time. A sketch of such a query is shown below.
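A minimal sketch, assuming a hypothetical userReviews table with userComment and userRating columns:

// Distributed filter: each task scans its own partition of the table.
spark.sql("SELECT * FROM userReviews WHERE userComment LIKE '%awesome%' AND userRating < 3").show(false)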

Transformation

A transformation is an operation applied to each record of data, for example producing a new field by multiplying the item price by the number of items to get the total amount. If every intermediate transformation is stored on disk, the amount of storage required multiplies quickly. Using a storage format like Parquet with Snappy compression reduces both the time spent transferring data over the network and the time spent writing it to disk.
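A sketch of such a transformation written as a create-table-as-select, assuming a hypothetical orders table with itemPrice and itemCount columns:

// Compute a derived column for every record and store the result as compressed Parquet.
spark.sql("""
  CREATE TABLE orderTotals
  STORED AS PARQUET
  TBLPROPERTIES ('parquet.compression' = 'SNAPPY')
  AS SELECT *, itemPrice * itemCount AS totalAmount FROM orders
""")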

Sorting/Distinct

You may want to present the data in a certain order, or the data may have many duplicate records and you need distinct values for some fields. For both of these, the data has to be sorted by the required fields. Sorting is an expensive operation, but with distributed processing it can be done faster using a merge-sort approach: each block of data is sorted locally and the sorted blocks are then merged. This still needs a lot of network transfer of data, but with so many tasks running in parallel it can be accomplished quite quickly.
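Both operations look the same as they would in an RDBMS. A sketch, assuming a hypothetical sales table with region, itemType and amount columns:

// Global sort: rows are range-partitioned, sorted locally and then merged.
spark.sql("SELECT * FROM sales ORDER BY region, itemType").show(false)

// Distinct: duplicates are removed after shuffling on the selected fields.
spark.sql("SELECT DISTINCT region, itemType FROM sales").show(false)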

Grouping of data

In a data warehouse, many reports need data grouped by some fields. For example, the sum of the amount for each item type is calculated by grouping the data on the item type field. These statements can be translated as-is from a relational database to Hive. Grouping involves re-distribution of data, like sorting, but unlike sorting only the partial aggregates need to be shuffled across the network for final aggregation. For example, if you are grouping by item type, the aggregate within each partition is calculated for each item type first and then combined across the cluster, as in the sketch below.
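A minimal sketch, reusing the hypothetical sales table with its itemType and amount columns:

// Partial sums are computed per partition; only those partial aggregates
// are shuffled and combined into the final result.
spark.sql("SELECT itemType, SUM(amount) AS totalAmount FROM sales GROUP BY itemType").show(false)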

Windowing and Deduplication

Windowing is an extension of grouping where the records within a group are considered for further processing. For example, you may want to retain only the first record, based on some field, within each group of records. Windowing is provided by the PARTITION BY keyword, as below:

spark.sql("SELECT product, category, revenue, row_number() OVER (PARTITION BY product, category ORDER BY revenue DESC) as rownumber FROM productRevenue").show(false)

In the above query, the table productRevenue contains the fields product, category and revenue.

Windowing is often used for de-duplication of data. In the example above, suppose there is an updateTime field and you want to retain only the revenue record with the latest updateTime for each product and category; you can use a query like the one below:

spark.sql("SELECT product, category, revenue FROM (SELECT product, category, revenue, row_number() OVER (PARTITION BY product, category ORDER BY updateTime DESC) as rownumber FROM productRevenue) ranked WHERE rownumber = 1").show(false)

Like sorting and grouping, the above query also requires the data to be shuffled across the network by the partition key and then ordered within each partition.

User defined functions

Spark makes it easy to create user-defined functions in Scala. You can write any Scala function, register it and use it in Spark SQL. The UDF code is automatically distributed to each node in the cluster and executed for each partition. The following example shows creating a unique id for each row of a table in a distributed manner.

object myudf {
    // The counter lives once per executor JVM; AtomicLong keeps increments
    // safe when several tasks run concurrently in the same executor.
    val sequence = new java.util.concurrent.atomic.AtomicLong(0)

    def getId(partitionNumber: Int): String = {
        // Combining the partition id with the local counter makes the id
        // unique across the cluster.
        s"$partitionNumber-${sequence.incrementAndGet()}"
    }
}

// Register the Scala function so it can be called from Spark SQL.
spark.udf.register("getId", myudf.getId _)

// spark_partition_id() passes the current partition number to the UDF.
spark.sql("select *, getId(spark_partition_id()) from productRevenue").show(false)

You can see that the above UDF maintains a counter within each executor JVM, and since the partition id is passed in as a parameter, the generated id is unique across partitions. This UDF is somewhat similar to the ROW__ID virtual column available in recent versions of Hive.

Dataframe vs SQL

Spark provides DataFrames that are extensively used for data processing. Spark SQL provides a way to process the same data using familiar SQL instead of DataFrame code. The underlying distributed execution engine is the same for both, and I haven't found much performance difference between the two. Spark SQL with Hive is preferable, as it makes it easier to migrate existing procedure code from an RDBMS to big data technology. It is also easier to understand for data scientists, who are usually familiar with SQL. The sketch below shows the same aggregation written both ways.
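A minimal sketch, reusing the hypothetical sales table from the earlier examples:

import org.apache.spark.sql.functions.sum

// SQL version of the grouping.
spark.sql("SELECT itemType, SUM(amount) AS totalAmount FROM sales GROUP BY itemType").show(false)

// Equivalent DataFrame version; both go through the same optimizer and engine.
spark.table("sales")
    .groupBy("itemType")
    .agg(sum("amount").as("totalAmount"))
    .show(false)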

Conclusion

Organizations can make their data warehouse processing faster by moving to big data technology. This blog discussed one such combination, Spark, Yarn and Hive, that is being used by many large organizations to migrate their data warehouses. Some common aspects of moving code from RDBMS SQL to Hive SQL were discussed, including more complicated SQL such as windowing, along with distributed user-defined functions and more efficient storage formats.

About the Author


Ganapathi is a practicing expert in big data, Hadoop, Storm and Spark. He has been managing database projects for many years and consults for clients on big data implementations. He is a Cloudera certified Hadoop administrator and a Sybase certified database administrator. He has worked with clients in the US, UK, Australia, Japan and India on many large projects. He has helped implement large database projects in Sybase, Oracle, Informix, DB2, MySQL and, more recently, SAP HANA. He uses big data technologies like Apache Hadoop and Apache Spark, provides strategies for dealing with large databases and performance issues, and helps set up big data clusters. He also conducts many trainings on big data and the Hadoop ecosystem of products, including HDFS, MapReduce, Hive, Pig, HBase, Sqoop, Flume, Cassandra, Kafka, Storm and Spark. He is based out of Bangalore, India, and can be reached at ganapathid@spideropsnet.com. He also has online big data courses on Udemy; you can subscribe to them at https://www.udemy.com/user/ganapathi-devappa/
