Get nested fields from Kafka message using Apache Flink SQL

I'm trying to create a source table in Apache Flink 1.11 that gives me access to nested properties in a JSON message. I can read values from root-level properties, but I'm unsure how to access nested objects.

The documentation suggests that it should be a MAP type, but when I set that, I get the following error:

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

Here is my SQL:

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

And my JSON looks something like this:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}


Solution 1:[1]

You can use ROW to extract nested fields in your JSON messages. Your DDL statement would look something like:

CREATE TABLE input(
    id VARCHAR,
    title VARCHAR,
    properties ROW(`foo` VARCHAR)
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'python-test',
    'format' = 'json'
);
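To make the ROW semantics concrete, here is a plain-Python sketch (not Flink code) of what a fixed ROW schema does with each message. It picks the declared fields by name, in declaration order, and ignores any other keys. The helper name `project_row` is hypothetical. Returning `None` for a missing field assumes the JSON format's default `'json.fail-on-missing-field' = 'false'`.

```python
import json

def project_row(obj, field_names):
    """Hypothetical helper: mimic a flat ROW(...) column.

    Declared fields are read by name and returned in declaration order;
    undeclared keys are ignored, missing keys become None (like SQL NULL).
    """
    return tuple(obj.get(name) for name in field_names)

message = json.loads(
    '{"id": "message-1", "title": "Some Title", '
    '"properties": {"foo": "bar", "extra": 1}}'
)

# properties ROW(`foo` VARCHAR) declares a single field, so "extra" is dropped.
properties = project_row(message["properties"], ["foo"])
print(properties)  # ('bar',)

# A field the schema declares but the message lacks comes back as None.
print(project_row({}, ["foo"]))  # (None,)
```

The trade-off is that a ROW needs every nested field you want to read listed up front in the DDL.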

Solution 2:[2]

You may also try:

CREATE TABLE input(
    id VARCHAR,
    title VARCHAR,
    properties MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'python-test',
    'format' = 'json'
);

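The only difference from the question's DDL is the column type: MAP<STRING, STRING> instead of a bare MAP. Flink SQL requires a MAP column to declare its key and value types.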
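Unlike a ROW, a MAP has no fixed field list. Every key in the nested object is kept, and values are looked up by key (in Flink SQL, `properties['foo']`). Below is a plain-Python sketch of that behaviour using a hypothetical `to_string_map` helper. Rejecting non-string values is a simplification made for this illustration. It does not describe how Flink's JSON format handles non-string values in a MAP<STRING, STRING> column.

```python
import json

def to_string_map(obj):
    """Hypothetical helper: mimic a MAP<STRING, STRING> column.

    No field list is declared up front: every key/value pair in the
    nested object is kept. This sketch only accepts string values.
    """
    if not all(isinstance(v, str) for v in obj.values()):
        raise TypeError("this sketch only handles string values")
    return dict(obj)

message = json.loads('{"id": "message-1", "properties": {"foo": "bar", "baz": "qux"}}')
properties = to_string_map(message["properties"])

# Similar in spirit to: SELECT properties['foo'] FROM input
print(properties["foo"])   # bar
# Keys that appear nowhere in the DDL are still available.
print(sorted(properties))  # ['baz', 'foo']
```

A MAP therefore suits objects with arbitrary or changing keys of one value type. A ROW suits a known structure, including structures with mixed types or deeper nesting.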

Solution 3:[3]

[2022 Update]

Apache Flink 1.13 has no built-in system JSON functions; they were introduced in version 1.14.

If you are using a version earlier than 1.14, see the solution below.

How can I create a table with nested JSON input?

JSON input example:

{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
        "foo": "bar",
        "nested_foo": {
            "prop1": "value1",
            "prop2": "value2"
        }
    }
}

CREATE statement:

CREATE TABLE input(
    id VARCHAR,
    title VARCHAR,
    properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
    'connector' = 'kafka-0.11',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'python-test',
    'format' = 'json'
);
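The nested DDL works because ROW types compose: the outer `properties` row contains a `nested_foo` row. Below is a plain-Python sketch of that recursive projection. It illustrates the idea and is not Flink's deserializer. The schema is written in a hypothetical notation of `(name, sub_schema)` pairs, where `None` marks a scalar VARCHAR field and a nested list marks a ROW.

```python
import json

# Hypothetical schema notation: (field_name, sub_schema) pairs, where
# sub_schema is None for a scalar (VARCHAR) field or another list for a ROW.
PROPERTIES_SCHEMA = [
    ("foo", None),
    ("nested_foo", [("prop1", None), ("prop2", None)]),
]

def project(obj, schema):
    """Recursively turn a JSON object into nested tuples following schema."""
    values = []
    for name, sub_schema in schema:
        value = obj.get(name)  # missing field -> None
        if sub_schema is not None and value is not None:
            value = project(value, sub_schema)  # descend into the nested ROW
        values.append(value)
    return tuple(values)

message = json.loads("""
{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar",
    "nested_foo": {"prop1": "value1", "prop2": "value2"}
  }
}
""")

print(project(message["properties"], PROPERTIES_SCHEMA))
# ('bar', ('value1', 'value2'))
```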

How can I select nested columns?

SELECT properties.foo, properties.nested_foo.prop1 FROM input;
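Dot notation steps down one ROW level per segment. The sketch below resolves a path such as `properties.nested_foo.prop1` against a parsed message in plain Python. The `select_path` helper is hypothetical. Returning `None` when a segment is missing mirrors SQL NULL and is not taken from Flink's implementation.

```python
import json

def select_path(record, path):
    """Follow a dotted path one nesting level at a time."""
    value = record
    for segment in path.split("."):
        if not isinstance(value, dict) or segment not in value:
            return None  # treat an absent field like SQL NULL
        value = value[segment]
    return value

record = json.loads(
    '{"properties": {"foo": "bar", '
    '"nested_foo": {"prop1": "value1", "prop2": "value2"}}}'
)

# Mirrors: SELECT properties.foo, properties.nested_foo.prop1 FROM input;
print(select_path(record, "properties.foo"))               # bar
print(select_path(record, "properties.nested_foo.prop1"))  # value1
```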

Note that if you output the results with

SELECT properties FROM input

you see the results in row format, which prints field values rather than field names. For the example above, the content of the properties column will be

+I[bar, +I[value1, value2]]

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 morsapaes
Solution 2 0xbe1
Solution 3