Home > Enterprise >  When Kafka consumer don't use subscribe method, do it check session.timeout.ms or max.pool.inte
When Kafka consumer don't use subscribe method, do it check session.timeout.ms or max.pool.inte

Time:01-29

this is consumer code with not using subscribe method.

val consumer = KafkaConsumer<String, String>(properties)
val topics = listOf(TopicPartition("TEST", 1)

consumer.assgin(topics)

try {
    do {
        val records = consumer.poll(Duration.ofMillis(1000))
        records.forEach {
            println("result : $it")
        }
    } while (!records.isEmpty)
} catch (e: Exception) {
    println(e.message)
} finally {
    consumer.close()
}

do it check session.timeout.ms, max.pool.interval.ms or hearbeat.interval.ms? i think if kafka consumer don't use subscribe method, it don't check.

CodePudding user response:

Both subscribe() and assign() will check for session.timeout.ms,max.pool.interval.ms and other properties that you specify during consumer creation.

Difference between subscribe() and assign() is that when using

subscribe() uses group-id in the consumer properties and helps in dynamic partition assignment and consumer group coordination

assign will manually assign a list of partitions to this consumer. and this method does not use the consumer's group management functionality (where no need of group.id)

More information here : https://stackoverflow.com/questions/53938125/kafkaconsumer-java-api-subscribe-vs-assign#:~:text=Subscribe makes use of the,to a list of topics.

CodePudding user response:

Thanks. I tested like it.

properties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 10000
val consumer = KafkaConsumer<String, String>(properties)
val partitions = mutableListOf<TopicPartition>().apply {
    for (i in 0 until 1) {
        add(TopicPartition("TEST", i))
    }
}
consumer.assign(partitions)
Threed.sleep(20000L)

try {
    do {
        val records = consumer.poll(Duration.ofMillis(1000))
        records.forEach {
            println("result : $it")
        }
    } while (!records.isEmpty)
} catch (e: Exception) {
    println(e.message)
}

but, it worked well. Did I do something wrong on the test?

  •  Tags:  
  • Related