Invalid Pojo Issue
- Invalid Pojo Type
- It means Flink is not able to serialize your type.
- You need to specify the corresponding serializer in the environment configuration.
see : https://github.com/EsotericSoftware/kryo
Let's say you have:
Persons.java
import java.util.TreeMap;
/**
 * A sorted, string-keyed attribute map that additionally carries an identifier.
 *
 * <p>The identifier is held in a plain field next to the inherited
 * {@link TreeMap} entries; it is not stored as a map entry.
 */
public class Persons extends TreeMap<String, Object> {

    /** Identifier kept alongside the map entries; {@code null} until {@link #setId} is called. */
    private String id;

    /**
     * Returns the identifier of this instance.
     *
     * @return the current id, or {@code null} if none has been set
     */
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the identifier of this instance.
     *
     * @param id the new id
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Renders the id in a {@code Persons{id='...'}} prefix, followed by a space
     * and the inherited map representation.
     *
     * @return the textual form of the id and the map entries
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("Persons{");
        text.append("id='").append(id).append('\'');
        text.append("} ").append(super.toString());
        return text.toString();
    }
}
Flink Streaming client
DemoClient.java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
 * Minimal Flink streaming job that turns colon-separated text records into
 * {@code Persons} objects and prints them.
 *
 * <p>This class does not register any custom serializer for {@code Persons}
 * on the execution environment.
 */
public class DemoClient {

    /**
     * Builds the pipeline (source, map, sink) and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Each element is one "id:name:age" record.
        final DataStreamSource<String> rawRecords = env.fromElements("1:Mike:40", "2:John:20");

        // Kept as an anonymous class (not a lambda) so the generic
        // parameters stay visible on the function's class.
        final MapFunction<String, Persons> toPersons = new MapFunction<String, Persons>() {
            @Override
            public Persons map(String value) throws Exception {
                final String[] fields = value.split(":");
                final Persons person = new Persons();
                person.setId(fields[0]);
                person.put("name", fields[1]);
                person.put("age", fields[2]);
                return person;
            }
        };
        final DataStream<Persons> parsedPersons = rawRecords.map(toPersons);

        // Sink that writes every record to standard output.
        final SinkFunction<Persons> stdoutSink = new SinkFunction<Persons>() {
            @Override
            public void invoke(Persons value) throws Exception {
                final String rendered = value.toString();
                System.out.println("Persona Details " + rendered);
            }
        };
        parsedPersons.addSink(stdoutSink);

        env.execute();
    }
}
In Maven pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.0.0</version>
</dependency>
This will throw :
Caused by: java.lang.ClassCastException: java.util.TreeMap cannot be cast to Persons
It also shows the warning: TypeExtractor:1525 - class in.dailyhunt.p13n.cis.runner.localTest.Persons is not a valid POJO type
To remove this warning, you have to add the following code directly below the line where we set the parallelism,
i.e. below the line: env.setParallelism(1);
env.getConfig().addDefaultKryoSerializer(Persons.class,PersonSerializer.class);
PersonSerializer.java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
/**
 * Kryo serializer for {@code Persons}.
 *
 * <p>The wire layout produced by {@link #write} is the id as a string followed
 * by whatever {@code MapSerializer} emits for the instance; {@link #read}
 * consumes the two parts in the same order.
 */
public class PersonSerializer extends Serializer<Persons> {

    /**
     * Writes the id first, then delegates the instance to a fresh
     * {@code MapSerializer}.
     *
     * @param kryo   the Kryo instance performing the serialization
     * @param output the stream to write to
     * @param object the instance to serialize
     */
    @Override
    public void write(Kryo kryo, Output output, Persons object) {
        final String personId = object.getId();
        output.writeString(personId);

        final MapSerializer mapPart = new MapSerializer();
        kryo.writeObject(output, object, mapPart);
    }

    /**
     * Reads the id, then lets a fresh {@code MapSerializer} rebuild the
     * instance, and finally applies the id to it.
     *
     * @param kryo  the Kryo instance performing the deserialization
     * @param input the stream to read from
     * @param type  the concrete class to instantiate
     * @return the restored instance with its id set
     */
    @Override
    public Persons read(Kryo kryo, Input input, Class<Persons> type) {
        // The id must be consumed before the map part: same order as write().
        final String personId = input.readString();

        final MapSerializer mapPart = new MapSerializer();
        final Persons restored = kryo.readObject(input, type, mapPart);
        restored.setId(personId);
        return restored;
    }
}