PySpark issue in resolving column when there is a dot (.)
PySpark's applyInPandas has difficulty resolving columns when there is a dot in the column name.
Here is an example that reproduces the issue, adapted from the doctest example here:
import pandas as pd

# assumes an active SparkSession named `spark`
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("abc|database|10.159.154|xef", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("abc|database|10.159.154|xef", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()
This gives the error below:
AnalysisException Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema)
295 udf = pandas_udf(
296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
298 udf_column = udf(*all_cols)
299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item)
1378 """
1379 if isinstance(item, basestring):
-> 1380 jc = self._jdf.apply(item)
1381 return Column(jc)
1382 elif isinstance(item, Column):
~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?;
As we can see, the column is present in the list shown at the end of the stack trace.
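The failure appears to be in column resolution rather than in applyInPandas itself: per the traceback, _extract_cols does df[col], which calls the JVM-side Dataset.apply, and an unquoted dot there is parsed as nested-field access (hence the hint in the error to quote the name). A minimal sketch to confirm this, reusing df1 from above:

from pyspark.sql.utils import AnalysisException

name = "abc|database|10.159.154|xef"
try:
    df1[name]                        # unquoted dot is parsed as nested-field access
except AnalysisException as e:
    print(e)                         # same "Cannot resolve column name ..." error

df1.select("`" + name + "`").show()  # backtick-quoted, resolves fine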
When I replace the dot (.) with an underscore (_), the code actually works:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("abc|database|10_159_154|xef", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("abc|database|10_159_154|xef", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show()
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
| 20000101| 1|1.0| x|
| 20000102| 1|3.0| x|
| 20000101| 2|2.0| y|
| 20000102| 2|4.0| y|
+---------------------------+---+---+---+
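Until this is fixed upstream, a workaround along the same lines is to rename the dotted column to a safe underscore version just for the cogrouped apply, then restore the original name afterwards. A minimal sketch, reusing the original dotted df1 and df2 from the first snippet (DOTTED and SAFE are just local names for the two spellings used above):

import pandas as pd

DOTTED = "abc|database|10.159.154|xef"
SAFE = DOTTED.replace(".", "_")      # "abc|database|10_159_154|xef"

def asof_join(l, r):
    return pd.merge_asof(l, r, on=SAFE, by="id")

result = (
    df1.withColumnRenamed(DOTTED, SAFE).groupby("id")
       .cogroup(df2.withColumnRenamed(DOTTED, SAFE).groupby("id"))
       .applyInPandas(asof_join,
                      schema="`%s` int, id int, v1 double, v2 string" % SAFE)
       .withColumnRenamed(SAFE, DOTTED)  # put the dotted name back
)
result.show()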
PS: I have also created an issue here.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow