Flink 1.8 error: "state is larger than the maximum permitted memory-backed state" even when using RocksDBStateBackend
I'm running HiBench's flinkbench with Flink 1.8.
Stateful Wordcount (wordcount)
This workload cumulatively counts words received from Kafka every few seconds. It tests stateful operator performance and the checkpoint/acker cost of the streaming framework.
RUN COMMAND: flink run -yjm 1gb -ytm 1gb -ys 1 -p 1 -m yarn-cluster -c xxxxx
ERROR INFO:
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=1766987 , maxSize=51200 . Consider using a different state backend, like the File System State backend.
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:136)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:419)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:315)
at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:258)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
I am already using RocksDBStateBackend, and I have lowered the data rate to as little as 100 records per second, but neither change seems to help.
SOURCE CODE:

public class WordCount extends StreamBase {

    @Override
    public void processStream(final FlinkBenchConfig config) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(config.bufferTimeout);
        env.enableCheckpointing(config.checkpointDuration);

        // use RocksDBStateBackend
        env.setStateBackend(new RocksDBStateBackend("hdfs:///fink-checkpoints", true));

        createDataStream(config);
        DataStream<Tuple2<String, String>> dataStream = env.addSource(getDataStream());

        dataStream
                .map(new MapFunction<Tuple2<String, String>, Tuple2<String, Tuple2<String, Integer>>>() {
                    @Override
                    public Tuple2<String, Tuple2<String, Integer>> map(Tuple2<String, String> input) throws Exception {
                        String ip = UserVisitParser.parse(input.f1).getIp();
                        // map each record to <ip, <timeStamp, 1>>
                        return new Tuple2<String, Tuple2<String, Integer>>(ip, new Tuple2<String, Integer>(input.f0, 1));
                    }
                })
                .keyBy(0)
                .map(new RichMapFunction<Tuple2<String, Tuple2<String, Integer>>, Tuple2<String, Tuple2<String, Integer>>>() {
                    private transient ValueState<Integer> sum;

                    @Override
                    public Tuple2<String, Tuple2<String, Integer>> map(Tuple2<String, Tuple2<String, Integer>> input) throws Exception {
                        int currentSum = sum.value();
                        currentSum += input.f1.f1;
                        sum.update(currentSum);
                        KafkaReporter kafkaReporter = new KafkaReporter(config.reportTopic, config.brokerList);
                        kafkaReporter.report(Long.parseLong(input.f1.f0), System.currentTimeMillis());
                        return new Tuple2<String, Tuple2<String, Integer>>(input.f0, new Tuple2<String, Integer>(input.f1.f0, currentSum));
                    }

                    @Override
                    public void open(Configuration config) {
                        ValueStateDescriptor<Integer> descriptor =
                                new ValueStateDescriptor<Integer>(
                                        "count", // the state name
                                        TypeInformation.of(new TypeHint<Integer>() {}), // type information
                                        0); // default value of the state, if nothing was set
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });

        env.execute("Word Count Job");
    }
}
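For what it's worth, the error message's own suggestion, the File System State backend, would look like this; a minimal sketch (my assumption, reusing the HDFS path from the code above):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// In processStream(), in place of the RocksDB line. Checkpoint data is
// written to HDFS rather than held in a memory-backed stream, so the
// maxSize check from the error message does not apply.
env.setStateBackend(new FsStateBackend("hdfs:///fink-checkpoints"));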
I've tried multiple variations of this, but none of them seem to work. Any ideas? Thanks in advance.
Solution 1:[1]
Is it getting -yjm 1gb -ytm 1gb properly? I would try -yjm 1024 -ytm 1024 instead, since it expects these values to be in megabytes.
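If the units are indeed the issue, the submission command from the question would become the following (a sketch; only the memory flags change, and the -c placeholder is kept as in the question):

flink run -yjm 1024 -ytm 1024 -ys 1 -p 1 -m yarn-cluster -c xxxxx

With plain numbers the flags are unambiguous: 1024 should be read as 1024 MB for each of the JobManager and TaskManager containers.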
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Gerard Garcia |