Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / artificial-intelligence / machine-learning

Big Data

4.98/5 (24 votes)
3 Apr 2019CPOL11 min read 48.2K  
It is almost everything about big data.
This article explains with a practical example how to process big data (>peta byte = 10^15 byte) by using hadoop with multiple cluster definition by spark and compute heavy calculations with the aid of tensorflow libraries in Python.

Image 1

Introduction

Nowadays, we encounter the phenomena of growing volume of data. Our desire to keep all of this data for a long term and process and analyze them with high velocity has been caused to build a good solution for that. Finding a suitable substitute for traditional databases which can store any type of structured and unstructured data is the biggest challenge for data scientists.

I have selected a complete scenario from the first step until the result, which is hard to find throughout the internet. I selected MapReduce which is a processing data by Hadoop and Scala inside Intellij idea, while all of this story will happen under the Ubuntu Linux as operating system.

How Much Data Can Be Big?

Image 2

What is Big Data?

Big data is huge volume of massive data which are structured, unstructured or semi structured and it is difficult to store and manage with traditional databases. Its volume is variable from terabytes to petabytes. Relational databases such as SQL Server are not suitable for storing unstructured data. As you see in the below picture, big data is set of all kinds of structured or unstructured data which has fundamental requirements such as storing, management, share, analyze.

Image 3

Big data needs the process of importing and streaming huge data into storage box in order to process and analyze data.

There are two important challenges in big data:

  1. Collecting large volume of data (such as importing, transferring and loading data)
  2. Analyzing data (such as process, sorting, counting, aggregating data)

To perform the above steps, we need to establish a message system. The message system is a solution to transfer data from one application to another. There are two types of messaging systems, point to point and publish subscriber. In the point to point, the sending message can be consumed by only one consumer, while in pub-sub message system, consumer can use more than one topic from publisher.

Image 4

Processing Types

Continuous Processing

In the continuous processing, there is only one task to process. It is a contrast of batch processing.

Asynchronous Processing

In the asynchronous processing, each task will be processed in a separated thread. It can execute multiple tasks at the same time. The major difference between synchronization and asynchronization is paralleling process for multiple tasks. Assume two kind of eating, one is to stand on the queue of Doner kebab at the street (synchronization process) and another is to sit on the restaurant (asynchronization process).
At Doner kebab, we have to wait until each person takes their own sandwich while in the restaurant , waitress takes note of everybody's orders and randomly based on empty cooking place, the food will be ready.

Batch Processing – Off Line

In the batch processing, firstly data store on disk which can be included one millions of records, and then processing, analyzing can be happened for all of those huge data. Hadoop via MapReduce is a good sample. There is no need to know instance and immediate result in real time in the batch processing. It is good for analyzing large data in off line mode.

Streaming Processing - Real Time

In the Streaming processing, firstly data would be injected into analytical tools such as Spark or Storm and then data will be analyzed immediately. This solution is good for processing few recent records which instance answer is required such as authentication or authorization systems. This process is quick because of processing in memory rather than disk. It has high latency due to less record to analyze and in-memory process.

Fundamental and Related Concepts

What Is Data Pipeline?

Data pipeline is a series of related and connected elements which each parts of it, process data to make it reusable for next element and also produce flexible output with the desirable shape and format.
In order to see a practical example, please look at [Read CSV file with the aid of pipeline].

Image 5

Latency

Time duration for traversing data packet from one point to another. A low latency shows high network efficiency.

Throughput

The whole number of all actions in specific time from start to send until complete in the end of process.

Reliable

Reliable systems guarantee that all data will be processed properly without failure, and in the case of crashing it will re-execute it.

Fault Tolerance

Fault tolerance systems guarantee that all data will be kept properly without failure, it means that when a cluster has been crashed, the specific task will be directed to another machine.

Durable

When the process is completed, then successful message will be sent to customer. Although it sacrifices performance, it is worthy.

Scalable

Scalability will be defined mostly on distributed systems and parallel programming. It can guarantee speed in growth on the traffic and also data.

Performance

Performance systems guarantee Throughput even in the case of encountering big big data.

