Where's socketTextStream in PyFlink?

I want to translate the following Java code into PyFlink and then run it in pyflink-shell.sh.

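import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    // Running counter used to number each incoming line
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Define the source: connect to localhost:9000 and read newline-delimited socket messages
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        // 3. Map: prefix each line with a running number
        DataStream<String> result = textStream.map(s -> (index++) + ". You entered: " + s);
        // 4. Sink: print the results
        result.print();
        // 5. Start execution
        env.execute();
    }
}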

But I cannot find socketTextStream in b_env, bt_env, s_env, or st_env.

So where is socketTextStream in the PyFlink API?



Solution 1:[1]

As of Flink 1.12, out-of-the-box PyFlink appears to support only these connectors:

  • FlinkKafkaConsumer
  • FlinkKafkaProducer
  • JdbcSink
  • StreamingFileSink

See https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py.

Because a socket source cannot replay data after a failure, socketTextStream cannot provide exactly-once semantics. Its use is therefore generally discouraged, and it was not included in PyFlink.

Solution 2:[2]

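Yes, you can use socketTextStream in PyFlink, although it is not officially supported yet: call socketTextStream on the underlying Java StreamExecutionEnvironment and wrap the resulting Java DataStream in a PyFlink DataStream. Note that this relies on a private attribute (_j_stream_execution_environment), which may change between Flink versions. Example: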

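from pyflink.datastream import DataStream, StreamExecutionEnvironment

if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    # _j_stream_execution_environment is the private Py4J handle to the Java
    # StreamExecutionEnvironment, so this calls the Java socketTextStream directly
    j_stream = s_env._j_stream_execution_environment.socketTextStream('localhost', 9999)
    # Wrap the returned Java DataStream in a PyFlink DataStream
    socket_stream = DataStream(j_stream)
    socket_stream.print()
    s_env.execute('socket_stream')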
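Combining this workaround with a map gives a rough PyFlink counterpart of the Java MapDemo from the question. This is an untested sketch under a few assumptions: it relies on the same private _j_stream_execution_environment attribute, it assumes PyFlink 1.12's DataStream.map(func, output_type=...) accepts a plain Python callable, and the names LineNumberer and main are hypothetical. Parallelism is set to 1 so that a single counter numbers every line, similar to the Java static field in a local run; with higher parallelism each parallel Python instance would keep its own count.

```python
class LineNumberer:
    """Hypothetical helper mimicking `(index++) + ". You entered: " + s`.

    The counter lives in this object, so each parallel instance of the
    map operator would keep its own count.
    """

    def __init__(self, start: int = 1):
        self.index = start

    def __call__(self, line: str) -> str:
        result = f"{self.index}. You entered: {line}"
        self.index += 1
        return result


def main(host: str = "localhost", port: int = 9000) -> None:
    # PyFlink is imported here so LineNumberer can be used without it.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import DataStream, StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # One parallel instance, so one counter numbers all lines.
    env.set_parallelism(1)
    # Unofficial workaround: call the Java socketTextStream via the
    # private Py4J handle, then wrap the Java stream for Python use.
    j_stream = env._j_stream_execution_environment.socketTextStream(host, port, "\n")
    text_stream = DataStream(j_stream)
    # Map each line through the numbering callable; output is a string.
    result = text_stream.map(LineNumberer(), output_type=Types.STRING())
    result.print()
    env.execute("map_demo")
```

To try it, start a text server on port 9000 first (for example `nc -lk 9000`), then call `main()`; in pyflink-shell.sh you should be able to paste the definitions and call `main()` there. Lines typed into the server should then appear numbered in the job's printed output.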

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 mvpboss1004