I love the Amazon Kinesis real-time data processing service.  The possibilities are endless when thinking about the products that could be created around it.  However, it can be a pain to use. There are streams that contain shards that have records in them.  You need iterators that expire in short amounts of time to get the data, and you must make sure to not make too many requests in a small amount of time or Amazon’s API will freak out.  That’s alot to keep in mind when writing a robust piece of code to read from one of these streams.Amazon Kinesis Workflow Diagram

 

Recently, at InfoTrust, I had the pleasure of writing such a piece of code in Python using the boto library.  Our use case was to read data from a stream that contained only a single shard for a set period of time. The code is written as a Python generator.  This makes using it much simpler, keeping your code much more readable.

First, the code uses boto to create a connection to Kinesis.  It then gets all information about that stream.  This information is used to get the shards in the stream from which we extract a list containing just the IDs of these shards.  

Next, we get an iterator that points at the first piece of data in the stream.  Notice that for our use case, since we only have a single shard in the stream, we only get an iterator for the shard with an ID that is first in the list.  This part could easily be extended to accommodate more shards without changing the code that uses it.

We calculate when the end time will be so that we know when to stop looping over data and exit the generator.  We enter a loop that will never end.  This is the best way to emulate a traditional do-while loop in Python.  We get the next set of records using the shard iterator we obtained earlier.  Then we check to make sure that the end time has not passed and that we received data.  The loop will exit if either is true.  We then iterate over each of the records in the response and yield them to the calling piece of code.  It automatically dumped it from JSON since our data was stored as JSON strings.  The last step in the loop is to get the next iterator that was returned in the response and set it to the current iterator so that it will be used in the current request.

There is also some error handling provided since a few exceptions can be thrown by the boto module.  First is a ProvisionedThroughputExceededException, which means we have been hitting the Kinesis API too much, and we have been rate limited.  This rate limiting is based on the number of hits per second.  The code simply catches the error and sleeps for half a second before continuing the loop.  The other exception thrown is an ExpiredIteratorException.  Shard iterators expire after five minutes.  If the data cannot be processed within five minutes, then the generator will attempt to use an expired iterator.  Boto will throw this exception, which is easily caught.  We then make a request to get a new iterator and continue on with processing data.

Reading from Kinesis can be strange.  Hopefully this code above gives you what you need to make this part of your application much easier to create.

Join thousands of subscribers.

No sales pitches, no spam, and an easy one-click unsubscribe.

About The Author: InfoTrust is a web analytics company and a Google partner. Visit us at http://infotrustllc.com to learn more.

  • sb

    Thank you. Do you happen to have the source code? “Hopefully this code above gives you what you need to make this part of your application much easier to create.” point to code above but can’t see any code.

    • Andy

      Hi sb. It looks like the code got accidentally got stripped out when we migrated our blog. You can access the source here: https://gist.github.com/searsaw/688a2dd1a7e288102178

      • sb

        @disqus_L0Jve8WZLQ:disqus Thank you. great example for consuming data from kinesis.