Get nested fields from Kafka message using Apache Flink SQL
I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.
The documentation suggests that it should be a MAP type, but when I set that, I get the following error:
java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Here is my SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
And my JSON looks something like this:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
Solution 1:[1]
You can use ROW to extract nested fields in your JSON messages. Your DDL statement would look something like:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
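With properties declared as a ROW, the nested value can then be read with dot notation. A minimal sketch, assuming the table above and the sample message from the question; the alias foo_value is only illustrative:

```sql
-- Read the nested field `foo` from the ROW-typed column `properties`.
-- `foo_value` is a hypothetical alias chosen for this example.
SELECT
  id,
  title,
  properties.foo AS foo_value
FROM input;
```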
Solution 2:[2]
You may also try
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
The only difference is MAP<STRING, STRING> vs MAP: the key and value types have to be declared explicitly.
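When the column is a MAP rather than a ROW, entries are looked up by key with bracket syntax instead of dot notation. A short sketch, assuming the table definition above; the alias foo_value is only illustrative:

```sql
-- Look up the value stored under the key 'foo' in the MAP-typed column.
-- `foo_value` is a hypothetical alias chosen for this example.
SELECT
  id,
  properties['foo'] AS foo_value
FROM input;
```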
Solution 3:[3]
[2022 Update]
In Apache Flink 1.13 there are no built-in JSON functions. They were introduced in version 1.14.
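From 1.14 on, one possible way to use these functions is to read each message as a plain string and pull values out with JSON_VALUE and a JSON path. This is only a sketch: the table name input_raw and the column name message are hypothetical, and it assumes the 'kafka' connector and the 'raw' format are available in your setup.

```sql
-- Hypothetical table that exposes each Kafka record as a single string.
CREATE TABLE input_raw (
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'python-test',
  'format' = 'raw'
);

-- Extract scalar values from the JSON text with JSON path expressions.
SELECT
  JSON_VALUE(message, '$.id') AS id,
  JSON_VALUE(message, '$.properties.foo') AS foo_value
FROM input_raw;
```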
If you are using a version earlier than 1.14, see the solution below.
How can I create a table with nested JSON input?
JSON input example:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar",
"nested_foo": {
"prop1": "value1",
"prop2": "value2"
}
}
}
CREATE statement:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
How can I select nested columns?
SELECT properties.foo, properties.nested_foo.prop1 FROM input;
Note that if you output the results with
SELECT properties FROM input;
you see the results in row format. For the example message above, the content of the column properties will be
+I[bar, +I[value1, value2]]
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | morsapaes |
Solution 2 | 0xbe1 |
Solution 3 |