Apache Flume: Converting CSV to AVRO and Parquet using Morphline and Kite

21 Jan 2019 | CPOL | 2 min read
How to convert CSV Flume events to Avro and Parquet using the Morphline interceptor and the Kite sink

This post explains how to convert CSV Flume events to Avro and Parquet using the Morphline interceptor and the Kite sink.

The code is available on GitHub. Look at these two files:

  1. orders.avsc, which describes the Avro schema for the input events, and
  2. part-m-00000, which holds our CSV data.

Remember the field names given in the Avro schema file, orders.avsc. The Morphline configuration in step 2 must use the same names.
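
If you do not have the repository at hand, a schema along the following lines would fit the column names used in morphline.conf in step 2. This is only a sketch: the field names come from the columns list in that file, but the record name, the field types and the file name orders.example.avsc are assumptions, so prefer the real orders.avsc from GitHub.

```shell
# Hypothetical schema file. Field names match the readCSV columns in step 2;
# the record name and every field type are assumptions, not the author's file.
cat > orders.example.avsc <<'EOF'
{
  "type": "record",
  "name": "orders",
  "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "order_date", "type": "string"},
    {"name": "order_customer_id", "type": "int"},
    {"name": "order_status", "type": "string"}
  ]
}
EOF
```

It is written to a separate file name on purpose, so that it cannot overwrite the orders.avsc you downloaded.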

Pre-requisites

  • This code was tested on my Cloudera 5.15 setup, which is explained in a separate blog post.

Step 1 - Create Kite Datasets with Avro and Parquet Format

kite-dataset create orders_parquet --schema orders.avsc --format parquet
kite-dataset create orders_avro --schema orders.avsc --format avro
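
Before moving on, it may help to confirm that both datasets exist. A minimal check, assuming kite-dataset is on the PATH of the node where you ran the create commands, could print the stored schema and metadata of each dataset:

```shell
# Print the schema and metadata that Kite stored for each dataset.
# Skips gracefully when the kite-dataset CLI is not installed on this machine.
for ds in orders_parquet orders_avro; do
  if command -v kite-dataset >/dev/null 2>&1; then
    kite-dataset schema "$ds"
    kite-dataset info "$ds"
  else
    echo "kite-dataset not found on PATH; skipping $ds"
  fi
done
```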

Step 2 - Create Configuration File for morphline Interceptor (morphline.conf)

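morphlines : [{
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Parse the CSV line and name each field
      { readCSV {
          charset : UTF-8
          columns : [order_id,order_date,order_customer_id,order_status]
        }
      }
      # Convert the record to Avro using the local schema file
      { toAvro {
          schemaFile : /home/skamalj/learn-flume2/orders.avsc
        }
      }
      # Serialize without an Avro container; Kite adds the container
      { writeAvroToByteArray {
          format : containerlessBinary
        }
      }
    ]
  }
]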
  • In a morphline, commands are chained: each event passes from one command to the next. The first command in the list is readCSV. It reads the line and assigns a name to each field, and these names are exactly the same as the ones defined in our Avro schema file, orders.avsc.
  • The next command, toAvro, converts the events to Avro using the schema file. The column names are mapped one-to-one to the Avro schema fields.
  • The last command, writeAvroToByteArray, produces containerless events. We need these because Kite will create the container for us (with Snappy compression, which is the default).
  • Change the schemaFile path to match your setup. This is a local directory path, not an HDFS path.
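
To picture what readCSV does with each event, the following shell sketch splits one line on commas and labels the values with the same column names. The sample row is hypothetical, and awk only stands in for the morphline here; it is not how Morphline is implemented.

```shell
# Hypothetical sample row; the real data lives in part-m-00000.
line='1,2013-07-25 00:00:00.0,11599,CLOSED'

# Split on commas and label each value with the column names given to readCSV.
named=$(printf '%s\n' "$line" | awk -F',' '{
  printf "order_id=%s\n", $1
  printf "order_date=%s\n", $2
  printf "order_customer_id=%s\n", $3
  printf "order_status=%s\n", $4
}')
printf '%s\n' "$named"
```

Each labeled value corresponds to one field that toAvro then looks up by name in the schema, which is why the names have to match.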

Step 3 - Create Flume Configuration

Only a section of the code is explained below:

a1.sources.k1.interceptors = schemaheader morphlineinterceptor

a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.schemaheader.value = hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc

a1.sources.k1.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.k1.interceptors.morphlineinterceptor.morphlineFile = /home/skamalj/learn-flume2/morphline.conf
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1

a1.sinks.s1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s1.channel = c1
a1.sinks.s1.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_parquet
  • We need two interceptors: 
    • The first one (schemaheader) injects the "flume.avro.schema.url" header value. The Kite sink uses this header to parse the Avro events. To use it, load the orders.avsc file to HDFS and provide its URL as the value.
    • The second one (morphlineinterceptor) is the Morphline interceptor, which uses the file we created in step 2. This one converts the events to Avro.
  • The a1.sinks.s1 properties configure the Kite sink. This one is for Parquet.
  • If you look at the whole code on GitHub, you will see that we have two channels and two sinks: one set is for Parquet and the other is for Avro.
  • Change the paths to match your setup. The morphlineFile path is a local directory path, not an HDFS path.
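
The schema upload mentioned above could look like the following sketch. It assumes the hdfs client is available, that you run it from the directory holding orders.avsc, and that you keep the /user/skamalj/avro-schemas directory used in the header value. Adjust the path to your own user.

```shell
# HDFS directory taken from the flume.avro.schema.url value in the Flume config.
SCHEMA_DIR=/user/skamalj/avro-schemas

# Upload the schema so the Kite sink can read it; skip when hdfs is missing.
if command -v hdfs >/dev/null 2>&1; then
  hdfs dfs -mkdir -p "$SCHEMA_DIR"
  hdfs dfs -put -f orders.avsc "$SCHEMA_DIR/"
  hdfs dfs -ls "$SCHEMA_DIR"
else
  echo "hdfs CLI not found on PATH; run this on a cluster node"
fi
```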

Step 4 - Execute

Before you run the agent, set the HIVE_HOME and HCAT_HOME environment variables. Also increase the JVM memory (the -Xms and -Xmx options), because the default memory setting is insufficient to work with Morphline and Kite.

export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HCAT_HOME=/opt/cloudera/parcels/CDH/lib/hive-hcatalog
flume-ng agent --name a1 --conf . --conf-file flume_morph_kite.conf -Xms4096m -Xmx8192m

Check the results in both datasets, as shown in the screenshot.

Image 1
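
Besides the screenshot, one possible way to inspect the output from the command line is kite-dataset show, which prints the first records of a dataset. The sketch assumes the agent has already delivered some events and that kite-dataset is on the PATH. The record count of 5 is arbitrary.

```shell
# Print the first few records from each dataset written by the two Kite sinks.
# Skips gracefully when the kite-dataset CLI is not installed on this machine.
for ds in orders_parquet orders_avro; do
  if command -v kite-dataset >/dev/null 2>&1; then
    kite-dataset show "$ds" -n 5
  else
    echo "kite-dataset not found on PATH; skipping $ds"
  fi
done
```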

All done!

History

  • 21st January, 2019: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)