Flink schema evolution not working for broadcast state
I am using the broadcast state pattern in Flink, connecting two streams: a control stream of Rules and a stream of Integers (just for dummy/play purposes).
I have the following Rule class:
```java
import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;

    // Newly added field
    // int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        // this.val2 = val2;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getVal() { return val; }
    public void setVal(int val) { this.val = val; }
    public RuleType getRuleType() { return ruleType; }
    public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }

    // public int getVal2() { return val2; }
    // public void setVal2(int val2) { this.val2 = val2; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "id='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}
```
This is the RuleType enum:

```java
public enum RuleType {
    X,
    Y,
    Z
}
```
In the broadcast state I am storing a `List<Rule>` ruleList.
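The wiring looks roughly like this (a simplified sketch, not a compilable program on its own; `ruleStream` and `dataStream` are placeholders for my actual sources):

```java
// Descriptor for the broadcast state holding the rules.
MapStateDescriptor<String, List<Rule>> ruleStateDescriptor =
        new MapStateDescriptor<>(
                "rules",
                Types.STRING,
                Types.LIST(Types.POJO(Rule.class)));

// Broadcast the control stream and connect it to the data stream.
BroadcastStream<Rule> ruleBroadcast = ruleStream.broadcast(ruleStateDescriptor);

dataStream
        .connect(ruleBroadcast)
        .process(new BroadcastProcessFunction<Integer, Rule, Integer>() {
            // processElement / processBroadcastElement omitted
        });
```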
I tried the following steps to check whether schema evolution works for this, as described in the docs:
1. Start the Flink cluster.
2. Submit the job jar.
3. Take a savepoint using the `flink savepoint <jobId>` command, then stop the job.
4. Modify the code to add an `int` field `val2` to the Rule class as shown above, and create a new jar.
5. Try to restore the job using the `flink run -s <savepoint>` command.
With this, the job is not able to restart because the schema evolution fails with the following error:

```
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
```
Can somebody help with this? My suspicion is that the Rule class is not being picked up by the POJO serializer for some reason, but I don't understand why; it satisfies all the criteria for a POJO.
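One way to sanity-check those criteria outside Flink is a rough, plain-Java approximation of the POJO rules (public class, public no-arg constructor, every non-static field either public or reachable via a getter/setter pair). The real check lives in Flink's `TypeExtractor`, so this is only an illustration:

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {
    // Minimal stand-in for the Rule class from the question.
    public static class Rule {
        private String id;
        private int val;
        public Rule() {}
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
    }

    // Rough approximation of Flink's POJO rules.
    static boolean looksLikePojo(Class<?> cls) {
        if (!Modifier.isPublic(cls.getModifiers())) return false;
        try {
            // Must have a public no-argument constructor.
            Constructor<?> c = cls.getDeclaredConstructor();
            if (!Modifier.isPublic(c.getModifiers())) return false;
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : cls.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) continue;
            if (Modifier.isPublic(f.getModifiers())) continue;
            // Non-public fields need a matching getter/setter pair.
            String name = f.getName();
            String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                cls.getMethod("get" + cap);
                cls.getMethod("set" + cap, f.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Rule.class)); // prints true
    }
}
```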
Solution 1:[1]
Here's the answer I found after a lot of research. The Flink docs are really bad at providing examples, so this might help others going through the same problem.
I was able to create my own `TypeInfoFactory` for this class, as follows:

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public static class MyPojoTypeInfoFactoryForRule extends TypeInfoFactory<Rule> {
    @Override
    public TypeInformation<Rule> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<String, TypeInformation<?>>() {
                    {
                        put("id", Types.STRING);
                        put("val", Types.INT);
                        put("ruleType", Types.ENUM(RuleType.class));
                        put("val2", Types.INT);
                    }
                };
        return Types.POJO(Rule.class, fields);
    }
}
```
Then annotate the Rule class with this factory so that it is serialized as a POJO.
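Concretely, the annotation looks like this (a sketch using Flink's `@TypeInfo` annotation, which points a class at its factory; the class body is elided):

```java
import org.apache.flink.api.common.typeinfo.TypeInfo;

@TypeInfo(MyPojoTypeInfoFactoryForRule.class)
public class Rule {
    // ... fields and methods as before
}
```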
An unanswered question remains: how could this be done better? Could I have written the factory just for the enum class rather than the whole Rule class?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | voidMainReturn |