Flink schema evolution not working for broadcast state

I am using the broadcast state pattern in Flink, connecting two streams: a control stream of Rules and a stream of Integers (for dummy play purposes).
I have the following Rule class:

import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    // Newly added field
    // int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        // this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    // public int getVal2() {
    //     return val2;
    // }

    // public void setVal2(int val2) {
    //     this.val2 = val2;
    // }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "id='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

This is the RuleType class:

public enum RuleType {
    X,
    Y,
    Z
}
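
For context, the two streams are wired up roughly like this (a simplified sketch, not my exact code; the stream names and the process function are placeholders):

import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;

// Descriptor for the broadcast state that holds the rule list.
MapStateDescriptor<String, List<Rule>> ruleStateDescriptor =
        new MapStateDescriptor<>(
                "ruleList",
                Types.STRING,
                Types.LIST(Types.POJO(Rule.class)));

// Broadcast the control stream of Rules and connect it to the Integer stream.
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

integerStream
        .connect(ruleBroadcastStream)
        .process(new MyBroadcastProcessFunction()); // placeholder BroadcastProcessFunction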

In the BroadcastState I am storing List<Rule> ruleList. I tried the following steps to check whether schema evolution works for this, as mentioned in the docs:

  1. Start the Flink cluster.

  2. Submit the job jar.

  3. Take a savepoint using the flink savepoint <jobId> command.

  4. Stop the job.

  5. Modify the code to add an int field val2 to the Rule class, as shown above. Create a new jar.

  6. Try to restore the job using the flink run -s <savepoint> command.
    With this, the job is not able to restart because schema evolution fails with the following error:

    Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
        ... 11 more
    Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 13 more
    Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

Can somebody help with this? My suspicion is that the Rule class is not being picked up by the POJO serializer for some reason, but I don't understand why: it follows all the criteria to be a POJO.
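
One quick way to check which serializer Flink picks for Rule is to resolve its TypeInformation directly (a small diagnostic snippet, not part of my job code):

import org.apache.flink.api.common.typeinfo.TypeInformation;

// Prints a PojoType<...> if Flink treats Rule as a POJO,
// and a GenericType<Rule> if it falls back to Kryo.
TypeInformation<Rule> typeInfo = TypeInformation.of(Rule.class);
System.out.println(typeInfo);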



Solution 1:[1]

Here's the answer I found after a lot of research. The Flink docs are really bad at providing examples, so this might help others going through the same problem.
I was able to create my own TypeInfoFactory for this class as follows:

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public static class MyPojoTypeInfoFactoryForRule extends TypeInfoFactory<Rule> {
    @Override
    public TypeInformation<Rule> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        Map<String, TypeInformation<?>> fields =
                new HashMap<String, TypeInformation<?>>() {
                    {
                        put("id", Types.STRING);
                        put("val", Types.INT);
                        put("ruleType", Types.ENUM(RuleType.class));
                        put("val2", Types.INT);
                    }
                };
        return Types.POJO(Rule.class, fields);
    }
}

Then annotate the Rule class with this factory so that the Rule class is serialized as a POJO.
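Concretely, that uses Flink's @TypeInfo annotation, which is the hook for registering a custom factory on a class:

import org.apache.flink.api.common.typeinfo.TypeInfo;

@TypeInfo(MyPojoTypeInfoFactoryForRule.class)
public class Rule {
    // ... same fields and methods as above, including the new val2 field ...
}

With the factory in place, Flink uses the declared field map instead of its reflective type analysis, so the POJO serializer (which supports schema evolution) is used instead of Kryo.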
The unanswered question is: how can this be made better? Could I have written the factory just for the enum class and not the whole Rule class?

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: voidMainReturn