In this tutorial, you use the AWS Toolkit for PyCharm to create a Lambda transformation function that is deployed through AWS CloudFormation using a Serverless Application Model (SAM) template. You then create the Kinesis Firehose stream and attach the Lambda function to the stream to transform the data.
Amazon Kinesis Firehose is a service for streaming large amounts of data in near real time. Streaming data is continuously generated data that can originate from many sources and is typically sent simultaneously and in small payloads. Logs, Internet of Things (IoT) devices, and stock market data are three obvious examples of data streams. Kinesis Firehose manages scaling for you transparently. Firehose allows you to load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. You can also transform the data using a Lambda function. Firehose can also encrypt and compress the data so that it is secure and takes less space. For more information, refer to Amazon’s introduction to Kinesis Firehose.
If you prefer watching a video introduction, the following is a good Kinesis Firehose overview.
AWS Introduction to Kinesis Firehose
Other Tutorials
Although this tutorial stands alone, you might wish to view some more straightforward tutorials on Kinesis Firehose before continuing with this tutorial. Here, we add complexity by using PyCharm and an AWS Serverless Application Model (SAM) template to deploy a Lambda function.
The following is a good video demonstration of using Kinesis Firehose by Arpan Solanki. The example project focuses on the out of the box functionality of Kinesis Firehose and will make this tutorial easier to understand.
AWS Kinesis Firehose demo by Arpan Solanki
Tasks Performed Here
In this tutorial, you add complexity to the more straightforward demonstrations of Kinesis Firehose. Rather than creating the Lambda function while creating the Kinesis Firehose stream, you create a more realistic Lambda function using PyCharm. Moreover, you deploy that function as an AWS Serverless Application. We will perform the following tasks in this tutorial.
- Create and test a Kinesis Firehose stream.
- Create a Lambda function that applies a transformation to the stream data.
- Deploy the Lambda function using a Serverless Application Model (SAM) template.
- Modify the Kinesis Firehose stream to use the Lambda data transformer.
- Test the Kinesis Firehose stream.
- Trace and fix an error in the Lambda function.
- Redeploy the Lambda function.
- Test the Kinesis Firehose stream.
Sample Project Architecture
Assume we have many locations that record the ambient temperature. We need to aggregate this data from the many different locations in almost real-time. We decide to use AWS Kinesis Firehose to stream data to an S3 bucket for further back-end processing.
Data is recorded as either fahrenheit or celsius depending upon the location sending the data. But the back-end needs the data standardized as kelvin. To transform data in a Kinesis Firehose stream, we use a Lambda transform function. The following illustrates the application’s architecture.
Tutorial application architecture
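The two conversions the back-end needs can be sketched as a pair of small Python functions. This is only an illustration of the arithmetic; the function names are hypothetical and are not part of the tutorial project.

```python
def fahrenheit_to_kelvin(deg_f: float) -> float:
    """Convert a fahrenheit reading to kelvin: K = (F + 459.67) * 5/9."""
    return (deg_f + 459.67) * 5.0 / 9.0


def celsius_to_kelvin(deg_c: float) -> float:
    """Convert a celsius reading to kelvin: K = C + 273.15."""
    return deg_c + 273.15


if __name__ == "__main__":
    # The freezing point of water expressed in both input units.
    print("{:.2f}".format(fahrenheit_to_kelvin(32.0)))
    print("{:.2f}".format(celsius_to_kelvin(0.0)))
```

Both calls describe the same physical temperature, so they should print the same value when rounded to two decimals.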
Prerequisites
This tutorial expects you to have an AWS developer account and knowledge of the AWS console. You should have PyCharm with the AWS Toolkit installed. You should also have the AWS CLI installed.
You should also have a rudimentary knowledge of S3, CloudFormation and SAM templates, Lambda functions, and of course, Python. The following links should help if you are missing prerequisites.
Kinesis Firehose
AWS Kinesis Firehose is a fully managed service.
Create Stream
- Log in to the AWS Console and select Services and then Kinesis.
- Click Get Started if it is your first time visiting Kinesis.
- Click Create delivery stream in the Firehose panel.
Create delivery stream option on Amazon Kinesis dashboard (if no defined streams)
- Name the delivery stream temperatureStream.
- Accept the default values for the remaining settings.
- Click Next.
Create delivery stream – first step
A data producer is any application that sends data records to Kinesis Firehose. By selecting Direct PUT or other sources, you are allowing producers to write records directly to the stream.
- Accept the default setting of Disabled for Transform source records with AWS Lambda and Convert record format.
- Click Next.
Create delivery stream – second step
The Transform source records with AWS Lambda allows you to define a Lambda function. Later in this tutorial, you will change this setting and define a Lambda function. For now, leave it disabled.
- Select Amazon S3 as the Destination.
- Under the S3 destination, click Create new.
- Name the S3 bucket with a reasonable name (remember all names must be globally unique in S3). Here, I use the name temperaturebucket123 as the bucket name and select the appropriate Region.
Create S3 bucket for stream
- Click Next.
- Accept the defaults and scroll to the Permissions section.
- Click Create new or choose to associate an IAM role to the stream.
Create new or choose IAM role for stream
- Create a role named temperature_stream_role (we return to this role in a moment) by accepting the defaults.
- Click Allow.
- After you are returned to the stream creation page, click Next.
Create Role
- Review the delivery stream and click Create delivery stream to create the stream.
Select newly created role by clicking temperature_stream_role
- You should be taken to the list of streams and the Status of temperatureStream should be Creating.
Delivery stream console after created.
- After the stream’s status is Active, click on temperatureStream to be taken to the stream’s configuration page.
- Click on the IAM role to return to the role settings in IAM.
- Now, we are being very lazy (you would not do this in production): delete the attached policy and attach the AWSLambdaFullAccess, AmazonS3FullAccess, and AmazonKinesisFirehoseFullAccess policies.
Here we are granting the role too much access. In reality, you should grant the minimal access needed in a production setting.
For simplicity (not for production use), delete policy and add the following three policies to role
Test Stream
For a simple stream such as what you just developed, AWS provides an easy means of testing your data. Let’s test your data before continuing development.
- If not on the stream configuration screen, select the stream on the Kinesis dashboard to navigate to the stream’s configuration screen.
- Expand the Test with demo data section.
- Click the Start sending demo data button.
- Wait about a minute and click the Stop sending demo data button.
Test data option on stream summary on AWS console
- From the Amazon S3 destination section, click on the bucket name to navigate to the S3 bucket. Be certain to wait five minutes to give the data time to stream to the S3 bucket.
If you tire of waiting five minutes, return to the stream’s configuration and change the buffer time to a smaller interval than 300 seconds.
The Buffer interval allows configuring the time frame for buffering data.
S3 bucket link on stream summary on AWS console
- Click on the sub-folders until taken to the data file. If you do not see the top level folder, then wait five minutes and refresh the page. Remember, the data is buffered.
S3 Bucket top level folder after test data written
- Open the file and you should see the test records written to the file.
Test data written to S3 bucket by Kinesis Firehose
- Navigate to the top level folder and delete the test data. Be certain you delete the top level folder and not the bucket itself.
Delete test data by deleting top level folder
- Open a command-line terminal on your computer and enter the following aws firehose put-record commands.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="99.55F"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="33.22C"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="57.99f"'
You should see something similar to the following in your command-line terminal.
AWS firehose put-record commands in command-line terminal
For details on the put-record command, refer to the AWS reference page on the command (AWS CLI Command Reference: put-record).
- Return to the AWS Console and navigate to the S3 bucket and note the data was written to the bucket. Remember to allow the records time to process by waiting five minutes.
- Rather than sending a simple string, modify the commands to send JSON. Note that you escape the double-quotes.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
- Return to the AWS Console and you should see a file in the S3 bucket with data formatted as follows. Do not forget to give the record time to stream before checking the S3 bucket.
{"station":"A1","temp":"57.99f"}{"station":"A1","temp":"57.99f"}
In the sample architecture, note that you need to convert the temperature data to kelvin. To accomplish this transformation, you create a Lambda transform function for the Kinesis Firehose stream.
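A data producer does not have to be the AWS CLI. The following is a minimal sketch of a Python producer, assuming boto3 is installed and credentials are configured; the helper names build_record and send_reading are hypothetical. Because Firehose writes records to S3 back to back without adding a delimiter (which is why the file above shows two JSON objects on one line), this sketch appends a newline to each record. That is a choice made here, not something the tutorial's CLI commands do.

```python
import json


def build_record(station: str, temp: str) -> dict:
    """Build the Record argument for a Firehose put_record call.

    A trailing newline is appended (an assumption of this sketch) so that
    records land on separate lines in the S3 object.
    """
    payload = json.dumps({"station": station, "temp": temp}, separators=(",", ":"))
    return {"Data": (payload + "\n").encode("utf-8")}


def send_reading(station: str, temp: str, stream_name: str = "temperatureStream") -> dict:
    """Send one reading to the delivery stream (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so build_record works without boto3 installed

    client = boto3.client("firehose")
    return client.put_record(
        DeliveryStreamName=stream_name,
        Record=build_record(station, temp),
    )


if __name__ == "__main__":
    # Only builds the request payload; call send_reading to actually send it.
    print(build_record("A1", "57.99f"))
```

Calling send_reading("A1", "57.99f") would be roughly equivalent to the JSON put-record command above, apart from the added newline.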
Lambda Function
Recall when creating the stream, you were provided the option of transforming the data.
Transform source records option
Although you left this feature disabled, the requirements dictate that you need to convert temperature readings from fahrenheit or celsius to kelvin. Kinesis Firehose provides an easy way to transform data using a Lambda function. If you referred to any of the linked tutorials above, then you know that you can create and edit the Lambda function directly in the AWS console.
Here, you develop a Python Lambda function in a local development environment, debug it, and then deploy it to AWS using a CloudFormation SAM template.
PyCharm
Hopefully, you have installed PyCharm and the AWS Toolkit. If not, do so now. Refer to the prerequisites above for information on installing both.
- Start PyCharm.
- Create a new AWS Serverless Application named kelvinTempConversion.
Creating a new AWS SAM Project
- Click No if the following Create Project popup appears.
Select No to this dialog to create a project with new resources
- Open the template.yaml file and notice the generated SAM template.
- Modify the timeout from 3 to 60 seconds (Kinesis Firehose requires a 60 second timeout).
SAM template generated by PyCharm
- Right click the hello_world folder and select Refactor | Rename to rename the folder to kelvinConversion.
- After reviewing the changes to be made, click the Do Refactor button.
Refactoring Hello World to kelvinConversion
- Replace all instances of HelloWorld with KelvinConversion in template.yaml.
- Modify the function timeout (Globals:Function:Timeout:) to 60 seconds, the minimum for Kinesis Firehose.
- Remove the Events section and the KelvinConversionApi section. These two sections are for building a public REST API. As we are developing a transformation function for our stream, neither is needed.
- After modifying all instances of the hello world text, template.yaml should appear similar to the following:
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  AWS
  Sample SAM Template for AWS
Globals:
  Function:
    Timeout: 60
Resources:
  KelvinConversionFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: kelvinConversion/
      Handler: app.lambda_handler
      Runtime: python3.8
Outputs:
  KelvinConversionFunction:
    Description: "Kelvin Conversion Lambda Function ARN"
    Value: !GetAtt KelvinConversionFunction.Arn
  KelvinConversionFunctionIamRole:
    Description: "Implicit IAM Role created for Kelvin Conversion function"
    Value: !GetAtt KelvinConversionFunctionRole.Arn
- From the upper right drop down, select Edit Configurations.
- Modify the template to reflect the new folder.
- Click Ok.
Runtime configuration
- Select the dropdown item and click the green arrow to run the application.
/usr/local/bin/sam local invoke
--template /Users/jamesabrannan/PycharmProjects/
kelvinTempConversion/.aws-sam/build/template.yaml
--event "/private/var/folders/xr/j9kyhs2n3gqcc0n1mct4g3lr0000gp/T/
[Local] KelvinConversionFunction-event.json" KelvinConversionFunction
Invoking app.lambda_handler (python3.8)
Fetching lambci/lambda:python3.8 Docker container image......
Mounting /Users/jamesabrannan/PycharmProjects/kelvinTempConversion/
.aws-sam/build/KelvinConversionFunction as /var/task:ro,delegated inside runtime container
START RequestId: 1ffa20fa-486e-1827-e987-e92f16101778 Version: $LATEST
END RequestId: 1ffa20fa-486e-1827-e987-e92f16101778
REPORT RequestId: 1ffa20fa-486e-1827-e987-e92f16101778
Init Duration: 531.94 ms Duration: 14.75 ms
Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 24 MB
{"statusCode":200,"body":"{\"message\": \"hello world\"}"}
- Now that you are assured the project is configured correctly and executes locally, open app.py and replace the sample code with the following. Note that the line using the string index function contains an error. This error is by design and you will fix it later in the tutorial.
import base64
import json


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        print(record['recordId'])
        # Firehose delivers each record's payload base64 encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')
        print(payload)
        reading = json.loads(payload)
        print(reading)
        temp = reading['temp']
        print(temp)
        # Intentional error (fixed later): index raises ValueError when 'F' is absent.
        isfarenheit = bool(temp.upper().index('F') > 0)
        kelvin = 0
        if isfarenheit:
            print(float(temp.upper().strip('F')))
            kelvin = (float(temp.upper().strip('F')) + 459.67) * 5.0 / 9.0
        else:
            kelvin = float(temp.upper().strip('C')) + 273.15
        print("{:.2f}".format(kelvin))
        reading['temp'] = str("{:.2f}".format(kelvin))
        print(reading)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # Decode to str; a bytes value is not serializable by the standard json module.
            'data': base64.b64encode(json.dumps(reading).encode('UTF-8')).decode('utf-8')
        }
        output.append(output_record)
    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
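The handler above follows the response shape that Firehose expects from a transformation function: every returned record carries the recordId of the incoming record, a result of Ok, Dropped, or ProcessingFailed, and the transformed payload base64 encoded in data. A minimal sketch of a helper that builds one such record might look like the following; the helper name make_output_record is hypothetical and is not part of the tutorial project.

```python
import base64
import json

# Result values a Firehose transformation function may return for a record.
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def make_output_record(record_id: str, reading: dict, result: str = "Ok") -> dict:
    """Build one transformed record in the shape Firehose expects."""
    if result not in VALID_RESULTS:
        raise ValueError("unexpected result value: {}".format(result))
    # Serialize the reading, base64 encode it, and keep the value as str.
    data = base64.b64encode(json.dumps(reading).encode("utf-8")).decode("utf-8")
    return {"recordId": record_id, "result": result, "data": data}


if __name__ == "__main__":
    # Hypothetical record id and reading, used only to show the shape.
    print(make_output_record("example-record-id", {"station": "A1", "temp": "310.56"}))
```

Keeping the recordId unchanged matters because Firehose uses it to match each returned record to the record it sent.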
Local Testing
To test the record, you need to use an event template. There are event types you can choose, depending upon how the Lambda function is to be used.
- From Event Templates, select Kinesis Firehose.
Select Kinesis Firehose template to generate test data.
- Create the sample record {"station":"A1","temp":"99.33F"} and base64 encode the record. A good site to encode and decode is the base64encode.org website.
Encoding a simple Json record to Base64
- Replace the data string generated when you selected the Kinesis Firehose Event Template with the base64 encoded string.
Modify data value with the newly encoded value
- Run the application locally and you should see the returned record.
Console output from running application locally.
- Copy the data string and decode the record from base64.
Decode result from Base64 to string
- Validate the converted kelvin measurement is correct.
Note, you only tested fahrenheit. This is by design to illustrate debugging in the AWS Console. You fix this error later in this tutorial.
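If you would rather not paste records into a website, the encoding, decoding, and validation steps above can also be done with Python's standard library. The following is a sketch only: the event dictionary is a stripped-down stand-in for the event generated by the Kinesis Firehose template, and the record id is made up.

```python
import base64
import json

# The sample record from the steps above.
sample = '{"station":"A1","temp":"99.33F"}'

# Base64 encode the record, as the website would.
encoded = base64.b64encode(sample.encode("utf-8")).decode("ascii")

# Minimal stand-in for the generated test event (hypothetical record id).
event = {"records": [{"recordId": "test-record-1", "data": encoded}]}

# Decode the data string again and compute the kelvin value to expect.
decoded = json.loads(base64.b64decode(event["records"][0]["data"]).decode("utf-8"))
expected_kelvin = (float(decoded["temp"].upper().strip("F")) + 459.67) * 5.0 / 9.0

print(encoded)
print("{:.2f}".format(expected_kelvin))  # 310.56
```

The same decode step can be applied to the data string returned by the function to compare it with the expected value.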
Deploying Serverless Application
- Right click on template.yaml and select Deploy Serverless Application from the popup menu.
Right click on template.yaml and select Deploy Serverless Application
- Select Create Stack and name the stack kelvinTemperatureConversionStack.
- Select or create an S3 Bucket.
- Click Deploy.
- If you receive a credentials error, then you need to configure the AWS Toolkit correctly.
- At the extreme lower right of the window, click the message telling you the issue.
Error if AWS Toolkit credentials are not configured correctly
Profile settings configured for AWS Toolkit
- After fixing credentials (if applicable), then try again. A dialog window should appear informing you of the deployment progress.
- Notice that the window is using SAM CLI commands to deploy the function to AWS.
Deploying application output
Verifying
After deploying, you should verify the function was deployed correctly.
Lambda Function
S3 Bucket
AWS CloudFormation
Modifying Firehose Stream
- Navigate to the temperatureStream configuration page.
- Click Edit.
- Enable source record transformation in the Transform source records with AWS Lambda section.
- Select the Lambda function created and deployed by PyCharm.
- Click Save.
Testing Using CLI
- Open a command-line window and send several records to the stream. Be certain to escape the double-quotes, with the exception of the double quotes surrounding the data record.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
- After waiting five minutes, navigate to the S3 bucket and you should see a new folder entitled processing-failed.
Processing-failed folder when Kinesis Firehose fails
- Navigate down the processing-failed folder hierarchy and open the failure records.
Errors written to S3 Bucket
- The error messages are not very informative. But at least they tell you the Lambda function processing caused the error.
- Navigate to the stream and select Amazon S3 Logs.
- The log message is also not very informative.
- Navigate to the Lambda function details.
- Select the LogStream from the most recent invocation of the Lambda function.
- The detailed log records the exact cause of the error, the index function. Unlike the comparable method in some languages, such as Java's indexOf, which returns -1, the Python index function raises a ValueError if the substring is not found.
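The difference is easy to reproduce outside of Lambda. The short sketch below uses one of the celsius readings sent to the stream and contrasts str.index with str.find, which returns -1 rather than raising; the is_fahrenheit helper is hypothetical and only mirrors the style of check used in the function.

```python
temp = "22.20C"

# str.index raises ValueError when the substring is absent.
try:
    temp.upper().index("F")
    index_raised = False
except ValueError:
    index_raised = True

# str.find returns -1 in the same situation instead of raising.
found_at = temp.upper().find("F")


def is_fahrenheit(value: str) -> bool:
    """Return True when an 'F' appears after the first character."""
    return value.upper().find("F") > 0


print(index_raised, found_at)
print(is_fahrenheit("57.99f"), is_fahrenheit(temp))
```

Note that a comparison of > 0 treats an F in the very first position as not fahrenheit, which is harmless here because a reading always starts with a digit.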
Fixing Error
- Return to the PyCharm project to fix the error and redeploy the Lambda function to AWS.
You might notice that you can edit a function directly in the AWS Console. DO NOT EDIT! Remember, you deployed this application using SAM in CloudFormation. The correct process is to fix the function and then redeploy it using SAM.
Python implementation in the AWS Console
Data replaced with celsius value after encoding
- Modify the function to use find rather than the index function.
isfarenheit = bool(temp.upper().find('F') > 0)
Lambda function results in error due to the index function
- Run the application locally using a celsius value. As before, encode and decode and test the converted value.
Lambda function successfully ran with celsius data
- After testing, right click on template.yaml and redeploy the serverless application.
- Accept the Update Stack defaults.
Update Stack option in Deploy Serverless Application
- After clicking Deploy, a popup window informs you of the deployment progress.
Redeploying SAM application to AWS
- Navigate to the Lambda function details in the AWS Console and you should see the corrected source code.
Transformation function reflects changes made in PyCharm
- From your command-line, send several records to the stream.
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"12.76C\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"57.99f\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"89.90F\"}"'
> aws firehose put-record --delivery-stream-name temperatureStream --record='Data="{\"station\":\"A1\",\"temp\":\"22.20C\"}"'
- Navigate to the S3 bucket and you should see the transformed records.
Data streamed to S3 bucket
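Because the records are written to the S3 object one after another with no separator, a back-end process cannot simply call json.loads on the whole file. One way to read such a file, sketched here with the standard library's JSONDecoder.raw_decode, is to decode one object at a time and continue from where the previous object ended. The function name and the sample text are illustrative only.

```python
import json


def split_concatenated_json(text: str) -> list:
    """Parse a string holding JSON objects written back to back."""
    decoder = json.JSONDecoder()
    readings = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        # raw_decode returns the parsed object and the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        readings.append(obj)
    return readings


if __name__ == "__main__":
    # Illustrative file content, not copied from an actual bucket.
    content = '{"station":"A1","temp":"287.59"}{"station":"A1","temp":"305.32"}'
    for reading in split_concatenated_json(content):
        print(reading["station"], reading["temp"])
```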
Summary
In this tutorial, you created a Kinesis Firehose stream and created a Lambda transformation function. You configured the stream manually and used SAM to deploy the Lambda function. An obvious next step would be to add the creation of the Kinesis Firehose stream and associated bucket to the CloudFormation template in your PyCharm project. This tutorial was sparse on explanation, so refer to the many linked resources to better understand the technologies demonstrated here. However, this tutorial was intended to provide a variation on the numerous more straightforward Kinesis Firehose tutorials available.