No SQLs

MongoDB

Mongo was written in C++. It is document oriented. It is flexible for data processing with many queries.

Redis

Redis is open source and in-memory data structure store with cache and message broker.

Big Data Tools

Hadoop/Hbase

It is open source, non-relational database and distributed. Apache HBase is on top of Hadoop.

Cassandra

It has good scalability, fault-tolerance and also lower latency while it has perfect caching.

Data Streaming

Flink

It is either batch or real-time processing. Flink has APIs for streaming, sql-query.

Storm

Storm is real-time system, with high performance and scalability. It can process more than a million records per second per node.

Kinesis

Kinesis has real-time processing.

Kafka

Kafka is publish-subscribe messaging system and real-time processing with good performance, reliability, scalability and durability.

What is Hadoop?

Indeed, we expect two issues from all databases. Firstly, we need to store data, secondly we want to process stored data properly in a fast and accurate way. Because of arbitrary shape and large volume of big data, it is impossible to store them in traditional databases. We need to think about new one which can handle either storing or processing of big data.

Hadoop is as a revolutionary database for big data, which has the capacity to save any shape of data and process them cluster of nodes. It also reduces dramatically the cost of data maintenance. With the aid of hadoop, we can store any sort of data, for example, all of user clicks for long period. So it makes easy historical analysis. Hadoop has distributed storage and also distributed process system such as Map Reduce.

Image 6

What is Hadoop Ecosystem?

As I mentioned above, hadoop is proper for either storing unstructured databases or processing them. There is an abstract definition for Hadoop ecosystem. Saving data is on the left side with two different storage possibilities as HDFS and HBase. HBase is on top of HDFS and both have been written by Java.

Image 7

HDFS:

  1. Hadoop Distributed File System allows to store large data in distributed flat files.
  2. HDFS is good for sequential access to data.
  3. There is no random real-time read/write access to data. It is more proper for offline batch processing.

HBase:

  1. Store data I key/value pairs in columnar fashion.
  2. HBase has possibility to read/write in real time.

Hadoop was written in Java but you can also implement by R, Python, Ruby.

Spark is an open source cluster computing which is implemented in Scala and is suitable to support job iteration on distributed computation. Spark has high performance.

Hive, pig, Sqoop and mahout are data access which make query to database possible. Hive and pig are SQL-like; mahout is for machine learning.

MapReduce and Yarn are both working to process data. MapReduce is built to process data in distributed systems. MapReduce can handle all tasks such as job and task trackers, monitoring and executing as parallel. Yarn, Yet Another Resource Negotiator is MapReduce 2.0 which has better resource management and scheduling.

Sqoop is a connector to import and export data from/to external databases. It makes easy and fast transfer data in parallel way.

How Hadoop Works?

Hadoop follows master/slaves architecture model. For HDFS, the name node in master monitors and keeps tracking all of slaves which are bunch of storage cluster.

Image 8

There are two different types of jobs which do all of the magic for MapReduce processing. Map job sends query for processing various nodes in cluster. This job will be broken down to smaller tasks. Then Reduce job collects all of the output which each node has produced and combines them to one single value as final result.

Image 9

This architecture makes Hadoop an inexpensive solution which is very quick and reliable. Divide big job into smaller ones and put each task in a different node. This story might remind you about multithreading process. In multithreading, all of concurrent processes are shared with the aid of locks and semaphores, but data accessing in MapReduce is under the control of HDFS.

Practical Example

There are three text files at the below picture for practicing word count. MapReduce starts to split each file to cluster of nodes as I explained at the top of this article. At Mapping phase, each node is responsible for count word. At intermediate splitting in each node is just hommogeous word and each number of that specific word in the previous node. Then in reducing phase, each node will be summed up and collects its own result to produce a single value.

Image 10

Tensorflow on Windows - Python - CPU

Image 11

Download Anaconda Python 3.6

If you want to feel easy with a comfortable IDE and professional editor, without needing to install libraries, you can use Anaconda & Spider.

Image 12

Then open Anaconda Navigator from star and select and lunch “Spider”:

