Thriving.dev Learning Resources for Software Architects and Engineers

Kafka Streams Cassandra State Store 0.8.0 ships VersionedKeyValueStore<K, V>

Posted on Aug 27, 2023 · 4min read · intermediate

Kafka Streams Cassandra State Store 0.8.0 is out, adding support for versioned state stores.

Meme picture calling for 'TRUE TEMPORAL STREAM-TABLE JOIN ALL THE THINGS!!!'

Recap: Versioned State Stores

KIP-889 (together with KIP-914) introduces versioned state stores in Kafka Streams.
A RocksDB-based implementation was released with Kafka 3.5.0.

Victoria Xia sums up the new feature

"KIP-889 is the first in a sequence of KIPs to introduce versioned key-value stores into Kafka Streams. Versioned key-value stores enhance stateful processing capabilities by allowing users to store multiple record versions per key, rather than only the single latest version per key as is the case for existing key-value stores today. Storing multiple record versions per key unlocks use cases such as true temporal stream-table joins: when an out-of-order record arrives on the stream-side, Kafka Streams can produce the correct join result by looking 'back in time' for the table state at the timestamp of the stream-side record. Foreign-key joins will see similar benefits, and users can also support custom use cases in their applications by running interactive queries to look up older record versions from versioned state stores, or by using them in custom processors."

in the pitch for her excellent session at Kafka Summit London 2023 that introduces versioned state stores.
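To make "looking back in time" concrete, here is a toy, in-memory Java sketch of versioned key-value semantics. It is an illustration only, not the Kafka Streams VersionedKeyValueStore API and not this library's implementation; the class names ToyVersionedStore and ToyVersionedStoreDemo are hypothetical. Each key keeps a sorted map from record timestamp to value, and an as-of lookup returns the version with the greatest timestamp at or before the requested time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model of versioned key-value semantics (illustration only;
// not the Kafka Streams VersionedKeyValueStore or the Cassandra-backed store).
class ToyVersionedStore<K, V> {
    // Per key: record timestamp (validFrom) -> value, sorted by timestamp.
    private final Map<K, NavigableMap<Long, V>> versions = new HashMap<>();

    // Stores a new version; out-of-order timestamps simply land in the right place.
    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest version: the value with the greatest timestamp.
    V get(K key) {
        NavigableMap<Long, V> history = versions.get(key);
        return (history == null || history.isEmpty()) ? null : history.lastEntry().getValue();
    }

    // "Looking back in time": the version with the greatest timestamp <= asOfTimestamp.
    V get(K key, long asOfTimestamp) {
        NavigableMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}

class ToyVersionedStoreDemo {
    public static void main(String[] args) {
        ToyVersionedStore<String, Long> prices = new ToyVersionedStore<>();
        prices.put("apple", 100L, 1_000L);
        prices.put("apple", 120L, 5_000L);
        prices.put("apple", 110L, 3_000L); // arrives out of order
        System.out.println(prices.get("apple"));         // latest version
        System.out.println(prices.get("apple", 4_000L)); // version valid at t=4000
    }
}
```

The out-of-order put at t=3000 still ends up in the right place in the history, so the lookup at t=4000 returns 110 even though 120 is the latest value.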

VersionedKeyValueStore<K, V> Implementation

The library 'kafka-streams-cassandra-state-store' 0.8.0 ships two new store types:

  • partitionedVersionedKeyValueStore
  • globalVersionedKeyValueStore

Both are persistent implementations of VersionedKeyValueStore<Bytes, byte[]>, and the partitioned and global variants build on the same fundamentals as the non-versioned stores...

partitionedVersionedKeyValueStore

The underlying Cassandra table is partitioned by the store context's task partition.

It behaves exactly like the official versioned state store: all CRUD operations against this store query by, and return results for, a single stream task.

globalVersionedKeyValueStore

The underlying Cassandra table uses the record key + validTo as a composite PRIMARY KEY (with validTo as the clustering key).

Therefore, all CRUD operations against this store work from any stream task and are always “global”.
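Why validTo as the clustering key? A sketch under stated assumptions: model the rows of one record key as a map sorted by validTo, where each row is valid for [validFrom, validTo) and the latest version uses Long.MAX_VALUE as a sentinel validTo. That sentinel and the class ValidToClusteredHistory are hypothetical; the library's actual schema and CQL may differ. An as-of lookup becomes "the first row whose validTo is after the timestamp", and an out-of-order put splits the row that contains its timestamp.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the rows stored for ONE record key in a table clustered by validTo.
// Assumption: each row is valid for [validFrom, validTo) and the latest version uses
// Long.MAX_VALUE as its validTo. The library's real schema and CQL may differ.
class ValidToClusteredHistory<V> {
    static final long LATEST_VALID_TO = Long.MAX_VALUE;

    record Row<T>(long validFrom, T value) {}

    // Clustering order: validTo -> row.
    private final TreeMap<Long, Row<V>> rows = new TreeMap<>();

    // As-of lookup: the first row whose validTo is strictly after the timestamp,
    // provided that row had already started at that timestamp.
    V getAsOf(long asOfTimestamp) {
        Map.Entry<Long, Row<V>> e = rows.higherEntry(asOfTimestamp);
        return (e != null && e.getValue().validFrom() <= asOfTimestamp) ? e.getValue().value() : null;
    }

    void put(V value, long timestamp) {
        Map.Entry<Long, Row<V>> e = rows.higherEntry(timestamp);
        if (e == null) { // no rows yet for this key
            rows.put(LATEST_VALID_TO, new Row<>(timestamp, value));
            return;
        }
        long validTo = e.getKey();
        Row<V> row = e.getValue();
        if (row.validFrom() == timestamp) {
            // Same timestamp: overwrite the value, the interval is unchanged.
            rows.put(validTo, new Row<>(timestamp, value));
        } else if (row.validFrom() > timestamp) {
            // Older than the earliest stored version: the new row ends where that one starts.
            rows.put(row.validFrom(), new Row<>(timestamp, value));
        } else {
            // Split the row containing the timestamp ("patching" its validTo); this is also
            // the in-order case, where the current row gets a finite validTo.
            // validTo is part of the key, so this is a delete plus two inserts.
            rows.remove(validTo);
            rows.put(timestamp, new Row<>(row.validFrom(), row.value()));
            rows.put(validTo, new Row<>(timestamp, value));
        }
    }

    int rowCount() {
        return rows.size();
    }
}

class ValidToClusteredHistoryDemo {
    public static void main(String[] args) {
        ValidToClusteredHistory<Long> h = new ValidToClusteredHistory<>();
        h.put(100L, 1_000L);
        h.put(120L, 5_000L);
        h.put(110L, 3_000L); // out of order: splits the [1000, 5000) row
        System.out.println(h.getAsOf(4_000L) + " " + h.getAsOf(2_000L) + " " + h.rowCount());
    }
}
```

Because primary key columns cannot be updated in place in Cassandra, changing a row's validTo in this layout means deleting the old row and inserting a new one, which hints at why out-of-order writes cost extra store round trips.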

Interactive Queries

As of Kafka 3.5, interactive query interfaces are not yet available for versioned key-value stores. There are plans to add them in the future.

Follow-up KIPs are in progress or yet to be opened (asOfTimestamp 2023-08-25: KIP-960, KIP-968, KIP-969).

Usage Example

The following code snippet declares a KTable materialized with a versioned store and a history retention of 1 minute:

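import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
// Assumes `session` (a CqlSession), STORE_NAME and INPUT_TOPIC_PRICES are defined elsewhere,
// and that CassandraStores is imported from the kafka-streams-cassandra-state-store library.

StreamsBuilder builder = new StreamsBuilder();
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// prices table, materialized in a partitioned versioned Cassandra store
KTable<String, Long> prices = builder
    .stream(INPUT_TOPIC_PRICES, Consumed.with(stringSerde, longSerde))
    .toTable(Materialized.<String, Long>as(
            CassandraStores.builder(session, STORE_NAME)
                .partitionedVersionedKeyValueStore(Duration.ofMinutes(1))) // historyRetention: 1 min
        .withCachingDisabled()
        .withLoggingDisabled()
        .withKeySerde(stringSerde)
        .withValueSerde(longSerde));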
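A prices table like the one above is exactly where versioning pays off in a stream-table join. The following stdlib-only Java sketch contrasts a latest-only lookup with an as-of lookup for a late order. It simulates the join semantics with plain collections and is not Kafka Streams code; TemporalJoinDemo, Order, joinAsOf and joinLatest are hypothetical names, and the sketch assumes both price updates were applied to the table before the orders are processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, stdlib-only simulation of stream-table join lookups (not Kafka Streams code).
// Assumes all price updates were applied to the table before the orders are processed.
class TemporalJoinDemo {
    record Order(String product, long timestamp) {}

    // Versioned table: join each order with the price valid at the order's timestamp.
    static List<String> joinAsOf(List<Order> orders, Map<String, TreeMap<Long, Long>> prices) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            Map.Entry<Long, Long> p = prices.getOrDefault(o.product(), new TreeMap<>()).floorEntry(o.timestamp());
            out.add(format(o, p == null ? null : p.getValue()));
        }
        return out;
    }

    // Latest-only table: join each order with the newest price, regardless of timestamps.
    static List<String> joinLatest(List<Order> orders, Map<String, TreeMap<Long, Long>> prices) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            TreeMap<Long, Long> versions = prices.getOrDefault(o.product(), new TreeMap<>());
            out.add(format(o, versions.isEmpty() ? null : versions.lastEntry().getValue()));
        }
        return out;
    }

    private static String format(Order o, Long price) {
        return o.product() + "@" + o.timestamp() + " -> " + (price == null ? "no price" : price);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> apple = new TreeMap<>(Map.of(1_000L, 100L, 5_000L, 120L));
        Map<String, TreeMap<Long, Long>> prices = Map.of("apple", apple);
        // The order at t=2000 arrives late, after the order at t=6000.
        List<Order> orders = List.of(new Order("apple", 6_000L), new Order("apple", 2_000L));
        System.out.println("latest-only: " + joinLatest(orders, prices));
        System.out.println("as-of:       " + joinAsOf(orders, prices));
    }
}
```

The late order at t=2000 is joined with 100 under as-of semantics, but with 120 when the table only keeps the latest value.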

Operational Considerations for Users Who'd Like to Use Them

While versioned stores enable true temporal stream-table joins, aggregations and other operations by correctly handling out-of-order records, they also come with extra costs. Storing multiple record versions per key rather than only the single latest version requires additional interactions with the store.

Maintaining the version history per key comes with:

  • extra lookups and patching of versioned point-in-time values, many of them in sequential order
  • an increase in the store's overall data volume
  • 'cleanup' operations for history retention
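The 'cleanup' step can be sketched under a simplified rule (an assumption, not the library's actual cleanup code): with cutoff = observedStreamTime - historyRetention, any version that stopped being valid at or before the cutoff can never answer an as-of query at or after the cutoff, so it can be dropped, while the version valid at the cutoff and everything newer are kept. HistoryRetention.expire is a hypothetical helper.

```java
import java.util.TreeMap;

// Hypothetical helper illustrating history-retention cleanup for one key's versions.
// Simplified assumption: versions maps validFrom -> value, and each version stays valid
// until the next version's timestamp (the latest one is valid indefinitely).
class HistoryRetention {
    static <V> void expire(TreeMap<Long, V> versions, long observedStreamTime, long historyRetentionMs) {
        long cutoff = observedStreamTime - historyRetentionMs;
        // The version that was valid at the cutoff is still needed for as-of queries >= cutoff.
        Long validAtCutoff = versions.floorKey(cutoff);
        if (validAtCutoff != null) {
            // Everything older stopped being valid at or before the cutoff: drop it.
            versions.headMap(validAtCutoff, false).clear();
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "a");
        versions.put(30_000L, "b");
        versions.put(90_000L, "c");
        versions.put(150_000L, "d");
        // 1 min history retention, observed stream time 160 s -> cutoff at 100 s
        expire(versions, 160_000L, 60_000L);
        System.out.println(versions);
    }
}
```

With the 1-minute retention from the usage example and an observed stream time of 160 s, the cutoff is 100 s: the versions from 0 s and 30 s are dropped, while the one from 90 s (still valid at the cutoff) and the latest one survive.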

Because the state is maintained outside the streams application (in Cassandra), these additional store interactions are blocking I/O and therefore directly affect throughput.
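A back-of-the-envelope bound illustrates the point. Assuming, hypothetically, that processing one record issues k sequential store round trips of latency t each, and that the stream thread blocks on every call, per-thread throughput is at most 1/(k·t). With purely illustrative numbers (not measurements), k = 3 and t = 2 ms:

```latex
\text{throughput}_{\max} \le \frac{1}{k \cdot t} = \frac{1}{3 \times 2\,\text{ms}} = \frac{1}{6\,\text{ms}} \approx 167\ \text{records/s per stream thread}
```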

Next Steps

  • Add interactive queries support once follow-up KIPs are delivered
  • Benchmark
  • Consider (in-memory) caching options to improve performance (ref #18)

Footnotes

At the time of writing, the latest versions of the relevant libraries were:

  • Kafka / Streams API: 3.5.0
  • kafka-streams-cassandra-state-store: 0.8.0

References

Live Coding Series

This feature was live coded and recorded over the course of 6 parts, available on YouTube:

  • Part 1: Requirement Engineering, Analysis, Design, POC. In this session, I define the new table schema, the queries to look up records, and the business processes for use cases such as ‘get current’, ‘get asOfTimestamp’, ‘put new current record’, ‘put point-in-time record’, ‘delete point-in-time record’ and ‘cleanup as per historic retention’. https://youtu.be/zuMvGdmRqfs
  • Part 2: First integration test with testcontainers. In this part, the first new integration test is implemented using JUnit5 and the testcontainers framework. The test setup had to be changed ad hoc because, to my surprise, Kafka Streams 3.5.0 provides no access to the store. https://youtu.be/aXyAUc-lJR4
  • Part 3: Integration test improvements and adding more test cases. This follow-up part still focuses on integration tests (before the actual implementation): improving the test setup to allow access to the state store, and adding test cases that cover all versioned store interface methods as well as data and access edge cases. https://youtu.be/pb7SoPGdGz4
  • Part 4: 🔥 Implementation of the new feature. Next comes the highlight of this series: the actual implementation of the new feature, which puts together everything gained so far (the designs, the prepared CQL statements and the business processes). The result is a complete implementation of ‘CassandraVersionedKeyValueStore’ with a ‘partitioned’ type underlying data schema (the ‘global’ type is added in a subsequent part). https://youtu.be/eaNximD1LKY
  • Part 5: Debugging, subsequent works, and making the tests pass! With an initial implementation of the versioned store interface complete, we now run the integration tests written in parts 2 and 3. It took a considerable amount of debugging and additional logic to work out and fix all the nitty-gritty details. Due to tiredness, this took quite some time, but it ended with passing tests and a code-complete state for the CassandraVersionedKeyValueStore & PartitionedCassandraVersionedKeyValueStoreRepository. 💪 https://youtu.be/XydzLX4sa00
  • Part 6: ‘Global’ type db schema based store. In this final part, we implement the ‘global’ type store and do some extra refactoring. With this, the feature is complete!! 🥳 https://youtu.be/SRdLZhH2roE
CC BY-NC-SA 4.0 2022-2024 © Thriving.dev