'Avoid ConcurrentModificationException on java stream using cache
I'm getting occasionally ConcurrentModificationException
with the following code:
public Set<MyObject> getTypes(Set<Type> names) {
Set<MyObject> myObjects = new HashSet<>();
myObjects = names.stream().filter(Objects::nonNull).mapToInt(Type::getId)
.mapToObj(cache::getMyObject)
.collect(Collectors.toSet());
I'm using cache to convert to MyObject
, but exception seems to thrown in collect
method (DAOImpl.java line 114)
java.util.ConcurrentModificationException
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1558)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.dao.DAOImpl.getTypes(DAOImpl.java:114)
at com.dao.DAOImpl$$FastClassBySpringCGLIB$$affe23c4.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.dao.DAOImpl$$EnhancerBySpringCGLIB$$6000bd7e.getTypes(<generated>)
How can I use cache to map to objects, or must I use cache outside stream?
Notice cache should be updated only 1 per day
Solution 1:[1]
The issue was that 2 threads changed myObjects
so the expected count of the Set
when looping through it was compromised, therefore the exception
The fix was to make the above service call atomic and therefore to prevent changing Set
on runtime with a different thread
Solution 2:[2]
The problem is not related with your cache::getMyObject
. The problem is that your names
Set is being changed while you stream it, hence the ConcurrentModificationException
.
If you wrap (create a new collection) from your names
collection the problem is fixed.
public Set<MyObject> getTypes(Set<Type> names) {
Set<MyObject> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(Type::getId)
.mapToObj(cache::getMyObject).collect(Collectors.toSet());
Nonetheless, I think your cache method (getMyObject) can return null. In that case you should filter them out.
If you want to trigger the exception you can do something like this:
public static void main(String[] args) {
Set<String> names = new HashSet();
names.add("a");
names.add("b");
Thread x = new Thread(){
@Override
public void run() {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
names.add("c");
}
};
x.start();
Set<Object> myObjects = new HashSet<>(names).stream().filter(Objects::nonNull).mapToInt(s -> s.hashCode()).mapToObj(i -> String.valueOf(i)).map(x1-> getx(x1)).collect(Collectors.toSet());
}
private static Object getx(String x) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | user7294900 |
Solution 2 |