MapCoder issue after updating Beam to 2.35

After updating Beam from 2.33 to 2.35, I started getting this error:

    def estimate_size(self, unused_value, nested=False):
        estimate = 4  # 4 bytes for int32 size prefix
>       for key, value in unused_value.items():
E       AttributeError: 'list' object has no attribute 'items' [while running 'MyGeneric ParDo Job']

../../../python3.8/site-packages/apache_beam/coders/coder_impl.py:677: AttributeError

This is a method of MapCoderImpl. I don't know Beam well enough to know when it's called.

Any thoughts on what might be causing it?



Solution 1:[1]

Beam uses a Coder to encode and decode the elements of a PCollection. The error message shows that Beam chose MapCoder for your data: MapCoderImpl.estimate_size expected a dict but received a list, hence the error.
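To illustrate why a list triggers this error, here is a minimal sketch that mirrors the loop shown in the traceback. It is a stand-in, not Beam's actual implementation: the function names `estimate_map_size` and `try_estimate` and the per-entry cost are assumptions made up for the example.

```python
def estimate_map_size(value):
    """Hypothetical stand-in for the estimate_size loop shown in the traceback."""
    estimate = 4  # 4 bytes for the int32 size prefix, as in the traceback
    for key, item in value.items():  # only works for dict-like values
        # Placeholder per-entry cost (an assumption for this sketch);
        # a real coder would delegate to its key and value coders.
        estimate += len(repr(key)) + len(repr(item))
    return estimate


def try_estimate(value):
    """Return the estimate, or the AttributeError message for non-dict values."""
    try:
        return estimate_map_size(value)
    except AttributeError as exc:
        return str(exc)


dict_result = try_estimate({"a": 1})    # a dict has .items(), so this succeeds
list_result = try_estimate([{"a": 1}])  # a list of dicts has no .items()
print(dict_result)
print(list_result)
```

The second call fails in the same way as the traceback: the value handed to the map-style size estimate is a list wrapping the dict, not the dict itself.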

Beam also uses a transform function's type hints to infer the Coder for the elements of the output PCollection. My guess is that your function declares the wrong return type. Assuming you are implementing a DoFn's process method and yield a list in the function body, you would see the error above if the function is defined like this:

def process(self, element, **kwargs) -> List[Dict[A, B]]:

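Beam treats the return annotation of process as an iterable of output elements, so it reads the inner type, Dict[A, B], as the element type and decides to use MapCoder. Since each yielded element is actually a list of dicts, you might want to change the type hint to the one below, so that Beam can use ListCoder instead: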

def process(self, element, **kwargs) -> List[List[Dict[A, B]]]:
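The difference between the two signatures can be sketched with the standard `typing` module alone. This only illustrates the idea of reading the inner type of the return annotation; it is not how Beam implements its inference. The helper `element_type`, the functions `process_wrong` and `process_fixed`, and the concrete types `str` and `int` (standing in for A and B) are assumptions for the example.

```python
from typing import Dict, List, get_args, get_type_hints


def element_type(process_fn):
    """Return the inner type of the function's iterable return annotation."""
    return_hint = get_type_hints(process_fn)["return"]
    (inner,) = get_args(return_hint)  # List[X] -> (X,)
    return inner


def process_wrong(element) -> List[Dict[str, int]]:
    # Yields a list, but the annotation says each element is a dict.
    yield [{"a": 1}]


def process_fixed(element) -> List[List[Dict[str, int]]]:
    # Yields a list, and the annotation says each element is a list of dicts.
    yield [{"a": 1}]


wrong_type = element_type(process_wrong)
fixed_type = element_type(process_fixed)
print(wrong_type)
print(fixed_type)
```

With the first signature the derived element type is a dict type, which does not match the list that is yielded. With the second it is a list of dicts, which does.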

More about the benefits of using type hints is described here.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1