Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / Hosted-services / AWS

Sending Data to Kinesis Firehose Using Python

5.00/5 (1 vote)
7 Mar 2020CPOL5 min read 14.7K  
In this tutorial, you create a simple Python client that sends records to an AWS Kinesis Firehose stream.
In this tutorial, you write a simple Python client that sends data to the stream created in the last tutorial. Specifically, you use the put-record and put-record-batch functions to send individual records and then batched records respectively.

In this tutorial, you create a simple Python client that sends records to an AWS Kinesis Firehose stream. The stream was created in a previous tutorial Using the AWS Toolkit for PyCharm to Create and Deploy a Kinesis Firehose Stream with a Lambda Transformation Function. You must complete that tutorial prior to this tutorial.

Here, you use the put_record and the put_record_batch functions to write data to Firehose. If after completing the previous tutorial, you wish to refer to more information on using Python with AWS, refer to the following information sources:

In the previous tutorial, you created an AWS Firehose Stream for streaming data to an S3 bucket. Moreover, you wrote a Lambda function that transformed temperature data from celsius or fahrenheit to kelvin. You also sent individual records to the stream using the Command Line Interface (CLI) and its firehose put-record function.

In this tutorial, you write a simple Python client that sends data to the stream created in the last tutorial. Specifically, you use the put-record and put-record-batch functions to send individual records and then batched records respectively.

Creating Sample Data

  • Navigate to mockaroo.com and create a free account.
  • Click Schemas to create a new schema.
  • Name the schema, here I named it SampleTempDataForTutorial.

Image 1

Creating a schema in Mockaroo
  • Create a field named station and assign its type as State (abbrev).
  • Create a field named temp and assign it as Number with a min of one, max of 100, and two decimals.

Image 2

Creating the SampleTempDataForTutorial data in Mockaroo:
  • Click the fx button and create the formula as follows:
Python
if random(0,10) == 10 then this = this + 1000 end
if this > random(0,100) then format(this,2) + 'F' 
elseif this < random(0,100) then format(this,2) + 'f'
elseif this > random(0,75) then format(this,2) + 'c'
else format(this,2) + 'C' end

The formula randomly generates temperatures and randomly assigns an F, f, C, or c postfix. Note that it also generates some invalid temperatures of over 1000 degrees. You will use this aberrant data in a future tutorial illustrating Kinesis Analytics.

Image 3

Creating a formula in Mockaroo for a field
  • Click Apply to return to the main screen.
  • Enter 1000 for rows, select Json as the format, and check the array checkbox.
  • Click download to download the data.
Python
[{"station":"OH","temp":"26.39f"},
{"station":"WA","temp":"40.00F"},
{"station":"TX","temp":"15.01F"},
{"station":"NC","temp":"32.36f"},
{"station":"WA","temp":"62.86F"},
{"station":"NC","temp":"49.43f"},
{"station":"MD","temp":"2.30f"},
<< snip >>

You should have a file named SampleTempDataForTutorial.json that contains 1,000 records in Json format. Be certain the data is an array, beginning and ending with square-brackets.

Python Client (PsyCharm)

Here, I assume you use PsyCharm, you can use whatever IDE you wish or the Python interactive interpreter if you wish. Let’s first use the put-record command to write records individually to Firehose and then the put-record-batch command to batch the records written to Firehose.

Writing Records Individually (put_record)

  • Start PsyCharm. I assume you have already installed the AWS Toolkit and configured your credentials. Note, here we are using your default developer credentials.
In production software, you should use appropriate roles and a credentials provider, do not rely upon a built-in AWS account as you do here.
  • Create a new Pure Python application named StreamingDataClient.

Image 4

Create a new Pure Python project in PsyCharm
  • Create a new file named FireHoseClient.py and import Boto3 and json.
  • Create a new session using the AWS profile you assigned for development.
  • Create a new firehose client from the session.

Image 5

Creating a session using default AWS credentials
  • Write the following code:
    Python
    import json
    import boto3
    
    session = boto3.Session(profile_name='default')
    temperatureClient = session.client('firehose')
    
    with open("sampleTempDataForTutorial.json") as json_file:
        observations = json.load(json_file)
        for observation in observations:
            print(observation)
            response = temperatureClient.put_record(
               DeliveryStreamName='temperatureStream',
               Record={
                    'Data': json.dumps(observation)
                }
            )
            print(response)

In the preceding code, you open the file as a json and load it into the observations variable. You then loop through each observation and send the record to Firehose using the put_record method. Note that you output the record from json when adding the data to the Record.

You should see the records and the response scroll through the Python Console.

Image 6

  • Navigate to the AWS Console and then to the S3 bucket.

Image 7

Data created in S3
  • You should see the records written to the bucket.
  • Open the file to ensure the records were transformed to kelvin.

Image 8

Data converted to kelvin in S3

Batch Writing Records (put_record_batch)

Writing records individually are sufficient if your client generates data in rapid succession. However, you can also batch data to write at once to Firehose using the put-record-batch method.

  • Replace the code with the following code:
    Python
    import json
    import boto3
    
    session = boto3.Session(profile_name='default')
    temperatureClient = session.client('firehose')
    records = []
    
    with open("sampleTempDataForTutorial.json") as json_file:
        observations = json.load(json_file)
        count = 1
        for observation in observations:
            if count % 500 == 0:
                response = temperatureClient.put_record_batch(
                    DeliveryStreamName='temperatureStream',
                    Records= records
                )
                print(response)
                print(len(records))
                records.clear()
            record = {
                "Data": json.dumps(observation)
            }
            records.append(record)
            count = count + 1
    
        if len(records) > 0:
            print(len(records))
            response = temperatureClient.put_record_batch(
                    DeliveryStreamName='temperatureStream',
                    Records= records
                )
            print(response)

In the preceding code, you create a list named records. You also define a counter named count and initialize it to one. The code loops through the observations. Each observation is written to a record and the count is incremented. When the count is an increment of 500, the records are then written to Firehose. Note that Firehose allows a maximum batch size of 500 records. After looping through all observations, any remaining records are written to Firehose.

The data is written to Firehose using the put_record_batch method. Instead of writing one record, you write list of records to Firehose.

  • Before executing the code, add three more records to the Json data file.
  • Run the code and you should see output similar to the following in the Python Console.

Image 9

Python Console output
  • Navigate to the S3 bucket in the AWS Console and you should see the dataset written to the bucket.

Image 10

Data written to S3 bucket
  • Open the records and ensure the data was converted to kelvin.

Image 11

Data converted to kelvin in S3 bucket

Summary

In this tutorial, you wrote a simple Python client that wrote records individually to Firehose. You then wrote a simple Python client that batched the records and wrote the records as a batch to Firehose. Refer to the Python documentation for more information on both commands. In the next tutorial, you will create a Kinesis Analytics Application to perform some analysis to the firehose data stream.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)