Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 2) `partitionedKeyValueStore`
Series: Interactive Queries with 'kafka-streams-cassandra-state-store'
Today's article covers accessing partitioned Kafka Streams Cassandra KeyValueStores via 'Interactive Queries', including a demo showing how to expose the state of your applications via a REST API.
With the latest release 0.7.0, a new feature and a new example have been added:
- An advanced, optimised, custom implementation of ReadOnlyKeyValueStore for 'Interactive Queries' with the 'partitioned' type Cassandra KeyValueStore, provided via CassandraStateStore static methods.
- A new example 'partitioned-store-restapi' that uses the new CassandraPartitionedReadOnlyKeyValueStore. It features a REST API with endpoints for all 'Interactive Query' methods to play with hands-on.
This is a follow-up post of Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 1).
Part 1 covers the basics of 'Interactive Queries' and how using Cassandra State Stores renders having an 'RPC layer' unnecessary. Please note this post does not repeat any of the fundamentals already covered in part 1.
☝️ The idea for this new feature, shipped with 0.7.0, actually came up while writing the previous post...
Querying Cassandra Key Value Store
All Interactive Queries can be fulfilled directly from the local instance with one or multiple queries to Cassandra.
At the time of writing the first post, we learned how this is done for 'globalKeyValueStore'.
On the other hand, accessing 'partitionedKeyValueStore' still required following the 'RPC layer' approach, because data access using the CassandraKeyValueStore is always bound and restricted to the streams task (:= partition) it belongs to.
This constraint is now lifted through a new class CassandraPartitionedReadOnlyKeyValueStore that implements ReadOnlyKeyValueStore<K, V>.
UPDATE: Cassandra Store Type 'partitionedKeyValueStore'
The Cassandra partitionedKeyValueStore is partitioned (CQL table primary key > partition key) by the streams taskId and supports all methods of the KeyValueStore 'Interactive Queries' interface.
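To make the partitioning idea more tangible, here is a minimal in-memory sketch. It is not the library's implementation: the class `PartitionedTableSketch`, its methods, and the `hashCode`-based partitioner are hypothetical stand-ins (the default Kafka Streams partitioner hashes the serialized key bytes instead). It only illustrates that rows are grouped by partition first, that a task-bound view sees a single partition, and that a cross-partition read-only view can still serve every key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a table partitioned by streams task id.
final class PartitionedTableSketch {
    // partition (task id) -> sorted rows of that partition
    private final Map<Integer, NavigableMap<String, Long>> table = new TreeMap<>();
    private final int numPartitions;

    PartitionedTableSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Stand-in partitioner (assumption): any stable key -> partition mapping works for the sketch.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    void put(String key, long value) {
        table.computeIfAbsent(partitionFor(key), p -> new TreeMap<>()).put(key, value);
    }

    // Task-bound view: only the rows of one partition are visible.
    Long getWithinTask(int partition, String key) {
        NavigableMap<String, Long> rows = table.get(partition);
        return rows == null ? null : rows.get(key);
    }

    // Cross-partition view: a single-key lookup is routed to the partition that owns the key.
    Long get(String key) {
        return getWithinTask(partitionFor(key), key);
    }

    // Cross-partition view: multi-result queries have to visit every partition.
    List<Map.Entry<String, Long>> all() {
        List<Map.Entry<String, Long>> result = new ArrayList<>();
        for (NavigableMap<String, Long> rows : table.values()) {
            result.addAll(rows.entrySet());
        }
        return result;
    }

    public static void main(String[] args) {
        PartitionedTableSketch table = new PartitionedTableSketch(3);
        table.put("hello", 2L);
        table.put("world", 5L);
        System.out.println(table.get("hello"));
        System.out.println(table.all().size());
    }
}
```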
To get an instance of this brand-new shiny CassandraPartitionedReadOnlyKeyValueStore (which is package private) you need to use the static methods of the interface CassandraStateStore.
Here's an example:
// get a read-only store to exec interactive queries ('partitioned' type cassandra KeyValueStore)
ReadOnlyKeyValueStore<String, Long> store = CassandraStateStore.readOnlyPartitionedKeyValueStore(
    streams,                                                  // streams
    "word-count",                                             // storeName
    session,                                                  // session
    "kstreams_wordcount",                                     // keyspace
    true,                                                     // isCountAllEnabled
    "dml",                                                    // dmlExecutionProfile
    stringSerde,                                              // keySerde
    longSerde,                                                // valueSerde
    CassandraStateStore.DEFAULT_TABLE_NAME_FN,                // tableNameFn
    new DefaultStreamPartitioner<>(stringSerde.serializer())  // partitioner (uses the key serde's serializer)
);
// Get the value from the store
Long value = store.get(key);
The instance can only be created when the streams app is in RUNNING state and can (and should) then be re-used!
In addition, the implementation of CassandraPartitionedReadOnlyKeyValueStore requires the application.server config to be set (to be able to access metadata).
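As a minimal sketch of the configuration side, the properties below include `application.server`, the standard Kafka Streams setting (also available as the constant `StreamsConfig.APPLICATION_SERVER_CONFIG`) that advertises this instance's `host:port`. The application id, the broker address, and the helper name `streamsProps` are placeholder assumptions for illustration only.

```java
import java.util.Properties;

final class StreamsConfigSketch {

    // Builds a minimal Kafka Streams configuration that includes application.server.
    static Properties streamsProps(String host, int port) {
        Properties props = new Properties();
        props.put("application.id", "word-count-app");     // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        // host:port of this instance; without it, streams metadata lookups have no endpoint to report
        props.put("application.server", host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps("localhost", 8080);
        System.out.println(props.getProperty("application.server"));
    }
}
```

These properties would then be passed to the `KafkaStreams` constructor together with the topology.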
The number of method parameters is not low, but each argument passed in is needed to construct the repo and underlying class. Unfortunately, there's some overhead/redundancy with how the store is registered with the topology. I couldn't think of a better way of doing this yet.
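Since the store can only be created once the app is RUNNING and should be re-used afterwards, one option is to wrap the creation in a small holder. The sketch below is not part of the library: `LazyStoreHolder` is a hypothetical helper, the `isRunning` check would typically be something like `() -> streams.state() == KafkaStreams.State.RUNNING`, and the `factory` would wrap the `readOnlyPartitionedKeyValueStore(...)` call with all of its arguments.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper: creates the store once the app is RUNNING, then always returns that instance.
final class LazyStoreHolder<T> {
    private final BooleanSupplier isRunning; // assumption: reports whether the streams app is RUNNING
    private final Supplier<T> factory;       // assumption: wraps the (argument-heavy) store creation
    private T store;                         // created at most once

    LazyStoreHolder(BooleanSupplier isRunning, Supplier<T> factory) {
        this.isRunning = isRunning;
        this.factory = factory;
    }

    synchronized T get() {
        if (store == null) {
            if (!isRunning.getAsBoolean()) {
                throw new IllegalStateException("streams app is not in RUNNING state yet");
            }
            store = factory.get();
        }
        return store;
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        LazyStoreHolder<String> holder = new LazyStoreHolder<>(running::get, () -> "store-instance");
        // Both calls return the same instance; the factory runs only once.
        System.out.println(holder.get() == holder.get());
    }
}
```

A REST handler could then call `holder.get()` on every request without re-creating the store.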
Demo
Source code for this demo: partitioned-store-restapi
Next Steps & Considerations
- The behaviour for large state stores has yet to be tested.
  - Cassandra fetches data in chunks, so reading / iterating over a large number of rows should still be possible when timeouts are not breached.
  - The current implementation, for queries that return more than one result (all, range, prefixScan, query), executes the CQL query for all partitions in parallel, then iterates the result iterators one by one in sequential order.
    - This should be correct from a consistency point of view, but for large states, would the Cassandra ResultSet iterators that are on hold time out before being consumed?
    - While results are fetched in chunks, with the 'parallel query' pattern the first chunks for all partitions would be fetched at once and demand RAM.
    - Maybe it would be better to switch to a sequential query+processing pattern iterating over the partitions (see org.apache.kafka.streams.state.internals.CompositeKeyValueIterator). Or provide both and allow the user to choose which one to use.
    - It would be interesting to test and compare both options applied for different use cases and streams architectures.
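The two patterns discussed in the list above can be contrasted with a small sketch. This is an illustration under stated assumptions, not the library's code: `PartitionIterationSketch`, `eager`, `lazy`, and `concat` are hypothetical names, `queryPartition` stands in for "run the CQL query for one partition", and the eager variant simply issues all queries up front rather than truly in parallel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

final class PartitionIterationSketch {

    /** Pattern 1: issue the query for every partition up front, then drain the iterators in order. */
    static <T> Iterator<T> eager(int partitions, IntFunction<Iterator<T>> queryPartition) {
        List<Iterator<T>> open = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            open.add(queryPartition.apply(p)); // every result iterator is open before iteration starts
        }
        return concat(open.iterator());
    }

    /** Pattern 2: issue the query for a partition only once the previous partition is exhausted. */
    static <T> Iterator<T> lazy(final int partitions, final IntFunction<Iterator<T>> queryPartition) {
        Iterator<Iterator<T>> perPartition = new Iterator<Iterator<T>>() {
            private int nextPartition = 0;

            @Override public boolean hasNext() { return nextPartition < partitions; }

            @Override public Iterator<T> next() {
                if (!hasNext()) throw new NoSuchElementException();
                return queryPartition.apply(nextPartition++);
            }
        };
        return concat(perPartition);
    }

    /** Chains the given iterators, pulling the next source only when the current one is empty. */
    static <T> Iterator<T> concat(final Iterator<Iterator<T>> sources) {
        return new Iterator<T>() {
            private Iterator<T> current = Collections.emptyIterator();

            @Override public boolean hasNext() {
                while (!current.hasNext() && sources.hasNext()) {
                    current = sources.next();
                }
                return current.hasNext();
            }

            @Override public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        IntFunction<Iterator<String>> query = p -> {
            System.out.println("query issued for partition " + p);
            return Arrays.asList("p" + p + "-a", "p" + p + "-b").iterator();
        };
        Iterator<String> it = lazy(3, query);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

With the lazy variant, at most one partition's result iterator is open at any time, which trades the up-front memory and the risk of idle iterators timing out for a later start of each partition's query.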
Conclusion
The new CassandraPartitionedReadOnlyKeyValueStore is a good addition to the tool belt of 'kafka-streams-cassandra-state-store' and may prove valuable when your business logic involves 'Interactive Queries' that require accessing the state of your entire streams app.
The library is still in an experimental phase of life (version 0.x).
Feedback, ideas, and contributions are welcome. Please reach out any time!
Footnotes
At the time of writing this blog post, the latest versions of relevant libs were
- Kafka / Streams API: 3.5.0
- kafka-streams-cassandra-state-store: 0.7.2