Thriving.dev Learning Resources for Software Architects and Engineers
Blog Post

Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 2) `partitionedKeyValueStore`

Posted on Jul 24, 2023 · 3min read · advanced

Series: Interactive Queries with 'kafka-streams-cassandra-state-store'

Today's article covers accessing partitioned Kafka Streams Cassandra KeyValueStores via 'Interactive Queries', including a demo showing how to expose the state of your applications via a REST API.

With the latest release 0.7.0 a new feature + example have been added:

  1. Advanced, optimised, efficient, custom implementation of ReadOnlyKeyValueStore for 'Interactive Queries' with for 'partitioned' type Cassandra KeyValueStore, provided via CassandraStateStore static methods.
  2. New example 'partitioned-store-restapi' that is using the new CassandraPartitionedReadOnlyKeyValueStore. It features a REST API with endpoints for all 'Interactive Query' methods to play with hands-on.
Info

This is a follow-up post of Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 1).

Part 1 covers the basics of 'Interactive Queries' and how using Cassandra State Stores renders having an 'RPC layer' unnecessary. Please note this post does not repeat any of the fundamentals already covered in part 1.

☝️The idea for this new feature shipped with 0.7.0, actually came up while writing the previous post...

Querying Cassandra Key Value Store

All Interactive Queries can be fulfilled directly from the local instance with one or multiple queries to Cassandra.

Integration Diagram for a Kafka Streams Cassandra State Store exposed via REST-API using Interactive Queries v1

At the time of writing the first post, we learned how this is done for 'globalKeyValueStore'. On the other hand accessing 'partitionedKeyValueStore' still required following the 'RPC layer' approach, because data access using the CassandraKeyValueStore is always bound and restricted to the belonging streams task (:= partition).

This constraint is now lifted through a new class CassandraPartitionedReadOnlyKeyValueStore that implements ReadOnlyKeyValueStore<K, V>.

UPDATE: Cassandra Store Type 'partitionedKeyValueStore'

The Cassandra partitionedKeyValueStore is partitioned (CQL table primary key > partition key) by the streams taskId and supports all methods of the KeyValueStore 'Interactive Queries' interface.

To get an instance of this brand-new shiny CassandraPartitionedReadOnlyKeyValueStore (which is package private) you need to use the static methods of the interface CassandraStateStore.

Here's an example:

// get a read-only store to exec interactive queries ('partitioned' type cassandra KeyValueStore)
ReadOnlyKeyValueStore<String, Long> store = CassandraStateStore.readOnlyPartitionedKeyValueStore(
        streams,                                                // streams
        "word-count",                                           // storeName
        session,                                                // session
        "kstreams_wordcount",                                   // keyspace
        true,                                                   // isCountAllEnabled
        "dml",                                                  // dmlExecutionProfile
        stringSerde,                                            // keySerde
        longSerde,                                              // valueSerde
        CassandraStateStore.DEFAULT_TABLE_NAME_FN,              // tableNameFn
        new DefaultStreamPartitioner<>(keySerde.serializer())   // partitioner
);
        
// Get the value from the store
Long value = store.get(key);

The instance can only be created when the streams app is in RUNNING state and can (+should) then be re-used!

Info

In addition, the implementation of CassandraPartitionedReadOnlyKeyValueStore requires application.server config to be set (to be able to access metadata).

The number of method parameters is not low - but each argument to be passed in is needed to construct the repo and underlying class. Unfortunately, there's some overhead/redundancy with how the store is registered with the topology. I couldn't think of a better way of doing this yet.

Demo

Source code for this demo: partitioned-store-restapi

Next Steps & Considerations

  • The behaviour for large state stores has yet to be tested.
  • Cassandra fetches data in chunks, so reading / iterating over a large number of rows should still be possible when timeouts are not breached.
  • Current implementation, for the non-single result queries (all, range, prefixScan, query), is executing the cql query for all partitions in parallel, then iterating the (result) iterators one by one in sequential order.
    • This should be correct from a consistency point of view, but for large states, would the cassandra ResultSet iterators on-hold time out before being consumed?
    • While results are fetched in chunks, with the 'parallel query' pattern, first chunks for all partition would be fetched at once and demand RAM.
    • Maybe it would be better to switch to a sequential query+processing pattern iterating over the partitions (see org.apache.kafka.streams.state.internals.CompositeKeyValueIterator). Or provide both and allow the user to choose which one to use.
    • It would be interesting to test and compare both options applied for different use cases and streams architectures.
...

This might be worth to cover in a 'Part 3' post. Issue #26 was created to keep track and potentially re-visit in the future. Contributions are welcome, please PM me on twitter!

Conclusion

The new CassandraPartitionedReadOnlyKeyValueStore is a good addition to the tool belt of 'kafka-streams-cassandra-state-store' and may proof valuable when your business logic involves 'Interactive Queries' that requires accessing the state of your entire streams app.

The library still is in an experimental phase of life (version 0.x).
Feedback, ideas, and contributions are welcome. Please reach out any time!

Footnotes

At the time of writing this blog post, the latest versions of relevant libs were

  • Kafka / Streams API: 3.5.0
  • kafka-streams-cassandra-state-store: 0.7.2

References

CC BY-NC-SA 4.0 2022-2024 © Thriving.dev