This is my consumer code, which does not use the subscribe method.
val consumer = KafkaConsumer<String, String>(properties)
val topics = listOf(TopicPartition("TEST", 1))
consumer.assign(topics)
try {
do {
val records = consumer.poll(Duration.ofMillis(1000))
records.forEach {
println("result : $it")
}
} while (!records.isEmpty)
} catch (e: Exception) {
println(e.message)
} finally {
consumer.close()
}
Does it check session.timeout.ms, max.poll.interval.ms, or heartbeat.interval.ms? I think that if a Kafka consumer doesn't use the subscribe method, it doesn't check them.
CodePudding user response:
Both subscribe() and assign() will check session.timeout.ms, max.poll.interval.ms, and the other properties that you specify when creating the consumer.
The difference between subscribe() and assign() is:
subscribe() uses the group.id in the consumer properties and enables dynamic partition assignment and consumer group coordination.
assign() manually assigns a list of partitions to this consumer. This method does not use the consumer's group management functionality, so no group.id is needed.
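To make the contrast concrete, here is a minimal sketch of the two styles side by side. The helper name consumerProps, the broker address localhost:9092, and the group name demo-group are assumptions for illustration only; the topic "TEST" is taken from the question. Auto-commit is turned off explicitly so that the consumer without a group.id does not try to commit offsets.

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: builds consumer properties.
// groupId is nullable because assign() can be used without a group.id.
fun consumerProps(bootstrap: String, groupId: String?): Properties =
    Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        // No offset commits in this sketch, so a missing group.id is acceptable.
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        if (groupId != null) put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    }

fun main() {
    // Assumed broker address; adjust for your environment.
    val bootstrap = "localhost:9092"

    // subscribe(): the consumer joins the group and partitions are assigned dynamically.
    KafkaConsumer<String, String>(consumerProps(bootstrap, "demo-group")).use { consumer ->
        consumer.subscribe(listOf("TEST"))
        consumer.poll(Duration.ofMillis(1000)).forEach { println("subscribe : $it") }
    }

    // assign(): the partition is chosen by hand and no group membership is involved.
    KafkaConsumer<String, String>(consumerProps(bootstrap, null)).use { consumer ->
        consumer.assign(listOf(TopicPartition("TEST", 0)))
        consumer.poll(Duration.ofMillis(1000)).forEach { println("assign : $it") }
    }
}
```

Running main() needs a reachable broker and the kafka-clients library on the classpath. A single consumer instance should use one style or the other, not both.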
More information here : https://stackoverflow.com/questions/53938125/kafkaconsumer-java-api-subscribe-vs-assign#:~:text=Subscribe makes use of the,to a list of topics.
CodePudding user response:
Thanks. I tested it like this:
properties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 10000
val consumer = KafkaConsumer<String, String>(properties)
val partitions = mutableListOf<TopicPartition>().apply {
for (i in 0 until 1) {
add(TopicPartition("TEST", i))
}
}
consumer.assign(partitions)
Thread.sleep(20000L)
try {
do {
val records = consumer.poll(Duration.ofMillis(1000))
records.forEach {
println("result : $it")
}
} while (!records.isEmpty)
} catch (e: Exception) {
println(e.message)
}
But it worked fine. Did I do something wrong in the test?