Image 13

There are some points:

  1. Python is object oriented
  2. Dynamic Typing
  3. Rich Libraries
  4. Simple to read
  5. Python is case sensitive
  6. Indent is important for Python

Install Tensorflow

  1. Open Anaconda Navigator from "Start Menu" -> select "Environments" from "left panel" -> Go to the "root" -> select "All Channels" -> search for "tensor"
  2. Select "tensorflow", but if you feel that you need to work with R for statistical computing or GPU for having quick result, so select "r-tensorflow" and "tensorflow-gpu".
  3. Then, press green "Apply".

    Image 14

  4. Then again, accept the rest of the packages which are dependencies, in the next window.

    Image 15

What is Deep Learning?

Actually, deep learning is a branch of machine learning. Machine learning includes some different types of algorithms which get a few thousands data and try to learn from them in order to predict new events in future. But, deep learning applies neural network as extended or variant shapes. Deep learning has the capacity of handling million points of data.

The most fundamental infrastructure of deep learning could be its ability to pick the best features. Indeed, deep learning summarizes data and computes the result based on compressed data. It is what is really needed in artificial intelligence, especially when we have a huge database with dramatical computation.

Deep learning has sequential layers which is inspired from neural network. These layers have nonlinear function with the duty of feature selection. Each layer has an output which will be used as input for next layers. Deep learning applications are computer vision (such as face or object recognition), speech recognition, natural language process (NLP) and cyber threat detection.

I strongly recommend you to visit and read [this article].

Hadoop Installing and Implementing Step by Step

1. Download and Install Java

Go [here] to download this version jdk1.8.0_144.

Select drive (C:\Java) as a path for installing.

2. Download and Install Hadoop

Download hadoop from [here] and put on drive (D:\). You should have something like the below picture:

Image 16

First, make a new folder and name it "data" if there isn't one.

Format

Run as administrator "Windows Command Prompt".

D:\hadoop\bin>hadoop-data-dfs - remove all

D:\hadoop\bin>hadoop namenode -format

Start

  1. D:\hadoop\sbin>start-dfs.cmd
    (wait one min)
  2. D:\hadoop\sbin>yarn-dfs.cmd

Image 17

You will see four windows:

  1. yarn-resourcemanager

    Image 18

  2. yarn-nodemanager

    Image 19

  3. namenode

    Image 20

  4. datanode

    Image 21

    So, if you saw those 4 windows, it means everything went to right.

Adjust Environment

Image 22

Image 23

Test and Run Simple and Famous Python Code as wordcount:

Create new folder in D:\hdp, then create and save below python and text file on it.

wordcount-mapper.py
Python
import sys
for line in sys.stdin:     # Input is read from STDIN and the output of this file 
                           # is written into STDOUT
    line = line.strip()    # remove leading and trailing whitespace
    words = line.split()   # split the line into words
    for word in words:   
        print( '%s\t%s' % (word, 1))   #Print all words (key) individually with the value 1
