Windowing on mapped stream is giving different results

Use case: map an input stream of one type to another type and sink it. Also apply an aggregator on the mapped stream to count the number of events of each event_type, and sink the counts to another topic.

I have tried something like this:

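// Note: topic-1 and topic-2 below are placeholders for the sinks writing to those topics.
DataStream<Event> eventStream = env.fromSource(...);

// Branch 1: map each Event to a TrafficSource and write it to topic-1.
eventStream.map((MapFunction<Event, TrafficSource>) value -> {
            TrafficSource trafficSource = new TrafficSource();
            // transform logic
            return trafficSource;
}).sinkTo(topic-1);

// Branch 2: count events per type in 10-second tumbling windows and write to topic-2.
eventStream.keyBy((KeySelector<Event, String>) Event::getType)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .aggregate(new TrafficAggregator())
           .sinkTo(topic-2);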
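
For reference, the counting that the windowed branch is meant to do can be sketched in plain Java without Flink. This is only an illustration under assumptions: the `Event` class here is a hypothetical stand-in with a type and an explicit timestamp, and explicit timestamps replace the processing-time clock that `TumblingProcessingTimeWindows` uses. `WindowCountSketch`, `windowStart` and `countPerWindowAndType` are made-up names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {

    // Hypothetical stand-in for the Event type from the question.
    public static class Event {
        public final String type;
        public final long timestampMillis;

        public Event(String type, long timestampMillis) {
            this.type = type;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the tumbling window that a (non-negative) timestamp falls into.
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Counts events per "windowStart/type" key, i.e. one counter per type per window.
    public static Map<String, Long> countPerWindowAndType(List<Event> events, long windowSizeMillis) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            String key = windowStart(e.timestampMillis, windowSizeMillis) + "/" + e.type;
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("click", 1_000L),
                new Event("click", 9_999L),
                new Event("view", 5_000L),
                new Event("click", 10_000L)); // falls into the next 10-second window
        System.out.println(countPerWindowAndType(events, 10_000L));
    }
}
```

With a 10-second window, the first two "click" events share a window and the last one starts a new window, so each input event is counted exactly once.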
            

When I test it with 1000 events, topic-1 ends up with 5000 events, even though that branch is just a map from one type to another.

Am I doing something wrong here?
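
Since a map emits one output per input, one way to narrow this down is to check whether the extra records in topic-1 are repeats of the same events. The sketch below assumes each event carries a unique id that survives the mapping, which is not shown in the code above. `DuplicateCheckSketch` and its methods are hypothetical helper names, and the ids would have to be read back from the topic separately.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateCheckSketch {

    // How many times each id occurs among the records read back from the sink.
    public static Map<String, Integer> occurrences(List<String> ids) {
        Map<String, Integer> seen = new HashMap<>();
        for (String id : ids) {
            seen.merge(id, 1, Integer::sum);
        }
        return seen;
    }

    // Number of records beyond the first occurrence of each id.
    public static int duplicateCount(List<String> ids) {
        return ids.size() - occurrences(ids).size();
    }

    public static void main(String[] args) {
        // Assumed sample: ids as they might be read back from topic-1.
        List<String> readBack = List.of("e1", "e2", "e1", "e3", "e1");
        System.out.println("distinct = " + occurrences(readBack).size());
        System.out.println("duplicates = " + duplicateCount(readBack));
    }
}
```

If the distinct count matches the number of events sent, the surplus consists of repeated records rather than new ones, which would point away from the map function itself.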



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
