This blog explains how to convert CSV Flume events to Avro and Parquet using the Morphline interceptor and the Kite sink.
The code is available on GitHub. Look at these two files:
- orders.avsc, which describes the Avro schema for the input events, and
- part-m-00000, which holds our CSV data.
Note the field names in the Avro schema file, orders.avsc. The morphline in Step 2 must use exactly the same names.
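The readCSV column names in Step 2 must match these field names exactly, so a quick pre-flight check can catch typos before Flume runs. This is a rough sketch under stated assumptions. check_columns is a hypothetical helper, not part of Kite or Flume. It searches the schema text for "name" entries instead of parsing JSON, so it assumes each field is declared as "name": "...". It can also be fooled, for example by a record whose own name equals a column name.

```shell
# Hypothetical helper (heuristic, not a JSON parser): verify that every
# comma-separated column name appears as a "name" entry in the Avro schema.
# Returns 1 and reports missing names on stderr if any column is not found.
check_columns() {
  local schema_file=$1 columns=$2 col missing=0
  local IFS=','
  for col in $columns; do
    if ! grep -Eq "\"name\"[[:space:]]*:[[:space:]]*\"${col}\"" "$schema_file"; then
      echo "missing in ${schema_file}: ${col}" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Usage, after sourcing the function: `check_columns orders.avsc order_id,order_date,order_customer_id,order_status && echo "names match"`.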
Pre-requisites
- This code was tested on my Cloudera (CDH) 5.15 setup, which is explained in this blog.
Step 1 - Create Kite Datasets with Avro and Parquet Format
kite-dataset create orders_parquet --schema orders.avsc --format parquet
kite-dataset create orders_avro --schema orders.avsc --format avro
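You can wrap the two commands above so that both datasets are always created from the same schema file. This is a minimal sketch. create_datasets and run are hypothetical helpers. Setting DRY_RUN=1 only prints the kite-dataset commands instead of running them. After each create, `kite-dataset schema <dataset>` prints the stored schema, which is a quick way to confirm that the dataset exists with the expected field names.

```shell
# Print the command instead of executing it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then echo "$*"; else "$@"; fi
}

# Hypothetical helper: create orders_parquet and orders_avro from one schema
# file, then print each stored schema so the field names can be checked.
create_datasets() {
  local schema_file=${1:-orders.avsc} fmt
  for fmt in parquet avro; do
    run kite-dataset create "orders_${fmt}" --schema "$schema_file" --format "$fmt" || return 1
    run kite-dataset schema "orders_${fmt}" || return 1
  done
}
```

Usage, after sourcing the functions: `create_datasets orders.avsc`, or `DRY_RUN=1 create_datasets orders.avsc` to preview the commands.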
Step 2 - Create the Configuration File for the Morphline Interceptor (morphline.conf)
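morphlines : [{
  id : morphline1
  importCommands : ["org.kitesdk.**"]
  commands : [
    # Split each CSV line into fields named after the Avro schema fields
    { readCSV {
        charset : UTF-8
        columns : [order_id,order_date,order_customer_id,order_status]
      }
    }
    # Convert the named fields into an Avro record (local path, not HDFS)
    { toAvro {
        schemaFile : /home/skamalj/learn-flume2/orders.avsc
      }
    }
    # Serialize without an Avro container; the Kite sink adds the container
    { writeAvroToByteArray {
        format : containerlessBinary
      }
    }
  ]
}
]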
- In a morphline, commands form a pipeline, and each record passes from one command to the next. The first command in the list is readCSV. It reads each line and assigns a name to each field. These names must be exactly the same as the field names defined in the Avro schema file, orders.avsc.
- The next command, toAvro, converts each record to Avro using the schema file. Column names are mapped one-to-one to the fields of the Avro schema.
- Finally, writeAvroToByteArray serializes the records in containerless form. The Kite sink creates the container for us, using Snappy compression by default.
- Make sure you change the schemaFile path to match your setup. This is a local directory path, not an HDFS path.
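The schemaFile path differs from machine to machine, so one option is to generate morphline.conf with that path as a parameter. This is a minimal sketch. write_morphline_conf is a hypothetical helper. The body is the same morphline shown above, with comments added and only the schemaFile value substituted.

```shell
# Hypothetical helper: write morphline.conf with the local (not HDFS) path to
# orders.avsc filled in. Arguments: output file, local schema path.
write_morphline_conf() {
  local out_file=$1 schema_path=$2
  cat > "$out_file" <<EOF
morphlines : [{
  id : morphline1
  importCommands : ["org.kitesdk.**"]
  commands : [
    # Split each CSV line into fields; names must match orders.avsc
    { readCSV {
        charset : UTF-8
        columns : [order_id,order_date,order_customer_id,order_status]
      }
    }
    # Build an Avro record from the named fields using the schema file
    { toAvro {
        schemaFile : ${schema_path}
      }
    }
    # Serialize without an Avro container; the Kite sink adds its own
    { writeAvroToByteArray {
        format : containerlessBinary
      }
    }
  ]
}]
EOF
}
```

Usage, after sourcing the function: `write_morphline_conf morphline.conf /home/skamalj/learn-flume2/orders.avsc`.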
Step 3 - Create Flume Configuration
Only the relevant section of the configuration is shown and explained below:
a1.sources.k1.interceptors = schemaheader morphlineinterceptor

a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.schemaheader.value = hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc

a1.sources.k1.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.k1.interceptors.morphlineinterceptor.morphlineFile = /home/skamalj/learn-flume2/morphline.conf
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1

a1.sinks.s1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s1.channel = c1
a1.sinks.s1.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_parquet
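The snippet above configures only the Parquet sink. The full configuration on GitHub has two channels and two sinks, one pair for each format. Below is a sketch of the Avro half under these assumptions: memory channels named c1 and c2, and a second sink named s2. These names and the channel type are not copied from the author's file. When a source lists two channels, Flume's default replicating channel selector sends every event to both. avro_sink_fragment is a hypothetical helper that prints the properties to add.

```shell
# Hypothetical helper: print the Flume properties for the Avro half of the
# agent. Channel/sink names and the memory channel type are assumptions;
# compare them with your own flume_morph_kite.conf before merging.
avro_sink_fragment() {
  cat <<'EOF'
# The source fans out to both channels (default replicating selector)
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.k1.channels = c1 c2
a1.channels.c1.type = memory
a1.channels.c2.type = memory

# Second Kite sink, writing to the Avro-format dataset
a1.sinks.s2.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s2.channel = c2
a1.sinks.s2.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_avro
EOF
}
```

Because the morphline interceptor runs on the source, both channels receive the same Avro-encoded events. Only the dataset URI decides whether a sink writes Parquet or Avro.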
- We need two interceptors:
  - The first, schemaheader, is a static interceptor that injects the flume.avro.schema.url header. The Kite sink uses this header to parse the Avro events. To use it, upload orders.avsc to HDFS and set its URL as the value.
  - The second, morphlineinterceptor, runs the morphline created in Step 2 and converts the events to Avro.
- The a1.sinks.s1 properties configure the Kite sink. This one writes to the Parquet dataset.
- The full configuration on GitHub has two channels and two sinks: one pair for Parquet and one for Avro.
- Make sure you change the paths to match your setup. The morphlineFile path is a local directory path, not an HDFS path.
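The schemaheader value must point to a copy of orders.avsc on HDFS. This is a minimal sketch of the upload, assuming the HDFS directory from the configuration above. upload_schema and run are hypothetical helpers. Setting DRY_RUN=1 prints the hdfs commands instead of running them.

```shell
# Print the command instead of executing it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then echo "$*"; else "$@"; fi
}

# Hypothetical helper: copy the local schema to the HDFS directory that
# flume.avro.schema.url points to, then print it back as a check.
# -f overwrites an older copy of the schema.
upload_schema() {
  local local_schema=${1:-orders.avsc}
  local hdfs_dir=${2:-/user/skamalj/avro-schemas}
  run hdfs dfs -mkdir -p "$hdfs_dir" || return 1
  run hdfs dfs -put -f "$local_schema" "$hdfs_dir/" || return 1
  run hdfs dfs -cat "$hdfs_dir/$(basename "$local_schema")"
}
```

With the defaults, the uploaded file corresponds to hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc, the value used in the configuration above.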
Step 4 - Execute
To execute, you need to set the HIVE_HOME and HCAT_HOME variables. Also increase the JVM heap with the -Xms and -Xmx options, because the default memory setting is insufficient for Morphline and Kite.
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HCAT_HOME=/opt/cloudera/parcels/CDH/lib/hive-hcatalog
flume-ng agent --name a1 --conf . --conf-file flume_morph_kite.conf -Xms4096m -Xmx8192m
Check the results in both datasets, as shown in the screenshot.
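One way to inspect both outputs from the command line is `kite-dataset show`, which prints the first records of a dataset. This is a minimal sketch. check_results and run are hypothetical helpers. Setting DRY_RUN=1 prints the commands instead of running them.

```shell
# Print the command instead of executing it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then echo "$*"; else "$@"; fi
}

# Hypothetical helper: print the first records of both datasets so the
# Parquet and Avro outputs can be compared side by side.
check_results() {
  local ds
  for ds in orders_parquet orders_avro; do
    echo "== ${ds} =="
    run kite-dataset show "$ds" || return 1
  done
}
```

If the agent is working, both datasets should show the same orders records, because both sinks read from channels fed by the same source.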
All done!
History
- 21st January, 2019: Initial version