Reading an image dataset into a DataFrame and extracting features [Spark with Python]

In my project, I need to read an image dataset (each folder contains a different object, and I want to read these folders as a stream, one by one), then extract features over one sliding-window duration, and then apply a clustering algorithm to the selected extracted features.

What I did as a first attempt:

Following the streaming k-means guide at https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means, I tried the steps below, but I am not able to convert the DataFrame into a stream queue. Here is the code I have written:

    import findspark
    findspark.init()

    from pyspark.sql import SparkSession
    from pyspark.mllib.clustering import StreamingKMeans

    sp = SparkSession.builder.appName('cluster').getOrCreate()
    train_df = sp.read.format("image").load("file:///D:/phd/dataset/coil-20/**")

    train_df.select("image.nChannels", "image.width", "image.height", "image.mode",
                    "image.data", "image.origin").show(truncate=True)

    +---------+-----+------+----+--------------------+--------------------+
    |nChannels|width|height|mode|                data|              origin|
    +---------+-----+------+----+--------------------+--------------------+
    |       -1|   -1|    -1|  -1|                  []|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
    |        1|  128|   128|   0|[00 00 00 00 00 0...|file:///D:/phd/da...|
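Side note: the first row above has -1 in every numeric column and an empty `data` array, which I take to be a file the image decoder could not parse. If I read the image data source docs correctly, there is a `dropInvalid` option for skipping such files (untested on my side):

    # re-read, skipping files that are not valid images
    train_df = sp.read.format("image") \
        .option("dropInvalid", True) \
        .load("file:///D:/phd/dataset/coil-20/**")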

    # test_df is assumed to be loaded the same way from a held-out folder
    trainingQueue = [train_df]
    testingQueue = [test_df]

    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
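For completeness, this is the clustering step I planned to run once the queues work, modelled on the linked streaming k-means example. The parameters are my own guesses: `dim = 128 * 128` for the flattened grayscale pixels, and `k = 20` because COIL-20 contains 20 objects:

    # streaming k-means over flattened 128x128 grayscale images (parameters are guesses)
    dim = 128 * 128
    model = StreamingKMeans(k=20, decayFactor=1.0) \
        .setRandomCenters(dim, weight=1.0, seed=0)

    # intended usage once trainingstream below exists:
    #   model.trainOn(trainingstream)
    #   ssc.start(); ssc.awaitTermination()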

However, the following lines raise an error:

    trainingstream = ssc.queueStream(trainingQueue)
    testingstream = ssc.queueStream(testingQueue)

Error:

    ---------------------------------------------------------------------------
    Py4JError                                 Traceback (most recent call last)
    <ipython-input-9-bd64c31ec0c6> in <module>()
    ----> 1 trainingstream = ssc.queueStream(trainingQueue)
          2 testingstream=ssc.queueStream(testingQueue)

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\streaming\context.py in queueStream(self, rdds, oneAtATime, default)
        304 
        305         if rdds and not isinstance(rdds[0], RDD):
    --> 306             rdds = [self._sc.parallelize(input) for input in rdds]
        307         self._check_serializers(rdds)
        308 

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\streaming\context.py in <listcomp>(.0)
        304 
        305         if rdds and not isinstance(rdds[0], RDD):
    --> 306             rdds = [self._sc.parallelize(input) for input in rdds]
        307         self._check_serializers(rdds)
        308 

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\context.py in parallelize(self, c, numSlices)
        525             return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
        526 
    --> 527         jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
        528         return RDD(jrdd, self, serializer)
        529 

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\context.py in _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer)
        557             try:
        558                 try:
    --> 559                     serializer.dump_stream(data, tempFile)
        560                 finally:
        561                     tempFile.close()

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dump_stream(self, iterator, stream)
        350 
        351     def dump_stream(self, iterator, stream):
    --> 352         self.serializer.dump_stream(self._batched(iterator), stream)
        353 
        354     def load_stream(self, stream):

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dump_stream(self, iterator, stream)
        141     def dump_stream(self, iterator, stream):
        142         for obj in iterator:
    --> 143             self._write_with_length(obj, stream)
        144 
        145     def load_stream(self, stream):

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in _write_with_length(self, obj, stream)
        151 
        152     def _write_with_length(self, obj, stream):
    --> 153         serialized = self.dumps(obj)
        154         if serialized is None:
        155             raise ValueError("serialized value should not be None")

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dumps(self, obj)
        581 
        582     def dumps(self, obj):
    --> 583         return pickle.dumps(obj, protocol)
        584 
        585     if sys.version >= '3':

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
       1255         answer = self.gateway_client.send_command(command)
       1256         return_value = get_return_value(
    -> 1257             answer, self.gateway_client, self.target_id, self.name)
       1258 
       1259         for temp_arg in temp_args:

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
         61     def deco(*a, **kw):
         62         try:
    ---> 63             return f(*a, **kw)
         64         except py4j.protocol.Py4JJavaError as e:
         65             s = e.java_exception.toString()

    ~\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
        330                 raise Py4JError(
        331                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    --> 332                     format(target_id, ".", name, value))
        333         else:
        334             raise Py4JError(

    Py4JError: An error occurred while calling o169.__getstate__. Trace:
    py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)
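Reading the traceback, my current understanding is: `queueStream` expects a list of RDDs, and since a DataFrame is not an RDD it falls back to `parallelize`, which tries to pickle each element; a DataFrame is only a handle to a JVM object, so pickling fails with the `__getstate__` error above. The workaround I am considering is to go through `df.rdd` and map each row to an MLlib vector first (a sketch only, untested; `row_to_vector` is my own helper):

    import numpy as np
    from pyspark.mllib.linalg import Vectors

    def row_to_vector(row):
        # image.data holds the raw pixel bytes; unpack into a flat float vector
        pixels = np.frombuffer(bytes(row.image.data), dtype=np.uint8)
        return Vectors.dense(pixels.astype(float))

    # drop rows that failed to decode (width/height/mode == -1), then map to an
    # RDD of vectors; queueStream accepts a list of RDDs without any pickling
    train_rdd = (train_df.rdd
                 .filter(lambda row: row.image.nChannels > 0)
                 .map(row_to_vector))
    trainingstream = ssc.queueStream([train_rdd])

Is that the right direction, or is there a more idiomatic way to feed a DataFrame into a DStream?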

**My Questions:**

  1. Why am I not able to convert the DataFrame into a stream queue?

  2. What is the "image.data" column in the DataFrame for, and how can I use it for feature selection? (My current guess is sketched just below.)
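For question 2, my own guess (untested) is that `image.data` can be reshaped back into a pixel array, from which per-image features could be computed, something like:

    import numpy as np

    row = train_df.filter("image.mode >= 0").first()
    img = row.image
    # Spark stores pixels in OpenCV layout: row-major, BGR channel order for colour
    pixels = np.frombuffer(bytes(img.data), dtype=np.uint8) \
               .reshape(img.height, img.width, img.nChannels)
    # e.g. a crude per-image feature: a 32-bin grey-level histogram
    hist, _ = np.histogram(pixels, bins=32, range=(0, 256))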

Any reference (theoretical or practical) would help me a lot :)


