Reading an image dataset into a DataFrame and extracting features [Spark with Python]
In my project, I need to read an image dataset (each folder contains a different object, and I want to read these folders as a stream, one by one), then extract features over one sliding-window duration, and finally apply a clustering algorithm to the selected features.
What I did as a first attempt:
Following https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means, I tried the steps below, but I am not able to convert the DataFrame into a stream queue.
I wrote the following code:
import findspark
findspark.init()                          # make the local Spark installation importable

from pyspark.sql import SparkSession
from pyspark.mllib.clustering import StreamingKMeans

sp = SparkSession.builder.appName('cluster').getOrCreate()

# Load every image under the COIL-20 dataset folder with the built-in image data source
train_df = sp.read.format("image").load("file:///D:/phd/dataset/coil-20/**")

train_df.select("image.nChannels", "image.width", "image.height", "image.mode",
                "image.data", "image.origin").show(truncate=True)
+---------+-----+------+----+--------------------+--------------------+
|nChannels|width|height|mode| data| origin|
+---------+-----+------+----+--------------------+--------------------+
| -1| -1| -1| -1| []|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
| 1| 128| 128| 0|[00 00 00 00 00 0...|file:///D:/phd/da...|
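The first row above (all values -1 and an empty data array) is a file that the image data source could not decode. A small sketch of how I think such rows can be skipped, either by filtering afterwards or with the dropInvalid option of the image source (I am on Spark 2.4):

from pyspark.sql.functions import col

# Drop rows that failed to decode (nChannels/width/height are -1 there) ...
valid_df = train_df.filter(col("image.nChannels") > 0)

# ... or ask the image data source to drop unreadable files while loading
train_df = (sp.read.format("image")
            .option("dropInvalid", True)
            .load("file:///D:/phd/dataset/coil-20/**"))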
trainingQueue = [train_df]
testingQueue = [test_df]      # test_df is a second image DataFrame loaded the same way (not shown)
from pyspark.streaming import StreamingContext
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)             # 1-second batch interval
But the following lines give an error:
trainingstream = ssc.queueStream(trainingQueue)
testingstream = ssc.queueStream(testingQueue)
Error:
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-9-bd64c31ec0c6> in <module>()
----> 1 trainingstream = ssc.queueStream(trainingQueue)
2 testingstream=ssc.queueStream(testingQueue)
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\streaming\context.py in queueStream(self, rdds, oneAtATime, default)
304
305 if rdds and not isinstance(rdds[0], RDD):
--> 306 rdds = [self._sc.parallelize(input) for input in rdds]
307 self._check_serializers(rdds)
308
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\streaming\context.py in <listcomp>(.0)
304
305 if rdds and not isinstance(rdds[0], RDD):
--> 306 rdds = [self._sc.parallelize(input) for input in rdds]
307 self._check_serializers(rdds)
308
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\context.py in parallelize(self, c, numSlices)
525 return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
526
--> 527 jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
528 return RDD(jrdd, self, serializer)
529
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\context.py in _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer)
557 try:
558 try:
--> 559 serializer.dump_stream(data, tempFile)
560 finally:
561 tempFile.close()
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dump_stream(self, iterator, stream)
350
351 def dump_stream(self, iterator, stream):
--> 352 self.serializer.dump_stream(self._batched(iterator), stream)
353
354 def load_stream(self, stream):
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dump_stream(self, iterator, stream)
141 def dump_stream(self, iterator, stream):
142 for obj in iterator:
--> 143 self._write_with_length(obj, stream)
144
145 def load_stream(self, stream):
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in _write_with_length(self, obj, stream)
151
152 def _write_with_length(self, obj, stream):
--> 153 serialized = self.dumps(obj)
154 if serialized is None:
155 raise ValueError("serialized value should not be None")
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\serializers.py in dumps(self, obj)
581
582 def dumps(self, obj):
--> 583 return pickle.dumps(obj, protocol)
584
585 if sys.version >= '3':
~\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\spark\spark-2.4.7-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 332 format(target_id, ".", name, value))
333 else:
334 raise Py4JError(
Py4JError: An error occurred while calling o169.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
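From the traceback, my guess is that queueStream tries to parallelize (and therefore pickle) any queue element that is not already an RDD, and a DataFrame cannot be pickled because it only wraps a JVM object. A rough, untested sketch of what I think it expects instead, with each image row converted to an MLlib dense vector first (row_to_vector is just my illustrative helper):

from pyspark.mllib.linalg import Vectors

def row_to_vector(row):
    # One feature per pixel byte; assumes the 8-bit grayscale 128x128 COIL-20 images shown above
    return Vectors.dense([float(b) for b in row.image.data])

training_rdd = train_df.rdd.map(row_to_vector)   # an RDD, not a DataFrame
trainingQueue = [training_rdd]                   # queueStream accepts a list of RDDs
trainingstream = ssc.queueStream(trainingQueue)

The resulting DStream of vectors is, as far as I can tell, what StreamingKMeans.trainOn expects.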
**My Question:**
Why am I not able to convert the DataFrame into a stream queue this way?
What is the "image.data" column in the DataFrame for, and how can I use it for feature selection? (A sketch of my current understanding is at the end of the post.)
**Any reference [theoretical or practical] would help me a lot :)**
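Regarding "image.data": as far as I understand, it holds the raw decoded pixel bytes in OpenCV layout (height x width x nChannels, with BGR channel order for colour images), so a per-image feature vector can be built by reshaping those bytes with NumPy. A minimal, untested sketch (the helper name and the plain pixel-intensity features are only illustrative):

import numpy as np

def image_row_to_features(row):
    img = row.image
    # Raw pixel bytes: height x width x nChannels, row-major (OpenCV layout)
    arr = np.frombuffer(bytes(img.data), dtype=np.uint8)
    arr = arr.reshape(img.height, img.width, img.nChannels)
    # Example feature vector: flattened, normalised pixel intensities
    return (arr.astype(np.float64) / 255.0).ravel()

sample_features = [image_row_to_features(r) for r in train_df.limit(3).collect()]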