Flink provides fault tolerance through a mechanism called checkpointing: it periodically takes a snapshot of all the stateful operators/functions in your program and stores it in a highly durable store such as HDFS.
Checkpointing allows a Flink program to resume from such a snapshot. This helps in case of failures, whether an unhandled exception in your code or the loss of a node in your YARN/Mesos/Kubernetes cluster.
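Checkpointing is not enabled by default. Below is a minimal sketch of a typical configuration; the one-minute interval, the exactly-once mode, and the HDFS path are illustrative assumptions, not values taken from this post:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Snapshot all stateful operators once a minute.
        env.enableCheckpointing(60_000);
        // Exactly-once is the default guarantee; set explicitly for clarity.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep snapshots in a durable store such as HDFS.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}

The job below does not enable periodic checkpoints itself; for this post's workflow that is fine, because savepoints (which the State Processor API operates on) are triggered manually with bin/flink savepoint.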
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class TestCheckpointJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties kafkaConsumerProperties = new Properties();
        kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaConsumerProperties.setProperty("group.id", "test_group_id");

        ObjectMapper objectMapper = new ObjectMapper();
        FlinkKafkaConsumer010<String> kafkaConsumer010 =
                new FlinkKafkaConsumer010<>("test_topic", new SimpleStringSchema(), kafkaConsumerProperties);
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer010).name("kafka_source").uid("kafka_source");

        // Parse JSON records, then sum values per key over 1-hour windows.
        DataStream<TestData> aggregatedStream = kafkaSource
                .map(row -> objectMapper.readValue(row, TestData.class))
                .keyBy(TestData::getKey)
                .timeWindow(Time.hours(1))
                .reduce((rowA, rowB) -> {
                    TestData result = new TestData();
                    result.setKey(rowA.getKey());
                    result.setValue(rowA.getValue() + rowB.getValue());
                    result.setCreatedAt(System.currentTimeMillis());
                    return result;
                }).name("aggregate_stream").uid("aggregate_stream");

        // Label each aggregate using the keyed threshold state inside ClassifyData.
        DataStream<LabeledTestData> labeledTestDataDataStream = aggregatedStream
                .keyBy(TestData::getKey)
                .flatMap(new ClassifyData()).name("classify_data").uid("classify_data");

        labeledTestDataDataStream.map(row -> objectMapper.writeValueAsString(row)).print();
        env.execute();
    }
}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData> {

    // Keyed state holding the classification threshold; this is the state
    // the State Processor API will bootstrap later in this post.
    ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        threshold = getRuntimeContext().getState(new ValueStateDescriptor<>("thresholdState", Integer.class));
    }

    @Override
    public void flatMap(TestData testData, Collector<LabeledTestData> collector) throws Exception {
        LabeledTestData labeledTestData = new LabeledTestData();
        labeledTestData.setKey(testData.getKey());
        labeledTestData.setValue(testData.getValue());
        labeledTestData.setCreatedAt(testData.getCreatedAt());
        // Until the threshold state is populated, records stay unclassified.
        String label = "UNCLASSIFIED";
        if (threshold.value() != null) {
            label = (testData.getValue() > threshold.value()) ? "L1" : "L2";
        }
        labeledTestData.setLabel(label);
        collector.collect(labeledTestData);
    }
}
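The TestData, LabeledTestData, and TestConfig POJOs are not shown in the original listing; a minimal sketch consistent with the getters and setters used in this post (the field types are assumptions inferred from usage) could be:

// Each class would normally live in its own file.
class TestData {
    private String key;
    private int value;
    private long createdAt;

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }
    public long getCreatedAt() { return createdAt; }
    public void setCreatedAt(long createdAt) { this.createdAt = createdAt; }
}

class LabeledTestData extends TestData {
    private String label;

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }
}

// Used by the bootstrap code further below.
class TestConfig {
    private String key;
    private int thresholdValue;

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public int getThresholdValue() { return thresholdValue; }
    public void setThresholdValue(int thresholdValue) { this.thresholdValue = thresholdValue; }
}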
The State Processor API, added in Flink 1.9, lets you read and modify the state stored in a savepoint. To use it, add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
To seed the threshold, define a KeyedStateBootstrapFunction that writes into the same "thresholdState" descriptor that ClassifyData reads:

import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> {

    ValueState<Integer> threshold;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Must use the same state name and type as ClassifyData.
        threshold = getRuntimeContext().getState(new ValueStateDescriptor<>("thresholdState", Integer.class));
    }

    @Override
    public void processElement(TestConfig testConfig, Context context) throws Exception {
        threshold.update(testConfig.getThresholdValue());
    }
}
Next, build a BootstrapTransformation that runs a DataSet of config values through that function:

BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment) {
    // A single config entry, keyed under "global".
    TestConfig testConfig = new TestConfig();
    testConfig.setKey("global");
    testConfig.setThresholdValue(10);
    DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
    return OperatorTransformation
            .bootstrapWith(configDataSet)
            .keyBy(TestConfig::getKey)
            .transform(new ConfigBootstrapper());
}
Wire the bootstrap step into the job:

public class TestCheckpointJob {
    public static void main(String[] args) throws Exception {
        bootstrapConfig();
        // Rest same as the previous code
    }
}
bootstrapConfig() loads the existing savepoint, attaches the bootstrapped state to the operator whose uid is classify_data, and writes the result out as a new savepoint:

static void bootstrapConfig() throws Exception {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, "oldSavepointPath", new MemoryStateBackend());
    BootstrapTransformation<TestConfig> configTransformation = getConfigTransformation(executionEnvironment);
    String newSavepointPath = "newSavepointPath";
    existingSavepoint.withOperator("classify_data", configTransformation).write(newSavepointPath);
    // write() only declares the batch sinks; run the job to actually produce the savepoint.
    executionEnvironment.execute("bootstrap-config");
}
Finally, restart the job from the new savepoint:

bin/flink run -s newSavepointPath test-checkpoint.jar
You can even create a brand-new savepoint instead of updating an existing one; for that, call Savepoint.create() instead of Savepoint.load().
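A minimal sketch of that variant, reusing getConfigTransformation() from above (the max parallelism of 128, the MemoryStateBackend, and the path are illustrative assumptions):

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
Savepoint
        .create(new MemoryStateBackend(), 128) // 128 = max parallelism recorded in the new savepoint
        .withOperator("classify_data", getConfigTransformation(executionEnvironment))
        .write("newSavepointPath");
// As with load(), the batch job must run for the savepoint to be written.
executionEnvironment.execute("create-savepoint");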
Flink’s State Processor API was one of the most requested features, and it’s finally here. The API is available only in Flink 1.9.0 and later; you can explore it in full in the official Flink documentation.