Serverless processing of CSV files using BigQuery

CSV, or comma-separated, files are a lifeline for many organizations and managers: we receive these files from clients, dump data from databases like Oracle or MySQL in delimited formats, and load them into various tools for processing. Here "CSV" doesn't necessarily mean comma delimited; the delimiter can be any character, such as "|" or ";". Organizations can receive millions of records every hour in this format and need to process and summarize them quickly.

Serverless computing is the new paradigm for data processing and is becoming very popular with cloud providers. You pay only for the data you process on demand and don't have to worry about the machines required to run the queries. The provider uses all the processing power available to run the query and return the results. There is no need to provision and manage servers, or to scale them up or down.

Google BigQuery is becoming a popular service for serverless processing of data. You can store your data in BigQuery and run queries that process terabytes or even petabytes of data without provisioning any machines. But what if your data is already stored in Google Cloud Storage and you want to run queries against it? With BigQuery external tables, you can query the data as it sits in Cloud Storage without loading it into BigQuery.

So say you have sales data in Google Cloud Storage, saved as CSV files with one file per hour, named 'retail_YYYYMMDD_HH.csv'. You receive more than a million records every hour, 24 hours a day, from global sales, with records containing fields like the following:

BranchID,Sale Time,NAICS Code,Business Kind,Item Name,Cost,Transaction Number
1,2017-12-01 02:51:31,451216,Book Stores,Fiction,12.45,11893
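For example, listing the bucket with gsutil (the bucket name and dates here are illustrative) would show one file per hour:

gsutil ls gs://my_retail_bucket/
gs://my_retail_bucket/retail_20171101_10.csv
gs://my_retail_bucket/retail_20171101_11.csv
gs://my_retail_bucket/retail_20171101_12.csv
gs://my_retail_bucket/retail_20171101_13.csv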

You can create an external table over this data and let BigQuery infer the schema, but to make sure the data is interpreted with the correct types, it is better to provide the schema yourself.

First, create a dataset to hold the table. We will use the bq command from Cloud Shell. The first time you run it, it may ask you to authenticate and set up the BigQuery APIs.

bq mk my_dataset
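You can confirm that the dataset was created by listing the datasets in the current project; my_dataset should appear in the output:

bq ls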

Next, create a sample table definition from the CSV files using the bq mkdef command.

bq mkdef --autodetect --source_format="CSV" "gs://my_retail_bucket/retail*csv" > auto_tabledef
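Before editing, you can inspect what mkdef generated; the exact types it guesses depend on your data, and, for example, the NAICS code may be detected as an integer when we would rather treat it as a string:

cat auto_tabledef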

You can modify the above file to correct the schema as below:

{
  "schema": {
    "fields": [
      { "name": "branchId", "type": "INT64" },
      { "name": "saleTime", "type": "DATETIME" },
      { "name": "NAICS", "type": "STRING" },
      { "name": "businessKind", "type": "STRING" },
      { "name": "itemName", "type": "STRING" },
      { "name": "itemCost", "type": "NUMERIC" },
      { "name": "trnNumber", "type": "INT64" }
    ]
  },
  "autodetect": false,
  "csvOptions": {
    "encoding": "UTF-8",
    "quote": "\"",
    "fieldDelimiter": ",",
    "skipLeadingRows": 0
  },
  "sourceFormat": "CSV",
  "sourceUris": [
    "gs://my_retail_bucket/retail*csv"
  ]
}
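Since the definition file is plain JSON, an optional sanity check after hand-editing it is to run it through a JSON parser (assuming Python is available in your Cloud Shell, which it normally is); it prints the formatted JSON on success and an error message if the syntax is broken:

python3 -m json.tool auto_tabledef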
With this table definition, you can create the external table using the bq mk command. Let us create the table my_dataset.sales:

bq mk --external_table_definition auto_tabledef --table my_dataset.sales
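You can verify the table with bq show; for an external table, the metadata should include an externalDataConfiguration section with your source URIs and schema:

bq show --format=prettyjson my_dataset.sales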

Since the data is already in GCS, there is no need to load it into BigQuery. We can now query the data directly:

bq query "select NAICS, sum(itemCost) from my_dataset.sales group by NAICS"

If you want to see the count for just one hour, you can avoid scanning all the other data by filtering on the file name. BigQuery provides the _FILE_NAME pseudo column, which contains the full path of the file each row came from; it is evaluated first, so the other files are not scanned. Note that this pseudo column must be aliased to appear in the select list.

bq query "select _FILE_NAME as file,count(*) as records from my_dataset.sales group by file"

file                                          records
gs://my_retail_bucket/retail_20171101_10.csv  479223
gs://my_retail_bucket/retail_20171101_11.csv  495084
gs://my_retail_bucket/retail_20171101_12.csv  496017
gs://my_retail_bucket/retail_20171101_13.csv  479223

Query complete (2.8 sec elapsed, 114.4 MB processed)

You can see that the query scanned about 114 MB of data. You can restrict the files BigQuery processes by filtering on the file name:

bq query "select _FILE_NAME as file,count(*) as records from my_dataset.sales where _FILE_NAME like '%20171101_10%' group by file"

file                                          records
gs://my_retail_bucket/retail_20171101_10.csv  479223

Query complete (2.2 sec elapsed, 37.3 MB processed)

You can see that the amount of data processed has dropped drastically, since only one file had to be scanned. Now suppose you want to compute the counts for each hour and store the results separately; you can use the EXPORT DATA statement to write the query results back to Google Cloud Storage, as shown below.

The * wildcard in the output URI lets BigQuery split the results across multiple files, which is required when the exported data exceeds the 1 GB per-file limit.

Export the data with:

bq query --use_legacy_sql=false "EXPORT DATA OPTIONS(
  uri='gs://my_summary_bucket/retailOut_*.csv',
  format='CSV',
  overwrite=true,
  header=true,
  field_delimiter=',') AS
select _FILE_NAME as file,count(*) as records from my_dataset.sales where _FILE_NAME like '%20171101_10%' group by file
"

Check the output with:

gsutil ls gs://my_summary_bucket
gsutil cat gs://my_summary_bucket/retailOut*
file,records
gs://my_retail_bucket/retail_20171101_10.csv,479223

Now you know how to combine the power of serverless computing with BigQuery and the convenience of CSV storage. In this blog, I have created an external table over existing data in GCS, run a query on it using the pseudo column _FILE_NAME, and stored the results as a CSV file on Google Cloud Storage. This is a very popular use case for serverless processing of big data.

BigQuery also supports other formats, such as Avro and Parquet, for external table data stored on Google Cloud Storage. In these formats the schema is self-describing, so it can be detected automatically and there is no need to edit or supply the schema separately.
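For example, a Parquet-backed external table can be defined and created in the same way, without hand-editing a schema (the file pattern and table name below are placeholders):

bq mkdef --source_format=PARQUET "gs://my_retail_bucket/retail*.parquet" > parquet_tabledef
bq mk --external_table_definition=parquet_tabledef my_dataset.sales_parquet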

About the Author


Ganapathi is a practicing expert in big data, Hadoop, Storm and Spark. He is a Google Certified Cloud Architect. He has been managing database projects for many years and consults with clients on big data implementations. He is a Cloudera certified Hadoop administrator and also a Sybase certified database administrator. He has worked with clients in the US, UK, Australia, Japan and India on many large projects. He has helped implement large database projects in Sybase, Oracle, Informix, DB2, MySQL and, recently, SAP HANA. He uses big data technologies like Apache Hadoop and Apache Spark, provides strategies for dealing with large databases and performance issues, and helps set up big data clusters. He also conducts many trainings on big data and the Hadoop ecosystem of products such as HDFS, MapReduce, Hive, Pig, HBase, Sqoop, Flume, Cassandra, Kafka, Storm and Spark. He is based out of Bangalore, India. He can be reached at ganapathid@spideropsnet.com. He also has online big data courses on Udemy, which you can subscribe to at https://www.udemy.com/user/ganapathi-devappa/
