Invalid Pojo Issue

  1. Invalid Pojo Type
    1. It means Flink is not able to serialize your type.
    2. You need to specify the corresponding serializer in the env configuration.

see : https://github.com/EsotericSoftware/kryo

Let's say you have:

Persons.java

import java.util.TreeMap;

/**
 * A sorted {@code String -> Object} attribute map that additionally carries
 * a separate {@code id} field alongside the map entries.
 */
public class Persons extends TreeMap<String, Object> {

    /** Identifier stored outside of the map entries. */
    private String id;

    /**
     * @return the identifier, or {@code null} if none has been set
     */
    public String getId() {
        return this.id;
    }

    /**
     * @param id the identifier to store on this instance
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Renders the identifier followed by the map contents as produced by
     * {@link TreeMap#toString()}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Persons{");
        text.append("id='").append(id).append('\'');
        text.append("} ").append(super.toString());
        return text.toString();
    }
}

Flink Streaming client

DemoClient.java

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;


/**
 * Small Flink streaming job: turns a fixed set of {@code id:name:age} strings
 * into {@link Persons} records and prints each record to standard output.
 */
public class DemoClient {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source: two hard-coded "id:name:age" records.
        final DataStreamSource<String> personAges = env.fromElements("1:Mike:40", "2:John:20");

        // Process: parse each record, then hand it to the printing sink.
        final DataStream<Persons> enrichables = personAges.map(new PersonParser());
        enrichables.addSink(new PersonPrinter());

        env.execute();
    }

    /** Splits an {@code id:name:age} string on ':' and fills a {@link Persons} from the parts. */
    private static final class PersonParser implements MapFunction<String, Persons> {
        @Override
        public Persons map(String value) throws Exception {
            final String[] fields = value.split(":");
            final Persons person = new Persons();
            person.setId(fields[0]);
            person.put("name", fields[1]);
            person.put("age", fields[2]);
            return person;
        }
    }

    /** Prints every received {@link Persons} to standard output. */
    private static final class PersonPrinter implements SinkFunction<Persons> {
        @Override
        public void invoke(Persons value) throws Exception {
            System.out.println("Persona Details " + value.toString());
        }
    }
}

In Maven pom.xml

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.0.0</version>
   </dependency>
  <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.0.0</version>
  </dependency>

This will throw :

Caused by: java.lang.ClassCastException: java.util.TreeMap cannot be cast to Persons

It also shows the warning: TypeExtractor:1525 - class in.dailyhunt.p13n.cis.runner.localTest.Persons is not a valid POJO type

To remove this warning, add the following code just below the place where we set the parallelism,

i.e. below the line: env.setParallelism(1);
// Register PersonSerializer as the default Kryo serializer for the Persons class.
env.getConfig().addDefaultKryoSerializer(Persons.class,PersonSerializer.class);

PersonSerializer.java

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;


/**
 * Kryo serializer for {@link Persons}.
 *
 * <p>Wire format: the {@code id} string first, followed by the map entries
 * as written by Kryo's {@link MapSerializer}.
 */
public class PersonSerializer extends Serializer<Persons> {

    /** Delegate for the map entries; reused instead of allocating one per call. */
    private final MapSerializer mapSerializer = new MapSerializer();

    /**
     * Writes the id and then the map entries of {@code object}.
     *
     * @param kryo   the Kryo instance driving serialization
     * @param output the stream to write to
     * @param object the instance to serialize
     */
    @Override
    public void write(Kryo kryo, Output output, Persons object) {
        output.writeString(object.getId());
        // Call the delegate directly rather than kryo.writeObject(...): the
        // latter runs Kryo's reference check on an object that is already
        // being written, so with reference tracking enabled it would emit
        // only a back-reference id instead of the map entries.
        mapSerializer.write(kryo, output, object);
    }

    /**
     * Reads the id and the map entries back into a new {@link Persons}.
     *
     * @param kryo  the Kryo instance driving deserialization
     * @param input the stream to read from
     * @param type  the concrete class to instantiate
     * @return the reconstructed instance with its id restored
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Persons read(Kryo kryo, Input input, Class<Persons> type) {
        String id = input.readString();
        // MapSerializer is declared against the raw Map type, so the class
        // token has to be passed raw; the instance it creates is of `type`,
        // which makes the cast back to Persons safe.
        Persons persons = (Persons) mapSerializer.read(kryo, input, (Class) type);
        persons.setId(id);
        return persons;
    }
}

results matching ""

    No results matching ""