'Avoid ConcurrentModificationException on java stream using cache
I'm getting occasionally ConcurrentModificationException with the following code:
public Set<MyObject> getTypes(Set<Type> names) {
Set<MyObject> myObjects = new HashSet<>();
myObjects = names.stream().filter(Objects::nonNull).mapToInt(Type::getId)
.mapToObj(cache::getMyObject)
.collect(Collectors.toSet());
I'm using cache to convert to MyObject, but exception seems to thrown in collect method (DAOImpl.java line 114)
java.util.ConcurrentModificationException
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1558)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.dao.DAOImpl.getTypes(DAOImpl.java:114)
at com.dao.DAOImpl$$FastClassBySpringCGLIB$$affe23c4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.dao.DAOImpl$$EnhancerBySpringCGLIB$$6000bd7e.getTypes(<generated>)
How can I use cache to map to objects, or must I use cache outside stream?
Notice cache should be updated only 1 per day
Solution 1:[1]
The issue was that 2 threads changed myObjects so the expected count of the Set when looping through it was compromised, therefore the exception
The fix was to make the above service call atomic and therefore to prevent changing Set on runtime with a different thread
Solution 2:[2]
The problem is not related with your cache::getMyObject. The problem is that your names Set is being changed while you stream it, hence the ConcurrentModificationException.
If you wrap (create a new collection) from your names collection the problem is fixed.
public Set<MyObject> getTypes(Set<Type> names) {
Set<MyObject> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(Type::getId)
.mapToObj(cache::getMyObject).collect(Collectors.toSet());
Nonetheless, I think your cache method (getMyObject) can return null. In that case you should filter them out.
If you want to trigger the exception you can do something like this:
public static void main(String[] args) {
Set<String> names = new HashSet();
names.add("a");
names.add("b");
Thread x = new Thread(){
@Override
public void run() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
names.add("c");
}
};
x.start();
Set<Object> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(s -> s.hashCode()).mapToObj(i -> String.valueOf(i)).map(x1-> getx(x1)).collect(Collectors.toSet());
}
private static Object getx(String x) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | user7294900 |
| Solution 2 |
