MapCoder issue after updating Beam to 2.35
After updating Beam from 2.33 to 2.35, I started getting this error:
    def estimate_size(self, unused_value, nested=False):
      estimate = 4  # 4 bytes for int32 size prefix
>     for key, value in unused_value.items():
E     AttributeError: 'list' object has no attribute 'items' [while running 'MyGeneric ParDo Job']

../../../python3.8/site-packages/apache_beam/coders/coder_impl.py:677: AttributeError
This is a method of MapCoderImpl. I don't know Beam well enough to know when it's called.
Any thoughts on what might be causing it?
Solution 1:[1]
Beam uses a Coder to encode and decode the elements of a PCollection. The error message shows that Beam chose MapCoder for your data: MapCoderImpl.estimate_size expected a dict but received a list instead, hence the error.
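To see why a list fails here, the loop from the traceback can be imitated in plain Python. This is only a simplified stand-in, not Beam's actual implementation: the function names and the per-entry cost are made up for illustration, and only the 4-byte prefix and the .items() loop come from the traceback.

```python
def estimate_map_size(value):
    """Simplified stand-in for the loop in the traceback (not Beam's real code)."""
    estimate = 4  # 4 bytes for the int32 size prefix, as in the traceback
    # .items() only exists on dict-like values; a list has no such method.
    for key, val in value.items():
        # Hypothetical per-entry cost, used only to make the sketch runnable.
        estimate += len(repr(key)) + len(repr(val))
    return estimate


def try_estimate(value):
    """Return the estimate, or the error message if the value is not a dict."""
    try:
        return estimate_map_size(value)
    except AttributeError as exc:
        return str(exc)


print(try_estimate({"a": 1}))    # a dict works
print(try_estimate([{"a": 1}]))  # a list of dicts reproduces the AttributeError
```

A list of dicts fails in the same way as in the question, because the map-style loop is applied to the list itself rather than to the dicts inside it.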
Additionally, Beam uses the type hints on a transform's function to infer the Coder for the elements of the output PCollection. My guess is that your function declares the wrong return type. Assuming you are implementing a DoFn's process method and you yield a list in the function body, you'd see the error above if you defined the function like this:
def process(self, element, **kwargs) -> List[Dict[A, B]]:
Beam treats the outer List as the iterable of outputs, so it takes the output element's type hint to be Dict[A, B] and decides to use MapCoder. You might want to change the type hint to the one below, so that Beam can use ListCoder instead:
def process(self, element, **kwargs) -> List[List[Dict[A, B]]]:
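The difference between the two signatures can be checked with the standard typing module alone. The sketch below only mimics the rule described above (the outer List is the container of outputs, and its parameter is the element type). It is not Beam's inference code: element_type is a hypothetical helper, and str and int stand in for A and B.

```python
from typing import Dict, List, get_args, get_origin


def element_type(return_hint):
    """Hypothetical helper: strip the outer iterable from a return hint."""
    args = get_args(return_hint)
    return args[0] if args else return_hint


# First signature: each output element is declared to be a dict.
declared_dicts = List[Dict[str, int]]
# Second signature: each output element is declared to be a list of dicts.
declared_lists = List[List[Dict[str, int]]]

print(get_origin(element_type(declared_dicts)))  # element type is dict-based
print(get_origin(element_type(declared_lists)))  # element type is list-based
```

If process yields lists, only the second hint describes the elements correctly, which is what lets a list coder be chosen instead of a map coder.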
More about the benefits of using type hints is described here.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |