Apache Flink + CEP - Detect same events
I'd like to detect events that share the same property. Suppose I have a simple case class:
case class Record(name: String, value: Int)
Suppose there is the following stream:
Record("A", 1)
Record("B", 2)
Record("A", 3)
Record("C", 4)
Then I'd like to detect the repeated "A" record. Is this possible? So far I have this:
val start: Pattern[Record, _] = myStream
.begin("first")
.followedBy("second").where(previous_record.name == _.name)
Solution 1:[1]
I think what you are describing is ordinary event detection. Have you tried this?
// An "A" record followed, not necessarily immediately, by another "A" record
val start: Pattern[Record, _] = Pattern
  .begin[Record]("first").where(_.name == "A")
  .followedBy("second").where(_.name == "A")
Update: For example:
// Events are (name, ac) tuples: "Ignition" ON directly followed by "Door" ON
val patternIG: Pattern[(String, String), _] = Pattern
  .begin[(String, String)]("start").where(_._1 == "Ignition").where(_._2 == "ON")
  .next("end").where(_._1 == "Door").where(_._2 == "ON")
val patternStream: PatternStream[(String, String)] = CEP.pattern(mystream, patternIG)
// Called once per match; the matched events are looked up by pattern name
def selectFn(pattern: mutable.Map[String, (String, String)]): String = {
  val startEvent = pattern("start")
  val endEvent = pattern("end")
  "ALERT Door Open"
}
val patternStreamSelected = patternStream.select(selectFn(_)).print()
Solution 2:[2]
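You can key the stream by name (for example keyBy(_.name)) before applying CEP. The pattern is then evaluated separately for each name, so a plain "first" followed by "second" pattern only ever pairs records that share a name.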
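To see why keying first is enough, here is a minimal plain-Scala sketch of the idea. It does not use the Flink API: groupBy stands in for keyBy, and pairing each record with the next one in its group approximates what a condition-free "first" followedBy "second" pattern would report per key. The names SameNameSketch and duplicatePairs are made up for illustration.

```scala
case class Record(name: String, value: Int)

object SameNameSketch {
  // Hypothetical helper: for each name, pair every record with the next
  // record carrying the same name (arrival order is kept within a group).
  def duplicatePairs(records: Seq[Record]): Seq[(Record, Record)] =
    records
      .groupBy(_.name)                            // stands in for keyBy(_.name)
      .values
      .flatMap(group => group.zip(group.drop(1))) // "first" followed by "second"
      .toSeq

  def main(args: Array[String]): Unit = {
    val stream = Seq(Record("A", 1), Record("B", 2), Record("A", 3), Record("C", 4))
    // Only "A" occurs twice, so exactly one pair is printed:
    // (Record(A,1),Record(A,3))
    duplicatePairs(stream).foreach(println)
  }
}
```

In a real job the grouping happens continuously per key inside Flink, so nothing has to be collected up front. The sketch only shows which pairs come out.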
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Xingxiang Zhuo |