Flink 1.8 error: "Size of the state is larger than the maximum permitted memory-backed state" — using RocksDBStateBackend does not help

I'm running HiBench's flinkbench on Flink 1.8.

Stateful Wordcount (wordcount)

This workload cumulatively counts words received from Kafka every few seconds. It tests stateful operator performance and the Checkpoint/Acker cost of the streaming framework.

RUN COMMAND: flink run -yjm 1gb -ytm 1gb -ys 1 -p 1 -m yarn-cluster -c xxxxx

ERROR INFO:

Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=1766987 , maxSize=51200 . Consider using a different state backend, like the File System State backend.
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:136)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:419)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:315)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:258)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391) 

I am already using RocksDBStateBackend, and I have lowered the input rate to as low as 100 records per second, but neither change helps. Note that even though RocksDBStateBackend is set, the stack trace shows the SST file upload going through MemCheckpointStreamFactory, i.e. a memory-backed checkpoint stream.
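For reference, the error message itself suggests the File System State Backend. The explicit wiring I understand it to mean is sketched below. This is only a sketch: the checkpoint path is a placeholder (my original code says "hdfs:///fink-checkpoints", which may itself be a typo for "flink-checkpoints").

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;

public class ExplicitBackendSketch {
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Route checkpoint streams through an explicit FsStateBackend so snapshot
    // data is written to HDFS files instead of a memory-backed stream.
    // "hdfs:///flink-checkpoints" is a placeholder path.
    FsStateBackend checkpointStreams = new FsStateBackend("hdfs:///flink-checkpoints");
    // TernaryBoolean.TRUE enables incremental checkpoints, as in my original code.
    env.setStateBackend(new RocksDBStateBackend(checkpointStreams, TernaryBoolean.TRUE));
    // ... job graph as in the source code below ...
    env.execute("Word Count Job");
  }
}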

SOURCE CODE:

public class WordCount extends StreamBase {

  @Override
  public void processStream(final FlinkBenchConfig config) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setBufferTimeout(config.bufferTimeout);
    env.enableCheckpointing(config.checkpointDuration);
    // use RocksDBStateBackend
    env.setStateBackend(new RocksDBStateBackend("hdfs:///fink-checkpoints", true));
    createDataStream(config);
    DataStream<Tuple2<String, String>> dataStream = env.addSource(getDataStream());
    dataStream
        .map(new MapFunction<Tuple2<String, String>, Tuple2<String, Tuple2<String, Integer>>>() {
          @Override
          public Tuple2<String, Tuple2<String, Integer>> map(Tuple2<String, String> input) throws Exception {
            String ip = UserVisitParser.parse(input.f1).getIp();
            //map record to <ip, <timeStamp, 1>> type
            return new Tuple2<String, Tuple2<String, Integer>>(ip, new Tuple2<String, Integer>(input.f0, 1));
          }
        })
        .keyBy(0)
        .map(new RichMapFunction<Tuple2<String, Tuple2<String, Integer>>, Tuple2<String, Tuple2<String, Integer>>>() {
          private transient ValueState<Integer> sum;

          @Override
          public Tuple2<String, Tuple2<String, Integer>> map(Tuple2<String, Tuple2<String, Integer>> input) throws Exception {
            int currentSum = sum.value();
            currentSum += input.f1.f1;
            sum.update(currentSum);
            KafkaReporter kafkaReporter = new KafkaReporter(config.reportTopic, config.brokerList);
            kafkaReporter.report(Long.parseLong(input.f1.f0), System.currentTimeMillis());
            return new Tuple2<String, Tuple2<String, Integer>>(input.f0, new Tuple2<String, Integer>(input.f1.f0, currentSum));
          }

          @Override
          public void open(Configuration config) {
            ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>(
                    "count", // the state name
                    TypeInformation.of(new TypeHint<Integer>() {
                    }), // type information
                    0); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
          }
        });
    env.execute("Word Count Job");
  }
}

I've tried multiple variations of this, but none of them seem to work. Any ideas? Thanks in advance.



Solution 1:[1]

Is it picking up -yjm 1gb -ytm 1gb properly? I would try -yjm 1024 -ytm 1024, since Flink expects these values to be in megabytes.
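For example, the same command with the memory sizes given in megabytes (keeping the class placeholder from the question):

flink run -yjm 1024 -ytm 1024 -ys 1 -p 1 -m yarn-cluster -c xxxxx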

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: Gerard Garcia