Stream Data with Python and AWS Kinesis

I know I promised more general content, but since I’m preparing for my AWS Certified Developer exam, I try to write code examples that use the services I learn about. A longer description of one of my projects is coming before I turn it off (check it out on GitHub). For now, I’ll write about data streaming.

What is data streaming?

In general, you can send data in two ways: buffered and streamed. Streaming means the data is read continuously as it is produced, instead of being collected in full first. You already know streams from live sessions on Twitch or YouTube. The idea is that you produce data continuously on one end and consume it on the other. This can be logs, video, audio, whatever.
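To make the difference concrete, here is a minimal sketch in plain Python, with nothing Kinesis-specific yet. The names `produce`, `consume_buffered` and `consume_streamed` are made up for this illustration.

```python
from typing import Iterator, List


def produce(count: int) -> Iterator[int]:
    """Yield values one at a time, like a producer emitting events."""
    for i in range(count):
        yield i * i


def consume_buffered(source: Iterator[int]) -> List[int]:
    """Buffered: collect everything first, then process the whole batch."""
    buffer = list(source)  # nothing is processed until the source is exhausted
    return [value + 1 for value in buffer]


def consume_streamed(source: Iterator[int]) -> Iterator[int]:
    """Streamed: process each value as soon as it arrives."""
    for value in source:
        yield value + 1
```

The buffered version cannot return anything until the producer is done, while the streamed version hands back the first result right away, which is why it also works for sources that never end.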

What is AWS Kinesis Data Streams?

According to the AWS documentation:

You can use Amazon Kinesis Data Streams to collect and process large streams of data records in real time. You can create data-processing applications, known as Kinesis Data Streams applications. A typical Kinesis Data Streams application reads data from a data stream as data records. These applications can use the Kinesis Client Library, and they can run on Amazon EC2 instances. You can send the processed records to dashboards, use them to generate alerts, dynamically change pricing and advertising strategies, or send data to a variety of other AWS services. For information about Kinesis Data Streams features and pricing, see Amazon Kinesis Data Streams.

So, a producer produces data, Kinesis acts as an intermediate layer that holds it, and at the other end the consumers… consume the data. The concept is simple.

There are also shards, which are a kind of box for data, and each one works independently. If you need to process a lot of data, you can add more shards and spread the workload between them. Each record is routed to a shard based on its partition key.
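Kinesis maps each partition key to a 128-bit integer with MD5, and every shard owns a range of those integers. The sketch below imitates that routing locally. It assumes the shards split the key space evenly, and the function names `hash_key` and `shard_for_key` are hypothetical helpers, not part of any AWS library.

```python
import hashlib


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard whose hash range contains the key.

    Assumption: `shard_count` shards divide the 128-bit key space evenly.
    """
    range_size = 2**128 // shard_count
    # min() keeps the last few hash values inside the final shard
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

A consequence worth remembering: records that share a partition key always land in the same shard, so a constant key will not spread the load no matter how many shards you add.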

How to use Kinesis with Python?

For this example I’ll use the boto3 library (there are other solutions that focus on this single service). First, I need to define a producer:

#!/usr/bin/env python3

import time
import random

import boto3

client = boto3.client("kinesis", region_name="eu-west-1")

STREAM_NAME = "MyFirstStream"

try:
    while True:
        # Payload: a random number encoded as UTF-8 bytes
        data = str(random.randint(1, 100)).encode("utf-8")
        print(f"Sending {data=}")
        # A constant partition key sends every record to the same shard
        response = client.put_record(StreamName=STREAM_NAME, Data=data, PartitionKey="A")
        # print(f"Received {response=}")
        # Pause between records (1 second is an arbitrary choice)
        time.sleep(1)
except KeyboardInterrupt:
    print("Finishing due to keyboard interrupt")

Here the data stream consists of a few bytes encoding a randomly generated number. The put_record method sends each record to the stream.
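If you have many values to send, boto3 also offers put_records, which accepts up to 500 records per call. Below is a rough sketch of batching with it. The helper name `send_batch` is made up, and using the value itself as the partition key is only an assumption for the example. The client is passed in as a parameter, so you would call it with `boto3.client("kinesis", region_name="eu-west-1")`.

```python
from typing import Any, Dict, Iterable, List

MAX_BATCH_SIZE = 500  # put_records accepts at most 500 records per call


def send_batch(client: Any, stream_name: str, values: Iterable[int]) -> int:
    """Send values with put_records and return how many records failed."""
    records: List[Dict[str, Any]] = [
        # Assumption: the value doubles as the partition key
        {"Data": str(v).encode("utf-8"), "PartitionKey": str(v)}
        for v in values
    ]
    failed = 0
    for start in range(0, len(records), MAX_BATCH_SIZE):
        chunk = records[start:start + MAX_BATCH_SIZE]
        response = client.put_records(StreamName=stream_name, Records=chunk)
        # A call can succeed overall while individual records are rejected
        failed += response["FailedRecordCount"]
    return failed
```

A non-zero return value means some records were not stored, so a real producer would want to retry those instead of just counting them.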

The consumer is a bit more complicated:

#!/usr/bin/env python3
import time
import boto3

STREAM_NAME = "MyFirstStream"

try:
    print("Getting connection, shard ID and iterator...")
    client = boto3.client("kinesis", region_name="eu-west-1")
    stream = client.describe_stream(StreamName=STREAM_NAME)
    # The stream has a single shard, so take the first one
    shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]
    print(f"Got {shard_id=}")
    # The iterator type is an assumption: LATEST reads only records sent from now on
    iterator = client.get_shard_iterator(
        StreamName=STREAM_NAME,
        ShardId=shard_id,
        ShardIteratorType="LATEST",
    )["ShardIterator"]
    print("Reading data...")
    response = client.get_records(ShardIterator=iterator, Limit=1)
    while "NextShardIterator" in response:
        data = response["Records"]
        if len(data) < 1:
            print("No data received")
        else:
            data = data[0]["Data"]
            print(f"Received {data=}")
        # Pause between polls (1 second is an arbitrary choice)
        time.sleep(1)
        response = client.get_records(ShardIterator=response["NextShardIterator"], Limit=1)

except KeyboardInterrupt:
    print("Finishing due to keyboard interrupt")

There’s more happening here. We describe the stream by its name, which gives us the list of its shards (in my case I use provisioned throughput, so I know I have only one). Then we get an iterator to go through the data in that shard. In the while loop we extract the data we’ve sent. The result is the following:

[Screenshot: Kinesis Data Streaming Python example working: consumer and producer share some data.]
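The polling loop from the consumer can also be wrapped in a generator, which keeps the iterator bookkeeping in one place. This is only a sketch: `iter_records` is a hypothetical helper, the default of `LATEST` and the batch limit of 100 are my assumptions, and the client is passed in, so you would create it with `boto3.client("kinesis", region_name="eu-west-1")`.

```python
import time
from typing import Any, Iterator


def iter_records(client: Any, stream_name: str, shard_id: str,
                 iterator_type: str = "LATEST",
                 poll_interval: float = 1.0) -> Iterator[bytes]:
    """Yield the Data payload of each record read from one shard."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )["ShardIterator"]
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator, Limit=100)
        for record in response["Records"]:
            yield record["Data"]
        if not response["Records"]:
            # Nothing new yet: wait a bit before polling again
            time.sleep(poll_interval)
        # No next iterator means the shard is closed and fully read
        iterator = response.get("NextShardIterator")
```

With this in place the reading part shrinks to `for data in iter_records(client, STREAM_NAME, shard_id): print(data)`. Passing `TRIM_HORIZON` as the iterator type would start from the oldest record still kept in the shard instead of only new ones.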

Read more

This Stack Overflow question shows how to build a solution for actual data streaming.