func (c *client) Init(conf config.Config, topic string) (err error) {
defer func() {
p := recover()
switch v := p.(type) {
case error:
err = v
}
}()
c.reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: conf.Brokers,
GroupID: conf.GroupID,
Topic: topic,
MaxWait: 1 * time.Second, // maximum time to wait for new messages
MinBytes: 1, // minimum message size
MaxBytes: 10e6, // maximum message size 1 MByte (= maximum size Kafka can handle)
RetentionTime: time.Hour * 24 * 7, // keep ConsumerGroup for 1 week
WatchPartitionChanges: true, // watch for changes to the partitions (e.g. increase of partitions)
})
if conf.TlsEnabled {
d := &kafka.Dialer{
TLS: &tls.Config{},
}
}
return err
}
Long story short: what I wanna do is adding the field Dialer: d to c.reader if TlsEnabled is true! c.reader is of type ReaderConfig which already contains Dialer field which is in my case:
d := &kafka.Dialer{
TLS: &tls.Config{},
}
CodePudding user response:
If I understand your question correctly, you want to set the Dialer field on kafka.ReaderConfig if, and only if conf.TlsEnabled is true. In that case, you should just move your if conf.TlsEnabled check before you call kafka.NewReader, and assign the kafka.ReaderConfig to a variable, like so:
rConf := kafka.ReaderConfig{
Brokers: conf.Brokers,
GroupID: conf.GroupID,
Topic: topic,
MaxWait: 1 * time.Second, // maximum time to wait for new messages
MinBytes: 1, // minimum message size
MaxBytes: 10e6, // maximum message size 1 MByte (= maximum size Kafka can handle)
RetentionTime: time.Hour * 24 * 7, // keep ConsumerGroup for 1 week
WatchPartitionChanges: true, // watch for changes to the partitions (e.g. increase of partitions)
}
if conf.TlsEnabled {
rConf.Dialer = &kafka.Dialer{
TLS: &tls.Config{},
}
}
// now assign c.reader
c.reader = kafka.NewReader(rConf)
Just a small nit-pick: in golang, acronyms and other initialisms should be in all-caps. Your config type should not have a field called TlsEnabled, but rather it should be TLSEnabled.
CodePudding user response:
You can't add a field to an existing type but you can embed the client in a custom type. The fields can then be accessed directly via the custom type
type myType struct {
something string
foo int
bar int
}
type myExtendedType struct {
myType
dialer *kafka.Dialer
}
func main() {
ext := myExtendedType{
myType: myType{
something: "some",
foo: 24,
bar: 42,
},
dialer: &kafka.Dialer{
TLS: &tls.Config{},
},
}
println(ext.something, ext.foo, ext.bar, ext.dialer)
}
