Thriving.dev Learning Resources for Software Architects and Engineers
Blog Post

Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 1) `globalKeyValueStore`

Posted on Jul 14, 2023 · 5min read · advanced

Series: Interactive Queries with 'kafka-streams-cassandra-state-store'

Today's article covers accessing Kafka Streams Cassandra State Stores via 'Interactive Queries' and how to expose the state of your applications via a REST API.

While interactive queries had been supported since the initial release 0.1.0, the recent version 0.5.0 ships with convenient helper methods to get hold of a correctly set up read-only store facade to query your state.

Integration Diagram for a Kafka Streams State Store exposed via REST-API using Interactive Queries v1

While for the regular Kafka Streams Stores (RocksDB & InMemory), state always is partitioned and local to the individual running application instance, for Cassandra Stores all state resides in a common external database (cluster) that all instances can access.

This means that for 'global' Cassandra State Stores there's no need for an RPC layer proxying and fanning out requests to all instances of your streams application. Stay tuned for more details...

Basics Recap

Kafka Streams 'Interactive Queries' is a feature in Apache Kafka Streams that allows applications to query the state of a stream processor. It enables real-time interactive access to the internal state stores of a Kafka Streams application, allowing applications to perform dynamic lookups.

'kafka-streams-cassandra-state-store' is a Kafka Streams State Store implementation that persists data to Apache Cassandra. It's a 'drop-in' replacement for the official Kafka Streams state store solutions, notably RocksDB (default) and InMemory. @see the blog post introducing the library.

Querying Local State Stores for an App Instance

Interface ReadOnlyKeyValueStore

The process of accessing local stores from the streams instance is a well known and well documented pattern. Below the most basic example is provided, for mor information, please refer to the Confluent docs.

// get the key-value store named "word-counts"
ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("word-counts", QueryableStoreTypes.keyValueStore()));

// get value by key
System.out.println("count for 'hello': " + store.get("hello"));

The ReadOnlyKeyValueStore interface provides following methods:

public interface ReadOnlyKeyValueStore<K, V> {
    V get(K key);
    KeyValueIterator<K, V> range(K from, K to);
    KeyValueIterator<K, V> reverseRange(K from, K to);
    KeyValueIterator<K, V> all();
    KeyValueIterator<K, V> reverseAll();
    <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(
            P prefix, PS prefixKeySerializer
    );
    long approximateNumEntries();
}

Full source code incl. Javadoc: https://github.com/apache/kafka/.../ReadOnlyKeyValueStore.java

Querying Remote State Stores for the Entire App

In order to make the application's complete state available, it is necessary to access the app's remote states, including those operating on different instances.

Confluent named this pattern 'Adding an RPC layer'. RPC endpoints are configured in clients via application.server property (=> unique host:port pair), synchronised, and made available to all instances via StreamsMetadata.

Integration Diagram for a Kafka Streams State Store exposed via REST-API using Interactive Queries v1

To get the value by key, the host can be read from streams metadata and looked up from local state or RPC.
Simplified logic as follows:

// We first find the one app instance that has the state for given `key`
StreamsMetadata metadata = streams.metadataForKey("word-count", key, Serdes.String().serializer());

// check if key resides on the own instance 
if (metadata.hostInfo().equals(this.hostInfo)) {
    // access local store
    return store.get(key);
} else {
    // from the metadata we can read the host + port and construct
    String baseUrl = "https://" + metadata.host() + ":" + metadata.port() + "/api/word-count/" + key;
    // make RPC...
    {...}
}
Info

The number of remote calls depends on the operation (query).
While for a get(K key) it's sufficient to make a single call, for a range(K from, K to) query all instances have to be involved and the results combined.

Tip

To improve availability and the chance to have data required in the local state, standby tasks may be added.

⚠ It's important to keep in mind that standby tasks are processed with lower priority and consistently lag behind the original task/instance.

Querying Cassandra Key Value Store

Cassandra Store Type 'globalKeyValueStore'

For the 'global' type store of 'kafka-streams-cassandra-state-store' there's no need for any RPC between instances. (☝️though the query to Cassandra of course is network IO)

