In the first post of this series, we discussed what event streaming windowing is, and we examined in detail the structure of a windowed aggregate in Kafka Streams and Flink SQL.
KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)) <1>
            .advanceBy(Duration.ofSeconds(30)) <2>
    )
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, aggregationSerde));
The TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
call sets the window size to one minute. The withNoGrace
part means Kafka Streams will drop any out-of-order records that would have been included in the window had they arrived in order. We'll get into grace periods more in the blog post on windowing time semantics.
The .advanceBy(Duration.ofSeconds(30))
call makes this a hopping window: it creates a window that is one minute in size and advances every 30 seconds.
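To make the hopping-window math concrete, here is a small plain-Java sketch (just the window arithmetic, not the Kafka Streams API) that computes which one-minute windows, advancing every 30 seconds, contain a given event timestamp. Because the windows overlap, each record lands in size/advance = 2 windows:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start timestamps (ms) of every hopping window containing t.
    // A window [start, start + sizeMs) contains t when start <= t < start + sizeMs.
    static List<Long> windowStartsFor(long t, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - (t % advanceMs); // latest window start at or before t
        for (long start = lastStart; start > t - sizeMs && start >= 0; start -= advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000, advance = 30_000;
        // An event at t = 95s belongs to the windows starting at 90s and 60s.
        System.out.println(windowStartsFor(95_000, size, advance)); // [90000, 60000]
    }
}
```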
SELECT window_start,
       window_end,
       device_id,
       AVG(reading) AS avg_reading
FROM TABLE(HOP <1>
    (TABLE device_readings, <2>
     DESCRIPTOR(ts), <3>
     INTERVAL '30' SECONDS, <4>
     INTERVAL '1' MINUTES <5>
    ))
GROUP BY window_start,
         window_end,
         device_id
You specify hopping windows by passing the HOP
function to the TABLE
function.
The DESCRIPTOR
is the column with the time attribute used for the window.
The first INTERVAL
is the amount of "hop," or advance, of the window.
The second INTERVAL
is the size of the window.
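As a sketch of what the HOP query computes, the following plain-Java example (the device ID and readings are invented for illustration) assigns each reading to every hopping window containing its timestamp and averages per window and device, mirroring the SELECT above:

```java
import java.util.*;

public class HopAverages {
    record Reading(String deviceId, long ts, double value) {}

    // Average readings per (windowStart-windowEnd, deviceId) for hopping windows:
    // each reading is added to every window [start, start + sizeMs) containing its ts.
    static Map<String, Double> hopAverages(List<Reading> readings, long sizeMs, long slideMs) {
        Map<String, List<Double>> buckets = new TreeMap<>();
        for (Reading r : readings) {
            long lastStart = r.ts() - (r.ts() % slideMs);
            for (long start = lastStart; start > r.ts() - sizeMs && start >= 0; start -= slideMs) {
                buckets.computeIfAbsent(start + "-" + (start + sizeMs) + "/" + r.deviceId(),
                        k -> new ArrayList<>()).add(r.value());
            }
        }
        Map<String, Double> avgs = new TreeMap<>();
        buckets.forEach((k, v) -> avgs.put(k,
                v.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
        return avgs;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("sensor-1", 10_000, 20.0),
                new Reading("sensor-1", 40_000, 30.0));
        // One-minute windows sliding every 30 seconds: the 40s reading lands in
        // both the [0s, 60s) and [30s, 90s) windows.
        System.out.println(hopAverages(readings, 60_000, 30_000));
    }
}
```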
Stepping through this illustration
KStream<String, Double> iotHeatSensorStream =
    builder.stream("heat-sensor-input",
        Consumed.with(stringSerde, doubleSerde));

iotHeatSensorStream.groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)) <1>
    )
    .aggregate(() -> new IotSensorAggregation(tempThreshold),
        aggregator,
        Materialized.with(stringSerde, aggregationSerde))
    .toStream().to("sensor-agg-output",
        Produced.with(windowedSerde, aggregationSerde));
Using TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
without the .advanceBy
clause automatically makes this a tumbling window. Since you didn't specify an advance, Kafka Streams sets it equal to the window size.
You could add an advanceBy
clause with the same amount of time if you'd rather not use the shortened version.
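The tumbling case simplifies the window arithmetic shown earlier: with the advance equal to the size, each record belongs to exactly one window, whose start is the timestamp rounded down to a multiple of the window size. A plain-Java sketch (window math only, not the Kafka Streams API):

```java
public class TumblingWindows {
    // Start of the single tumbling window [start, start + sizeMs) containing t.
    static long windowStartFor(long t, long sizeMs) {
        return t - (t % sizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000; // one-minute tumbling windows
        System.out.println(windowStartFor(95_000, size));  // 60000 -> window [60s, 120s)
        System.out.println(windowStartFor(125_000, size)); // 120000 -> window [120s, 180s)
    }
}
```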
SELECT window_start,
       window_end,
       device_id,
       AVG(reading) AS avg_reading
FROM TABLE(TUMBLE <1>
    (TABLE device_readings, <2>
     DESCRIPTOR(ts), <3>
     INTERVAL '1' MINUTES <4>
    ))
GROUP BY window_start,
         window_end,
         device_id
You specify tumbling windows by passing the TUMBLE
function to the TABLE
function.
The DESCRIPTOR
is the column with the time attribute used for the window.
Since you pass a single INTERVAL
parameter, Flink SQL uses it as both the size and the advance of the window.
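To sketch what the TUMBLE query produces, here is a small plain-Java example (the readings are invented, and grouping is simplified to the window start for a single device): because the single interval is both size and advance, the windows partition the stream and every reading contributes to exactly one output row.

```java
import java.util.*;

public class TumbleAverages {
    record Reading(String deviceId, long ts, double value) {}

    // Average per tumbling window: each reading falls into exactly one bucket,
    // keyed by its window start (ts rounded down to a multiple of sizeMs).
    static Map<Long, Double> tumbleAverages(List<Reading> readings, long sizeMs) {
        Map<Long, List<Double>> buckets = new TreeMap<>();
        for (Reading r : readings) {
            buckets.computeIfAbsent(r.ts() - (r.ts() % sizeMs), k -> new ArrayList<>())
                   .add(r.value());
        }
        Map<Long, Double> avgs = new TreeMap<>();
        buckets.forEach((start, vals) -> avgs.put(start,
                vals.stream().mapToDouble(Double::doubleValue).average().orElse(0)));
        return avgs;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("sensor-1", 10_000, 20.0),
                new Reading("sensor-1", 40_000, 30.0),
                new Reading("sensor-1", 70_000, 40.0));
        // One-minute tumbling windows: [0s, 60s) averages 25.0, [60s, 120s) is 40.0.
        System.out.println(tumbleAverages(readings, 60_000)); // {0=25.0, 60000=40.0}
    }
}
```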
So, from looking at this image, we could generalize hopping windows as "every <time-period of window advance> give me <aggregate> over the last <window size> period." From our examples here, it would be "every 30 seconds, give me the average temp reading over the last minute."