PySpark issue in resolving a column when there is a dot (.) in its name

PySpark's `applyInPandas` has difficulty resolving columns when there is a dot in the column name.

Here is an example that reproduces the issue. It was adapted from the doctest example here:

df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()

This gives the following error:

AnalysisException                         Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
      8     return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
      9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema)
    295         udf = pandas_udf(
    296             func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297         all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
    298         udf_column = udf(*all_cols)
    299         jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0)
    303     def _extract_cols(gd):
    304         df = gd._df
--> 305         return [df[col] for col in df.columns]
    306 
    307 

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item)
   1378         """
   1379         if isinstance(item, basestring):
-> 1380             jc = self._jdf.apply(item)
   1381             return Column(jc)
   1382         elif isinstance(item, Column):

~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?;

As we can see, the column is present in the "among" list shown at the end of the stack trace.

When I replace . (dot) with _ (underscore), the code actually works:

df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show()
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
|                   20000101|  1|1.0|  x|
|                   20000102|  1|3.0|  x|
|                   20000101|  2|2.0|  y|
|                   20000102|  2|4.0|  y|
+---------------------------+---+---+---+

PS: I have also created an issue here.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source