Jupyter Notebook PySpark OSError [WinError 123] The filename, directory name, or volume label syntax is incorrect:

System Configuration:
Operating System: Windows 10
Python Version: 3.7
Spark Version: 2.4.4
SPARK_HOME: C:\spark\spark-2.4.4-bin-hadoop2.7

Problem

I am using PySpark to run parallel computations on all the columns of each row of a DataFrame. I convert my pandas DataFrame to a Spark DataFrame, then apply a map transformation followed by a collect action. During the collect, a Py4JJavaError wrapping an OSError is raised. The traceback points into the import sklearn statement and the trained classifier (the ML model).

Code Snippet

from sklearn.neural_network import MLPClassifier  # public import path; the private
                                                  # .multilayer_perceptron module is deprecated
import pandas as pd

classifier = MLPClassifier()
classifier.fit(x_train, y_train)  # x_train / y_train prepared earlier

def func1(rows, trained_model=classifier):
    items = rows.asDict()
    row = pd.Series(items)
    # func2 lives in another file that itself imports sklearn
    output = func2(row, trained_model)
    return output

spdf = spark.createDataFrame(pandasDF)
result = spdf.rdd.map(lambda row: func1(row)).collect()
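func2 is not shown in the question; a hypothetical stand-in consistent with its description (it lives in a separate module that imports sklearn and consumes a pandas Series) could look like this:

# other_file.py -- hypothetical stand-in for the module containing func2.
# Merely importing sklearn on a Spark worker is what triggers the OSError below.
import sklearn  # noqa: F401
import pandas as pd

def func2(row, trained_model):
    # Reshape the single-row Series into a (1, n_features) array and predict.
    features = row.values.reshape(1, -1)
    return trained_model.predict(features)[0]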

Error

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-33-0bfb9d088e2d> in <module>
----> 1 result=spdf.rdd.map(lambda row:clusterCreation(row)).collect()
      2 print(type(result))
.
.
.
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 2.0 failed 1 times, most recent failure: Lost task 2.0 in stage 2.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 364, in main
  File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 71, in read_command
  File "C:\spark\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
.
.
.
 File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\ensemble\__init__.py", line 7, in <module>
    from .forest import RandomForestClassifier
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\ensemble\forest.py", line 53, in <module>
    from ..metrics import r2_score
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\metrics\__init__.py", line 7, in <module>
    from .ranking import auc
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in <module>
    from ..preprocessing import label_binarize
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\preprocessing\__init__.py", line 6, in <module>
    from ._function_transformer import FunctionTransformer
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\preprocessing\_function_transformer.py", line 5, in <module>
    from ..utils.testing import assert_allclose_dense_sparse
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in <module>
    import pytest
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\pytest.py", line 6, in <module>
    from _pytest.assertion import register_assert_rewrite
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\__init__.py", line 6, in <module>
    from _pytest.assertion import rewrite
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\rewrite.py", line 20, in <module>
    from _pytest.assertion import util
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\assertion\util.py", line 5, in <module>
    import _pytest._code
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\_code\__init__.py", line 2, in <module>
    from .code import Code  # noqa
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\_pytest\_code\code.py", line 11, in <module>
    import pluggy
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\pluggy\__init__.py", line 16, in <module>
    from .manager import PluginManager, PluginValidationError
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\pluggy\manager.py", line 6, in <module>
    import importlib_metadata
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 466, in <module>
    __version__ = version(__name__)
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 433, in version
    return distribution(package).version
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 406, in distribution
    return Distribution.from_name(package)
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 176, in from_name
    dist = next(dists, None)
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 362, in <genexpr>
    for path in map(cls._switch_path, paths)
  File "C:\Users\rkagr\Anaconda3\lib\site-packages\importlib_metadata\__init__.py", line 377, in _search_path
    if not root.is_dir():
  File "C:\Users\rkagr\Anaconda3\lib\pathlib.py", line 1351, in is_dir
    return S_ISDIR(self.stat().st_mode)
  File "C:\Users\rkagr\Anaconda3\lib\pathlib.py", line 1161, in stat
    return self._accessor.stat(self)
OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect: 'C:\\C:\\spark\\spark-2.4.4-bin-hadoop2.7\\jars\\spark-core_2.11-2.4.4.jar'
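Note the doubled drive prefix in the failing path (C:\\C:\\...). The traceback shows it is produced while importlib_metadata (pulled in through pytest, which scikit-learn 0.21.x imports via sklearn.utils.testing) walks the worker's import paths, which include Spark's jars; Windows then rejects the mangled path. A minimal illustration of the final failure, assuming Windows and Python 3.7 (newer pathlib versions swallow WinError 123 and return False instead):

# Illustration only: stat()-ing a path with a second embedded drive letter
# is rejected by Windows; on Python 3.7 pathlib lets the OSError propagate
# out of is_dir(), exactly as in the traceback above.
from pathlib import Path

Path(r'C:\C:\spark\spark-2.4.4-bin-hadoop2.7\jars\spark-core_2.11-2.4.4.jar').is_dir()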

MCVE

This minimal reproducible example defines the mapped function to simply return the input row as a dictionary; the original code instead returns a dictionary after some processing.

import findspark

findspark.init()
findspark.find()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf().setAppName('MRC').setMaster('local[2]')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

import sklearn
import sklearn.datasets
import sklearn.model_selection
import sklearn.ensemble

iris = sklearn.datasets.load_iris()
train, test, labels_train, labels_test = sklearn.model_selection.train_test_split(iris.data, iris.target, train_size=0.80)

classifier = sklearn.ensemble.RandomForestClassifier()
classifier.fit(train, labels_train)

import pickle
path = './random_classifier.mdl'
with open(path, 'wb') as fp:  # write the model with an explicitly closed handle
    pickle.dump(classifier, fp)

import pandas as pd
pddf=pd.DataFrame(test)
spdf=spark.createDataFrame(pddf)

def clusterCreation(rows, classifier_path):
    items = rows.asDict()
    row = pd.Series(items)
    # Re-load the pickled model on the worker for every row; unpickling it
    # triggers the sklearn import where the OSError surfaces.
    with open(classifier_path, 'rb') as fp:
        classifier = pickle.load(fp)
        print(classifier)  # prints to the executor log, not the notebook
    return items

result=spdf.rdd.map(lambda row:clusterCreation(row,classifier_path=path)).collect()
print(result)
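A side note on the MCVE's design: because the pickle is re-read inside the mapped function, every row pays the deserialization cost. A common alternative, sketched below purely as an illustration, is to ship the model once per executor with a broadcast variable. This does not avoid the bug itself: the broadcast still unpickles the model on the workers, which imports sklearn and raises the same OSError until the underlying package is fixed.

# Hypothetical variant using a broadcast variable, so the model is
# deserialized once per executor instead of once per row.
bc_model = sc.broadcast(classifier)

def clusterCreationBroadcast(rows):
    items = rows.asDict()
    model = bc_model.value  # unpickled lazily, once per executor process
    features = pd.Series(items).values.reshape(1, -1)
    items['prediction'] = int(model.predict(features)[0])
    return items

result_bc = spdf.rdd.map(clusterCreationBroadcast).collect()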


Solution 1:[1]

I encountered the same problem of a file path containing C:\\C:\\. A discussion at https://github.com/Ibotta/sk-dist/issues/30 indicates that this may be a problem with pytest used inside scikit-learn. The problem was reported against scikit-learn version 0.21.3. I upgraded my scikit-learn package to 0.22.1 (by upgrading to Anaconda 2020.02) and the error went away.

My environment is Windows 10, Spark 2.4.5, Anaconda 2020.02 (which contains scikit-learn 0.22.1). Note that the older Anaconda version 2019.10 contained scikit-learn version 0.21.3.
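A quick way to confirm which scikit-learn version an environment actually resolves, before and after upgrading (the upgrade commands in the comments assume conda or pip is on PATH):

# Check the scikit-learn version the interpreter imports.
import sklearn
print(sklearn.__version__)  # 0.21.3 exhibits the bug; 0.22.1 does not

# To upgrade, run one of these from a shell, depending on your package manager:
#   conda update scikit-learn
#   pip install --upgrade scikit-learn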

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Grace