This article explains, with a practical example, how to process big data (more than a petabyte, i.e. 10^15 bytes) using Hadoop with multiple cluster definitions via Spark, and how to run heavy computations with the aid of the TensorFlow libraries in Python.
Introduction
Nowadays, we are facing the phenomenon of a growing volume of data. Our desire to keep all of this data for the long term, and to process and analyze it at high velocity, has driven the search for a good solution. Finding a suitable substitute for traditional databases, one that can store any type of structured and unstructured data, is the biggest challenge for data scientists.
I have chosen a complete scenario, from the first step to the final result, which is hard to find on the internet. I selected MapReduce, which processes data with Hadoop, and Scala inside IntelliJ IDEA, while the whole story takes place on Ubuntu Linux as the operating system.
How Much Data Can Be Big?
What is Big Data?
Big data is a huge volume of structured, unstructured or semi-structured data that is difficult to store and manage with traditional databases. Its volume ranges from terabytes to petabytes. Relational databases such as SQL Server are not suitable for storing unstructured data. As you can see in the picture below, big data is the set of all kinds of structured and unstructured data, with fundamental requirements such as storage, management, sharing and analysis.
Big data requires importing and streaming huge amounts of data into storage so that the data can be processed and analyzed.
There are two important challenges in big data:
- Collecting large volumes of data (such as importing, transferring and loading data)
- Analyzing data (such as processing, sorting, counting and aggregating data)
To perform the above steps, we need a messaging system, which transfers data from one application to another. There are two types of messaging systems: point-to-point and publish-subscribe. In point-to-point messaging, each message can be consumed by only one consumer, while in a publish-subscribe system, every consumer subscribed to a topic receives the publisher's messages, and a consumer can subscribe to more than one topic.
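To make the two messaging models concrete, here is a minimal in-memory sketch in plain Python. It is not the API of Kafka or any real broker; the classes `PointToPointQueue` and `PubSubBroker` are hypothetical names used only for illustration. In the queue, each message is handed to exactly one consumer; in the broker, every subscriber of a topic receives its own copy.

```python
from collections import defaultdict, deque


class PointToPointQueue:
    """Hypothetical queue: each message is consumed by exactly one consumer."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # popleft() removes the message, so no other consumer can read it.
        return self._messages.popleft() if self._messages else None


class PubSubBroker:
    """Hypothetical broker: every subscriber of a topic gets its own copy."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # topic -> subscriber names
        self.inbox = defaultdict(list)         # subscriber -> (topic, message) pairs

    def subscribe(self, subscriber, *topics):
        # One consumer may subscribe to more than one topic.
        for topic in topics:
            self._subscribers[topic].append(subscriber)

    def publish(self, topic, message):
        # Fan out the message to every subscriber of the topic.
        for subscriber in self._subscribers[topic]:
            self.inbox[subscriber].append((topic, message))


if __name__ == "__main__":
    queue = PointToPointQueue()
    queue.send("order-1")
    print(queue.receive())  # the first consumer gets the message
    print(queue.receive())  # a second consumer gets None

    broker = PubSubBroker()
    broker.subscribe("analytics", "clicks", "orders")
    broker.subscribe("billing", "orders")
    broker.publish("orders", "order-1")
    broker.publish("clicks", "click-42")
    print(dict(broker.inbox))
```

Here "analytics" receives both messages because it subscribed to both topics, while "billing" receives only the order.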
Processing Types
Continuous Processing
In continuous processing, there is only one task to process. It is the opposite of batch processing.
Asynchronous Processing
In asynchronous processing, each task is processed in a separate thread, so multiple tasks can execute at the same time. The major difference between synchronous and asynchronous processing is the parallel processing of multiple tasks. Consider two ways of eating: standing in the queue at a doner kebab stand on the street (synchronous processing) or sitting in a restaurant (asynchronous processing).
At the doner kebab stand, we have to wait until each person ahead of us gets their sandwich, while in the restaurant the waitress takes everybody's orders and each dish is ready whenever a cooking place becomes free.
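The kebab-versus-restaurant analogy can be sketched with Python's standard `asyncio` module. This is only an illustration with made-up cooking times (`asyncio.sleep` stands in for cooking), and the function names are hypothetical. Note that `asyncio` runs the tasks concurrently on a single thread rather than in separate threads, but the waiting behavior is the same: the queue serves orders one after another, so the total time is roughly the sum of the cooking times, while the restaurant cooks all orders concurrently, so the total time is roughly the longest cooking time.

```python
import asyncio
import time


async def cook(order, seconds):
    # Simulated cooking time; while one dish cooks, others can progress.
    await asyncio.sleep(seconds)
    return order


async def kebab_queue(orders):
    # Synchronous style: each customer waits for the previous one to be served.
    served = []
    for order, seconds in orders:
        served.append(await cook(order, seconds))
    return served


async def restaurant(orders):
    # Asynchronous style: all orders are taken first, then cooked concurrently.
    # gather() returns results in the order the orders were placed.
    return await asyncio.gather(*(cook(order, seconds) for order, seconds in orders))


ORDERS = [("doner", 0.3), ("salad", 0.1), ("soup", 0.2)]

if __name__ == "__main__":
    for name, style in [("kebab queue", kebab_queue), ("restaurant", restaurant)]:
        start = time.perf_counter()
        result = asyncio.run(style(ORDERS))
        print(f"{name}: {result} in {time.perf_counter() - start:.1f}s")
```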
Batch Processing – Off Line
In batch processing, data is first stored on disk, possibly millions of records, and then the whole dataset is processed and analyzed. Hadoop MapReduce is a good example. Batch processing does not need instant, real-time results; it is good for analyzing large data in offline mode.
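A minimal sketch of the batch idea in plain Python: the records are first written to disk in full, and only then is the whole dataset read back and aggregated in one pass. The file name and the `(user, clicks)` record layout are assumptions for illustration; in Hadoop, the storage would be HDFS and the pass would be a MapReduce job.

```python
import csv
import os
import tempfile
from collections import Counter


def store_batch(path, records):
    # Step 1: persist every record to disk before any analysis happens.
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows(records)


def analyze_batch(path):
    # Step 2: process the complete, stored dataset in one offline pass.
    totals = Counter()
    with open(path, newline="") as f:
        for user, clicks in csv.reader(f):
            totals[user] += int(clicks)
    return totals


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "clicks.csv")
    store_batch(path, [("alice", 3), ("bob", 1), ("alice", 2)])
    print(analyze_batch(path))  # Counter({'alice': 5, 'bob': 1})
```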
Streaming Processing - Real Time
In stream processing, data is first injected into analytical tools such as Spark or Storm and then analyzed immediately. This approach is good for processing a few recent records when an instant answer is required, such as in authentication or authorization systems. It is fast because processing happens in memory rather than on disk, and it has low latency because there are fewer records to analyze and they are processed in memory.
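For contrast, a streaming sketch: each record is analyzed in memory as soon as it arrives, and only a small window of recent records is kept. The login-failure scenario, the window size and the threshold are assumptions chosen to echo the authentication example above; real systems such as Spark or Storm provide their own streaming APIs, which this does not imitate.

```python
from collections import deque


def detect_bursts(events, window=3, threshold=2):
    """Yield an alert as soon as a user has `threshold` failures among the last `window` events."""
    recent = deque(maxlen=window)  # in-memory window of the most recent events
    for user, ok in events:
        recent.append((user, ok))
        failures = sum(1 for u, success in recent if u == user and not success)
        if not ok and failures >= threshold:
            # Answer immediately, without waiting for the rest of the stream.
            yield f"alert: {user} has {failures} recent failed logins"


if __name__ == "__main__":
    stream = [("bob", False), ("alice", True), ("bob", False), ("bob", True)]
    for alert in detect_bursts(stream):
        print(alert)
```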
Fundamental and Related Concepts
What Is Data Pipeline?
A data pipeline is a series of connected elements in which each element processes data and makes it usable for the next element, producing flexible output in the desired shape and format.
For a practical example, see [Read CSV file with the aid of pipeline].
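In Python, a data pipeline can be sketched with chained generators: each stage consumes the previous stage's output and yields records in the shape the next stage expects. The stage names and the sample CSV columns below are hypothetical; this is not the code from the linked article.

```python
import csv
import io


def read_rows(text):
    # Stage 1: parse raw CSV text into dictionaries.
    yield from csv.DictReader(io.StringIO(text))


def clean(rows):
    # Stage 2: drop incomplete rows and normalize values and types.
    for row in rows:
        if row["amount"]:
            yield {"city": row["city"].strip().title(), "amount": float(row["amount"])}


def total_by_city(rows):
    # Stage 3: produce the final output shape (city -> total amount).
    totals = {}
    for row in rows:
        totals[row["city"]] = totals.get(row["city"], 0.0) + row["amount"]
    return totals


RAW = "city,amount\nberlin,10\nParis,5.5\n berlin ,2\nRome,\n"

if __name__ == "__main__":
    print(total_by_city(clean(read_rows(RAW))))
```

Because the stages are generators, records flow through one at a time, so the same chain works for files far larger than memory.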
Latency
The time it takes for a data packet to travel from one point to another. Low latency indicates high network efficiency.
Throughput
The total number of actions completed in a specific time, from the start of sending until completion at the end of the process.
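The two metrics can be measured side by side with a small timing harness. The `handle` function is a stand-in workload (an assumption), and the numbers you get depend entirely on your machine: latency is the time for one item to go through, while throughput is how many items finish per second.

```python
import time


def handle(item):
    # Stand-in workload (assumption); replace with real processing.
    time.sleep(0.01)
    return item * 2


def measure(items):
    latencies = []
    start = time.perf_counter()
    for item in items:
        t0 = time.perf_counter()
        handle(item)
        latencies.append(time.perf_counter() - t0)  # per-item latency
    elapsed = time.perf_counter() - start
    return {
        "avg_latency_s": sum(latencies) / len(latencies),
        "throughput_per_s": len(items) / elapsed,  # completed items per second
    }


if __name__ == "__main__":
    print(measure(list(range(20))))
```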
Reliable
Reliable systems guarantee that all data is processed properly without failure; if a crash occurs, the failed work is re-executed.
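Re-execution after a failure can be sketched as a retry loop. This is a simplified illustration, not how any particular framework implements it; `make_flaky_task` builds a hypothetical task that crashes a given number of times before succeeding, and the retry limit is an assumption.

```python
def run_reliably(task, max_attempts=3):
    """Re-execute `task` until it succeeds or the attempts are exhausted."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return task(), attempt
        except RuntimeError as error:  # treat RuntimeError as a transient crash
            last_error = error
    raise RuntimeError(f"task failed after {max_attempts} attempts") from last_error


def make_flaky_task(failures):
    # Hypothetical task that crashes `failures` times, then returns a result.
    state = {"calls": 0}

    def flaky_task():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError("worker crashed")
        return "processed"

    return flaky_task


if __name__ == "__main__":
    print(run_reliably(make_flaky_task(2)))  # ('processed', 3)
```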
Fault Tolerance
Fault-tolerant systems guarantee that all data is kept safely without failure: when a machine in the cluster crashes, its task is redirected to another machine.
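Redirecting a task when a machine crashes can be sketched as trying the nodes in turn. The node names, the `CRASHED_NODES` set and `word_count_task` are hypothetical; in Hadoop, the framework itself reschedules failed tasks on other nodes, and this sketch only shows the idea.

```python
CRASHED_NODES = {"node-1"}  # hypothetical: machines that are currently down


def word_count_task(node):
    # Hypothetical task; raises if the machine it runs on has crashed.
    if node in CRASHED_NODES:
        raise ConnectionError(f"{node} is down")
    return f"word count finished on {node}"


def run_with_failover(task, nodes):
    """Try the task on each node in turn until one of them completes it."""
    errors = {}
    for node in nodes:
        try:
            return node, task(node)
        except ConnectionError as error:
            errors[node] = str(error)  # remember the failure, redirect to the next node
    raise RuntimeError(f"all nodes failed: {errors}")


if __name__ == "__main__":
    print(run_with_failover(word_count_task, ["node-1", "node-2", "node-3"]))
```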
Durable
The success message is sent to the customer only after the process has completed. Although this sacrifices some performance, it is worth it.
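A minimal sketch of durability with the standard library: the record is written, flushed and `os.fsync`-ed before `True` is returned as the success message. The file layout (one JSON record per line) is an assumption for illustration; the `fsync` call is exactly the kind of performance cost the text mentions.

```python
import json
import os
import tempfile


def durable_append(path, record):
    """Append `record` and acknowledge only after it has been forced to disk."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
        f.flush()              # move Python's buffer to the operating system
        os.fsync(f.fileno())   # force the OS to write it to the storage device
    return True                # the "successful message" for the customer


if __name__ == "__main__":
    log = os.path.join(tempfile.mkdtemp(), "orders.log")
    print(durable_append(log, {"order": 1, "status": "paid"}))
```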
Scalable
Scalability is defined mostly for distributed systems and parallel programming. A scalable system keeps its speed as traffic and data grow.
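One common way for distributed systems to stay fast as data grows is to partition records across nodes by a hash of their key, so that adding nodes spreads the load. This sketch uses `zlib.crc32` as a stable hash (Python's built-in `hash` of strings changes between runs); the key names and node counts are illustrative assumptions. With plain modulo hashing, changing the node count moves many keys, which is why real systems often use more elaborate schemes such as consistent hashing.

```python
import zlib
from collections import defaultdict


def partition(keys, num_nodes):
    """Assign each key to a node by a stable hash, modulo the node count."""
    placement = defaultdict(list)
    for key in keys:
        node = zlib.crc32(key.encode("utf-8")) % num_nodes
        placement[node].append(key)
    return dict(placement)


if __name__ == "__main__":
    users = [f"user-{i}" for i in range(1000)]
    for nodes in (2, 4):
        sizes = {n: len(k) for n, k in sorted(partition(users, nodes).items())}
        print(nodes, "nodes:", sizes)
```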
Performance
High-performance systems maintain throughput even when facing very big data.
NoSQL
MongoDB
MongoDB is written in C++ and is document-oriented. It is flexible for data processing with many kinds of queries.
Redis
Redis is an open-source, in-memory data structure store that can also serve as a cache and a message broker.
Big Data Tools
Hadoop/HBase
HBase is an open-source, distributed, non-relational database. Apache HBase runs on top of Hadoop.
Cassandra
Cassandra has good scalability, fault tolerance and low latency, along with good caching.
Data Streaming
Flink
Flink supports both batch and real-time processing. It has APIs for streaming and SQL queries.
Storm
Storm is a real-time system with high performance and scalability. It can process more than a million records per second per node.
Kinesis
Kinesis provides real-time processing.
Kafka
Kafka is a publish-subscribe messaging system for real-time processing, with good performance, reliability, scalability and durability.
What is Hadoop?
We expect two things from any database. First, it must store data; second, it must process the stored data properly, quickly and accurately. Because of the arbitrary shape and large volume of big data, it is impossible to store it in traditional databases. We need a new solution that can handle both storing and processing big data.
Hadoop is a revolutionary platform for big data that can store data of any shape and process it on a cluster of nodes. It also dramatically reduces the cost of data maintenance. With Hadoop, we can store any sort of data, for example all user clicks over a long period, which makes historical analysis easy. Hadoop has distributed storage and also a distributed processing system, MapReduce.
What is Hadoop Ecosystem?
As mentioned above, Hadoop is suitable for both storing and processing unstructured data. In an abstract view of the Hadoop ecosystem, data storage is on the left side, with two storage options: HDFS and HBase. HBase sits on top of HDFS, and both are written in Java.
HDFS:
- The Hadoop Distributed File System stores large data in distributed flat files.
- HDFS is good for sequential access to data.
- There is no random real-time read/write access to data; HDFS is better suited to offline batch processing.
HBase:
- Stores data in key/value pairs in a columnar fashion.
- Supports real-time read/write access.
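HBase's logical data model (row key, then "family:qualifier" column, then timestamped versions of each cell) can be pictured with nested Python dictionaries. This is only a mental model, not the HBase client API; the `table` structure and the `put`/`get` helpers are hypothetical.

```python
import time
from collections import defaultdict

# Hypothetical in-memory model: row key -> "family:qualifier" -> {timestamp: value}
table = defaultdict(lambda: defaultdict(dict))


def put(row, column, value, ts=None):
    # A write adds a new timestamped version of the cell instead of overwriting it.
    table[row][column][ts if ts is not None else time.time_ns()] = value


def get(row, column):
    # A read returns the newest version, like a default HBase Get.
    versions = table.get(row, {}).get(column, {})
    return versions[max(versions)] if versions else None


if __name__ == "__main__":
    put("user#1001", "info:name", "Alice", ts=1)
    put("user#1001", "clicks:2018-04-25", "17", ts=1)
    put("user#1001", "info:name", "Alice Smith", ts=2)
    print(get("user#1001", "info:name"))   # the newest version wins
    print(sorted(table["user#1001"]))      # columns stored for this row
```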
Hadoop is written in Java, but you can also write Hadoop jobs in R, Python or Ruby, for example via Hadoop Streaming.
Spark is an open-source cluster computing framework implemented in Scala, and it is well suited to iterative jobs in distributed computation. Spark has high performance.
Hive, Pig, Sqoop and Mahout are data access tools that make querying the data possible. Hive and Pig are SQL-like; Mahout is for machine learning.
MapReduce and YARN both work to process data. MapReduce is built to process data in distributed systems, and it handles tasks such as job and task tracking, monitoring and parallel execution. YARN (Yet Another Resource Negotiator), also called MapReduce 2.0, has better resource management and scheduling.
Sqoop is a connector to import data from, and export data to, external databases. It makes transferring data easy and fast by doing it in parallel.
How Does Hadoop Work?
Hadoop follows a master/slave architecture. For HDFS, the NameNode on the master monitors and keeps track of all the slaves (DataNodes), which together form the storage cluster.
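To see what kind of metadata the NameNode keeps track of, here is a simplified sketch that splits a file into fixed-size blocks and assigns each block to several DataNodes. The 128 MB block size and replication factor of 3 are the Hadoop 2.x defaults (`dfs.blocksize`, `dfs.replication`); the round-robin placement is a simplification of HDFS's real rack-aware policy, and the DataNode names are hypothetical.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024   # dfs.blocksize default in Hadoop 2.x (bytes)
REPLICATION = 3                  # dfs.replication default


def plan_blocks(file_size, datanodes, block_size=BLOCK_SIZE, replication=REPLICATION):
    """Return NameNode-style metadata: block id -> DataNodes holding a replica."""
    num_blocks = math.ceil(file_size / block_size)
    block_map = {}
    for block in range(num_blocks):
        # Simplified round-robin placement on distinct nodes; real HDFS is rack-aware.
        block_map[block] = [datanodes[(block + r) % len(datanodes)]
                            for r in range(min(replication, len(datanodes)))]
    return block_map


if __name__ == "__main__":
    nodes = ["dn1", "dn2", "dn3", "dn4"]
    one_gb = 1024 ** 3
    for block, replicas in plan_blocks(one_gb, nodes).items():
        print(block, replicas)
```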
Two types of jobs do all the magic in MapReduce processing. The Map job sends the query for processing to various nodes in the cluster, and this job is broken down into smaller tasks. Then the Reduce job collects all the output that each node has produced and combines it into a single value as the final result.
This architecture makes Hadoop an inexpensive solution that is very quick and reliable: it divides a big job into smaller ones and puts each task on a different node. This might remind you of multithreading. In multithreading, all concurrent processes share data with the aid of locks and semaphores, but data access in MapReduce is under the control of HDFS.
Practical Example
There are three text files in the picture below for practicing word count. MapReduce starts by splitting the files across a cluster of nodes, as explained above. In the mapping phase, each node is responsible for counting words. In the intermediate (shuffle) phase, each node receives only one kind of word, together with the counts of that word from the previous nodes. Then, in the reducing phase, each node sums up its values to produce a single count per word.
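The three phases can be simulated in a single Python process to see what each node would do. The three input strings stand in for the three text files in the picture (their contents here are made up), and the `map_phase`, `shuffle_phase` and `reduce_phase` names are illustrative; Hadoop runs these phases on different machines.

```python
from collections import defaultdict


def map_phase(document):
    # Mapper: emit (word, 1) for every word in one input split.
    return [(word, 1) for word in document.split()]


def shuffle_phase(mapped_outputs):
    # Shuffle: group all values of the same word together, as if on one node.
    groups = defaultdict(list)
    for pairs in mapped_outputs:
        for word, count in pairs:
            groups[word].append(count)
    return groups


def reduce_phase(groups):
    # Reducer: sum the counts of each word into a single value.
    return {word: sum(counts) for word, counts in sorted(groups.items())}


if __name__ == "__main__":
    files = ["Deer Bear River", "Car Car River", "Deer Car Bear"]
    mapped = [map_phase(doc) for doc in files]  # one mapper per file
    print(reduce_phase(shuffle_phase(mapped)))
```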
TensorFlow on Windows - Python - CPU
Download Anaconda Python 3.6
If you want a comfortable IDE and a professional editor without needing to install libraries yourself, you can use Anaconda & Spyder.
Then open Anaconda Navigator from the Start menu, and select and launch "Spyder":
Some points about Python:
- Python is object-oriented
- Dynamic typing
- Rich libraries
- Simple to read
- Python is case-sensitive
- Indentation is significant in Python
Install TensorFlow
- Open Anaconda Navigator from the "Start Menu" -> select "Environments" from the left panel -> go to "root" -> select "All Channels" -> search for "tensor".
- Select "tensorflow". If you need to work with R for statistical computing, or with a GPU for faster results, also select "r-tensorflow" and "tensorflow-gpu".
- Then press the green "Apply" button.
- In the next window, accept the remaining packages, which are dependencies.
What is Deep Learning?
Deep learning is a branch of machine learning. Machine learning includes different types of algorithms that take a few thousand data points and try to learn from them in order to predict future events. Deep learning, however, applies neural networks in extended or variant forms, and it can handle millions of data points.
The most fundamental strength of deep learning may be its ability to pick the best features. Indeed, deep learning summarizes data and computes the result based on the compressed data. This is what is really needed in artificial intelligence, especially when we have a huge database with heavy computation.
Deep learning has sequential layers inspired by neural networks. These layers apply nonlinear functions and are responsible for feature selection. Each layer's output is used as the input for the next layer. Deep learning applications include computer vision (such as face or object recognition), speech recognition, natural language processing (NLP) and cyber threat detection.
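The idea that each layer applies a nonlinear function and feeds its output into the next layer can be sketched as a forward pass in pure Python. The weights below are arbitrary numbers chosen for illustration, not trained values, and ReLU is just one common choice of nonlinearity.

```python
def relu(values):
    # Nonlinear activation: negative values become 0.
    return [max(0.0, v) for v in values]


def dense(inputs, weights, biases):
    # One fully connected layer: each output is a weighted sum of all inputs plus a bias.
    return [sum(w * x for w, x in zip(row, inputs)) + b for row, b in zip(weights, biases)]


def forward(inputs, layers):
    # Sequential layers: the output of one layer is the input of the next.
    activations = inputs
    for weights, biases in layers:
        activations = relu(dense(activations, weights, biases))
    return activations


LAYERS = [
    ([[0.5, -1.0], [1.0, 1.0], [-0.5, 2.0]], [0.0, 0.1, 0.0]),  # 2 inputs -> 3 features
    ([[1.0, 0.5, 0.25]], [0.2]),                                  # 3 features -> 1 output
]

if __name__ == "__main__":
    print(forward([1.0, 2.0], LAYERS))
```

Training would adjust `LAYERS` from data; TensorFlow, used below, does that adjustment automatically.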
I strongly recommend visiting and reading [this article].
Installing and Implementing Hadoop Step by Step
1. Download and Install Java
Go [here] to download this version: jdk1.8.0_144.
Select drive (C:\Java) as the installation path.
2. Download and Install Hadoop
Download Hadoop from [here] and put it on drive (D:\). You should have something like the picture below:
First, make a new folder and name it "data" if there isn't one.
Format
Run "Windows Command Prompt" as administrator.
Remove all contents of D:\hadoop\data\dfs (if the folder exists), then format the NameNode:
D:\hadoop\bin>hadoop namenode -format
Start
- D:\hadoop\sbin>start-dfs.cmd
- (wait one minute) D:\hadoop\sbin>start-yarn.cmd
You will see four windows:
- yarn-resourcemanager
- yarn-nodemanager
- namenode
- datanode
If you see those four windows, everything went right.
Adjust Environment
Test and Run the Simple and Famous Python Word Count Code:
Create a new folder D:\hdp, then create and save the Python and text files below in it.
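wordcount-mapper.py
import sys

# Hadoop Streaming feeds each input split to this script on standard input.
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    # Emit one "word<TAB>1" pair per word; Hadoop sorts these by key.
    for word in words:
        print('%s\t%s' % (word, 1))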
wordcount-reducer.py
import sys

current_word = None
current_count = 0

# Input arrives sorted by key, so all lines for one word are adjacent.
for line in sys.stdin:
    line = line.strip()
    try:
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # Skip malformed lines (no tab, or a count that is not a number).
        continue
    if current_word == word:
        current_count += count
    else:
        # A new word starts: print the total of the previous word.
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Print the total for the last word.
if current_word:
    print('%s\t%s' % (current_word, current_count))
Create a text document named "mahsa.txt":
Hello
Hello
Hello
Hello
Hello
Good
Good
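Before running it on Hadoop, you can check the logic locally by reproducing what Hadoop Streaming does: run the mapper, sort the intermediate lines by key (Hadoop sorts map output by key before the reducer sees it), then run the reducer. This sketch re-implements the two scripts above as functions rather than calling them, so it approximates the real job; the reducer is written with `itertools.groupby` as an alternative to the manual `current_word` bookkeeping. The sample input is the content of mahsa.txt.

```python
from itertools import groupby


def mapper(lines):
    # Same output format as wordcount-mapper.py: "word<TAB>1".
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reducer(sorted_lines):
    # Same result as wordcount-reducer.py, using groupby over the sorted keys.
    pairs = (line.split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


if __name__ == "__main__":
    mahsa_txt = ["Hello"] * 5 + ["Good"] * 2
    # sorted() plays the role of Hadoop's shuffle-and-sort step.
    for output_line in reducer(sorted(mapper(mahsa_txt))):
        print(output_line)
```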
Let's run it on hadoop.
D:\hadoop\sbin>hadoop fs -mkdir -p /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-mapper.py /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-reducer.py /hdp
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\mahsa.txt /hdp
D:\hadoop\sbin>hadoop fs -ls /hdp
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -file /hdp/wordcount-mapper.py -mapper "python wordcount-mapper.py" -file /hdp/wordcount-reducer.py -reducer "python wordcount-reducer.py" -input /hdp/mahsa.txt -output /outputpython
Test and run a simple "Hello" with TensorFlow Python code:
Create the code in D:\hdp (save it as tensortest.py):
"""
@author: Mahsa
"""
import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print(sess.run(hello))
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\tensortest.py /hdp
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapreduce.job.reduces=0 -file /hdp/tensortest.py -mapper "python tensortest.py" -input /hdp/mahsa.txt -output /outputtensortest
D:\hadoop\sbin>hadoop fs -ls /outputtensortest
D:\hadoop\sbin>hadoop fs -cat /outputtensortest/part-00000
Test and run a simple "Digit Recognition" with TensorFlow Python code:
Create the code in D:\hdp (save it as tensordigit.py):
"""
Created on Sun Apr 1 15:42:59 2018
@author: Mahsa
"""
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

# Download (if needed) and load MNIST with one-hot encoded labels
mnist_train = input_data.read_data_sets("data/", one_hot=True)

batch = 100
learning_rate = 0.01
training_epochs = 10

# Input images (28x28 = 784 pixels) and true labels (10 digits)
x = tf.placeholder(tf.float32, shape=[None, 784])
yt = tf.placeholder(tf.float32, shape=[None, 10])

# Softmax regression: one weight matrix and one bias vector
Weight = tf.Variable(tf.zeros([784, 10]))
bias = tf.Variable(tf.zeros([10]))
y = tf.nn.softmax(tf.matmul(x, Weight) + bias)

# Cross-entropy loss, accuracy metric and gradient-descent training step
cross_ent = tf.reduce_mean(-tf.reduce_sum(yt * tf.log(y), reduction_indices=[1]))
correct_pred = tf.equal(tf.argmax(y, 1), tf.argmax(yt, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
train_optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cross_ent)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(training_epochs):
        batch_num = int(mnist_train.train.num_examples / batch)
        for i in range(batch_num):
            batch_x, batch_y = mnist_train.train.next_batch(batch)
            sess.run([train_optimizer], feed_dict={x: batch_x, yt: batch_y})
        if epoch % 2 == 0:
            print("Epoch: ", epoch)
            print("Accuracy: ", accuracy.eval(feed_dict={x: mnist_train.test.images, yt: mnist_train.test.labels}))
    print("Complete")
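For reference, the model built by this script is softmax regression. Writing x for a batch of N images, W for `Weight`, b for `bias`, y^t for the one-hot labels `yt` and η for `learning_rate`, the prediction `y`, the loss `cross_ent` (a sum over the 10 classes, then a mean over the batch) and one gradient-descent step can be summarized as follows; this is a summary of what the code computes, not additional code.

```latex
y = \operatorname{softmax}(xW + b), \qquad
\operatorname{softmax}(z)_k = \frac{e^{z_k}}{\sum_{j=1}^{10} e^{z_j}}

H = -\frac{1}{N}\sum_{n=1}^{N}\sum_{k=1}^{10} y^{t}_{nk}\,\log y_{nk}

W \leftarrow W - \eta\,\frac{\partial H}{\partial W}, \qquad
b \leftarrow b - \eta\,\frac{\partial H}{\partial b}, \qquad \eta = 0.01
```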
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapreduce.job.reduces=0 -file /hdp/tensordigit.py -mapper "python tensordigit.py" -input /hdp/mahsa.txt -output /outputtensordigit
D:\hadoop\sbin>hadoop fs -ls /outputtensordigit
Feedback
Feel free to leave any feedback on this article; I am pleased to see your opinions and votes on this code. If you have any questions, please do not hesitate to ask me here.
History
- 25th April, 2018: Initial version
- 3rd April, 2019: Article updated