We're using Kafka Streams with Spring Cloud Stream Functions. We have the typical example application, which joins a user-clicks KStream with a user-regions KTable.
We know we can force custom names for internal changelog or repartition topics by using the appropriate methods that accept a name for the materialized store when defining our topology:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> bifunctionktable() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
            // Named join, so the join processor gets a stable name
            .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null, "bifunctionktable-leftjoin"))
            // Re-key by region; this key change triggers a repartition before grouping
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()).withName("bifunctionktable-groupbykey"))
            // Sum clicks per region; the store name also names its changelog topic
            .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks, Materialized.as("bifunctionktable-reduce"))
            .toStream();
}
But for the input KTable we cannot change the name of its state store's internal topic, and we always get this topic name: myapp-id-user-regions-STATE-STORE-0000000001-changelog
If we were building the topology entirely in code, we would have the builder.table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) method, but with functions the binder creates the input KTable for us. Is there any way to customize the internal topic name for the input KTable in this case?
CodePudding user response:
You can add a custom name for the incoming KTable by using the following property:
spring.cloud.stream.kafka.streams.bindings.bifunctionktable-in-1.consumer.materializedAs: <Your-custom-store-name>
This is documented in the Kafka Streams binder section of the Spring Cloud Stream reference docs.
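To check the effect of the property, it may help to recall that Kafka Streams names a store's changelog topic as `<application.id>-<store name>-changelog`. The following is only a small sketch of that naming convention, not binder code: the class and method names are made up for illustration, `myapp-id` is inferred from the topic name in the question, and `user-regions-store` is a hypothetical value for `materializedAs`.

```java
public class ChangelogTopicName {

    // Mirrors the Kafka Streams convention: <application.id>-<store name>-changelog.
    // Hypothetical helper for illustration only; not part of any library.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Default, auto-generated store name for the input KTable (from the question)
        System.out.println(changelogTopic("myapp-id", "user-regions-STATE-STORE-0000000001"));
        // Store name set through materializedAs (hypothetical value)
        System.out.println(changelogTopic("myapp-id", "user-regions-store"));
    }
}
```

Under these assumptions, the second call gives the topic name you would expect to see once `materializedAs` is set to `user-regions-store`. Note that renaming the store means a new changelog topic is created; the old auto-named topic is not renamed or migrated.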