wordcount-reducer.py
Python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:    # input comes from STDIN
    line = line.strip()    # remove leading and trailing whitespace

    word, count = line.split('\t', 1)    # parse the input we got from mapper.py 
                                         # by a tab (space)

    try:    
        count = int(count)        # convert count from string to int
    except ValueError:
        continue                  #If the count is not a number,
                                  #then discard the line by doing nothing


    if current_word == word:      #comparing the current word with the previous word 
                                  #(since they are ordered by key (word))
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print( '%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

if current_word == word:    # do not forget to output the last word if needed!
    print( '%s\t%s' % (current_word, current_count))

Create text document as "mahsa.txt":

Hello
Hello
Hello
Hello
Hello
Good
Good

Let's run it on hadoop.

D:\hadoop\sbin>hadoop fs -mkdir -p /hdp

Image 24

D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-mapper.py /hdp

D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\wordcount-reducer.py /hdp

D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\mahsa.txt /hdp

Image 25

D:\hadoop\sbin>hadoop fs -ls /hdp

D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\
hadoop-streaming-2.3.0.jar -file /hdp/wordcount-mapper.py -mapper 
"python wordcount-mapper.py" -file /hdp/wordcount-reducer.py -reducer 
"python wordcount-reducer.py" -input /hdp/mahsa.txt -output /outputpython

Image 26

Image 27

Image 28

Image 29

Test and run simple "Hello" with tensorflow python code:

Create code on D:\hdp:

Python
# -*- coding: utf-8 -*- """ Created on Sun Apr  1 15:42:59 2018 
 @author: Mahsa """ import tensorflow as tf
 hello = tf.constant('Hello, TensorFlow!') sess = tf.Session() print(sess.run(hello))
D:\hadoop\sbin>hadoop fs -copyFromLocal D:\hdp\tensortest.py /hdp

D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\
hadoop-streaming-2.3.0.jar -D mapreduce.job.reduce=0 -file /hdp/tensortest.py 
-mapper "python tensortest.py" -input /hdp/mahsa.txt -output /outputtensortest

Image 30

Image 31

D:\hadoop\sbin>hadoop fs -ls /outputtensortest

D:\hadoop\sbin>hadoop fs -cat /outputtensortest/part-00000

Image 32

Test and run simple "Digit-Recognition" with tensorflow Python code:

Create code on D:\hdp:

Python
# -*- coding: utf-8 -*-
"""
Created on Sun Apr  1 15:42:59 2018

@author: Mahsa
"""

from tensorflow.examples.tutorials.mnist import input_data

# Downloading MNIS dataset
mnist_train = input_data.read_data_sets("data/", one_hot=True)

import tensorflow as tf

batch = 100
learning_rate = 0.01
training_epochs = 10

# matrix
x = tf.placeholder(tf.float32, shape=[None, 784])
yt = tf.placeholder(tf.float32, shape=[None, 10])

# Weight
Weight = tf.Variable(tf.zeros([784, 10]))
bias = tf.Variable(tf.zeros([10]))

# model
y = tf.nn.softmax(tf.matmul(x,Weight) + bias)

# entropy
cross_ent = tf.reduce_mean(-tf.reduce_sum(yt * tf.log(y), reduction_indices=[1]))

# Prediction
correct_pred = tf.equal(tf.argmax(y,1), tf.argmax(yt,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Gradient Descent 
train_optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cross_ent)

with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())

# Batch Processing
    for epoch in range(training_epochs):
        batch_num = int(mnist_train.train.num_examples / batch)
        for i in range(batch_num):
            batch_x, batch_y = mnist_train.train.next_batch(batch)
    
    sess.run([train_optimizer], result={x: batch_x, yt: batch_y})
    
    if epoch % 2 == 0:
        print( "Epoch: ", epoch)
        print ("Accuracy: ", accuracy.eval
              (result={x: mnist_train.test.images, yt: mnist_train.test.labels}))
        print( "Complete")
D:\hadoop\sbin>D:\hadoop\bin\hadoop jar D:\hadoop\share\hadoop\tools\lib\hadoop-streaming-2.3.0.jar -D mapreduce.job.reduce=0 -file /hdp/tensordigit.py -mapper "python tensordigit.py" -input /hdp/mahsa.txt -output /outputtensordigitt

Image 33

D:\hadoop\sbin>hadoop fs -ls /outputtensordigittt

Image 34

Image 35

Feedback

Feel free to leave any feedback on this article; it is a pleasure to see your opinions and vote about this code. If you have any questions, please do not hesitate to ask me here.

References

  1. https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html
  2. http://highscalability.com/blog/2012/9/11/how-big-is-a-petabyte-exabyte-zettabyte-or-a-yottabyte.html
  3. http://bigdata-madesimple.com/a-deep-dive-into-nosql-a-complete-list-of-nosql-databases/
  4. https://northconcepts.com/docs/what-is-data-pipeline/
  5. http://dataaspirant.com/2017/05/03/handwritten-digits-recognition-tensorflow-python/
  6. http://www.algoworks.com/blog/real-time-data-streaming-tools-and-technologies/

History

  • 25th April, 2018: Initial version
  • 3rd April, 2019: Article updated

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)