Info

The 'global' type store is globally accessible from all instances (no matter the store's context), but a regular KV Store. It's NOT a Kafka Streams Global Store (/GlobalKTable). Please refer to the docs for more details.

All Interactive Queries can be fulfilled directly from the local instance with a single query to Cassandra.

Integration Diagram for a Kafka Streams Cassandra State Store exposed via REST-API using Interactive Queries v1

To safely use Interactive Queries with the 'globalKeyValueStore' there are a few significant details to pay attention to.

🧐 For querying the 'global store', the WrappingStoreProvider must be restricted to a single (assigned) partition. The KafkaStreams instance returns a CompositeReadOnlyKeyValueStore that holds the WrappingStoreProvider, wrapping all assigned tasks' stores.

Warning

Without the correct StoreQueryParameters the same query is executed multiple times (for all local assigned tasks) and combines the same results multiple times 🐛.

After taking everything into account, the outcome is the code snippet presented below:

// get the first active task partition for the first streams thread
final int firstActiveTaskPartition = streams.metadataForLocalThreads()
        .stream().findFirst()
        .orElseThrow(() -> new RuntimeException("no streams threads found"))
        .activeTasks()
        .stream().findFirst()
        .orElseThrow(() -> new RuntimeException("no active task found"))
        .taskId().partition();

// get a WrappingStoreProvider 'withPartition' -> query only a single store (the first active task)!
// (WrappingStoreProvider otherwise iterates over all storeProviders for all assigned tasks and repeatedly query Cassandra)
ReadOnlyKeyValueStore<X, Y> store = streams.store(fromNameAndType(storeName, QueryableStoreTypes.<X, Y>keyValueStore())
        .enableStaleStores() // should be unnecessary -> CassandraStateStore should always be used with logging disabled and without standby tasks...
        .withPartition(firstActiveTaskPartition));

The interface CassandraStateStore that was released with 0.5.0 provides convenient static helper methods to get properly configured stores for interactive queries:

// get a store to exec interactive queries
ReadOnlyKeyValueStore<String, Long> store = CassandraStateStore.readOnlyGlobalKeyValueStore(streams, STORE_NAME);
        
// Get the value from the store
Long value = store.get(key);
Info

Please take note that for 'globalKeyValueStore' store type, not all operations are supported.
Read the the documentation.

Cassandra Store Type 'partitionedKeyValueStore'

The Cassandra partitionedKeyValueStore is partitioned (CQL table primary key > partition key) by the streams taskId to support range and prefixScan queries.

In theory, no RPC would be required since each instance still can access all table rows with the taskId as CQL query condition - but results for interactive queries other than get(K key) still need to be queried separately for all tasks and the results combined/merged.

Currently, no custom ReadOnlyKeyValueStore implementation provided to do that. Just like for RocksDB/InMemory state stores the 'RPC layer' pattern has to be utilised for this store type.

Info

Actually, this idea came up while writing this very post... issue #23 was created to implement this feature. Contributions are welcome, please PM me on twitter!

Assessment & Conclusion

Kafka Streams Interactive Queries are very powerful and allow for many use cases, such as advanced queries to your store from the Processor API, or exposing the state via REST API for access from outside the application.

An 'RPC layer' allows access to the complete state, which is distributed across all instances of your application. Depending on the operation, remote calls to a single or all instances is needed.

With 'global' Cassandra State Stores the RPC layer can be avoided by reading all data directly from Cassandra. The larger the state and scale (no. of instances) of your application, the more benefits Cassandra may be able to provide.

Footnotes

At the time of writing this blog post, the latest versions of relevant libs were

  • Kafka / Streams API: 3.5.0
  • kafka-streams-cassandra-state-store: 0.6.0

UPDATE 24/07/2023

Follow-up Post was Published -> Interactive Queries with 'kafka-streams-cassandra-state-store' (Part 2) for 'partitionedKeyValueStore'

References

CC BY-NC-SA 4.0 2022-2024 © Thriving.dev