I have a Flink application on AWS Kinesis Analytics service. I need to filter some values on a data stream based on a threshold. Also, I'm passing the threshold parameter using AWS Systems Manager Parameter Store service. For now, I got this:
- In my Main class:
val threshold: Int = ssmParameter.getParameterRequest(ssmClient, "/kinesis/threshold").toInt
val kinesis_deserialization_schema = new KinesisDeserialization[ID]
val KinesisConsumer = new FlinkKinesisConsumer[ID](
"Data-Stream",
kinesis_deserialization_schema,
consumerProps
)
val KinesisSource = env.addSource(KinesisConsumer).name(s"Kinesis Data")
val valid_data = KinesisSource
.filter(new MyFilter[ID](threshold))
.name("FilterData")
.uid("FilterData")
- Filter class:
import cl.mydata.InputData
import org.apache.flink.api.common.functions.FilterFunction
class MyFilter[ID <: InputData](
threshold: Int
) extends FilterFunction[ID] {
override def filter(value: ID): Boolean = {
value.myvalue > threshold
}
}
}
This works fine, the thing is that I need to update the threshold parameter every hour, because that value can be changed by my client.
CodePudding user response:
Perhaps you can implement the ProcessingTimeCallback interface in the MyFilter class, which supports timer operations, and you can update the threshold in the onProcessingTime function
public class MyFilter extends FilterFunction<...> implements ProcessingTimeCallback {
int threshold;
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now 3600000, this);
}
@Override
public boolean filter(IN xxx) throws Exception {
return xxx > threshold;
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
threshold = XXXX;
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now 3600000, this);
}
}
CodePudding user response:
You could turn the FilterFunction into a BroadcastProcessFunction, and broadcast new thresholds as they become available.
