Spark streaming JDBC read the stream as and when data comes - Data source jdbc does not support streamed reading
I am using PostgreSQL as the database. I want to capture one table's data in each batch, convert it to a Parquet file, and store it in S3. I tried to connect using Spark's JDBC options with readStream, like below...
val jdbcDF = spark.readStream
  .format("jdbc")
  .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
  .option("dbtable", "database.schema.table")
  .option("user", "xxxxx")
  .option("password", "xxxxx")
  .load()
but it threw an unsupported operation exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
Am I on the right track? Is there really no support for a database as a data source for Spark Streaming?
AFAIK the other way of doing this is to write a Kafka producer that publishes the data to a Kafka topic and then consume it with Spark Streaming...
Note: I don't want to use Kafka Connect for this since I need to do some auxiliary transformations.
Is this the only way to do it?
What is the right way of doing this? Is there an example of such a thing? Please assist!
Solution 1:[1]
Spark Structured Streaming does not have a standard JDBC source, but you can write a custom one. Keep in mind that your table must have a unique key by which you can track changes. For example, you can take my implementation as a starting point; do not forget to add the necessary JDBC driver to the dependencies.
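If a full custom source is more than you need, here is a minimal Scala sketch of the same idea: tracking changes through a unique, increasing key with plain batch JDBC reads in a polling loop. The table name, key column, key type, poll interval, and S3 path are hypothetical placeholders, not part of the original answer.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

object JdbcIncrementalPoll extends App {
  val spark = SparkSession.builder().appName("jdbc-incremental-poll").getOrCreate()

  // Hypothetical connection details; the key column must be unique and increasing (BIGINT assumed here).
  val url = "jdbc:postgresql://myserver:5432/mydatabase"
  val table = "myschema.mytable"
  val keyColumn = "id"
  val sink = "s3a://my-bucket/my-table-parquet/"

  var lastKey = 0L
  while (true) {
    // Re-read the table and keep only rows with a key we have not seen yet.
    val batch: DataFrame = spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("user", "xxxxx")
      .option("password", "xxxxx")
      .load()
      .filter(col(keyColumn) > lastKey)

    if (!batch.isEmpty) {
      // Append the new slice as Parquet and advance the high-water mark.
      batch.write.mode("append").parquet(sink)
      lastKey = batch.agg(max(keyColumn)).first().getLong(0)
    }
    Thread.sleep(60 * 1000L)
  }
}

This trades the checkpointing and recovery guarantees of a real streaming source for simplicity; the custom source approach described above is the more robust option.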
Solution 2:[2]
This library may help: Jdbc2S.
It provides JDBC streaming capabilities and was built on top of the Spark JDBC batch source.
Basically, you use it as you would any other streaming source; the only mandatory configuration is the name of the offset column in the tables you're consuming.
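A usage sketch in Scala, assuming the format name and option keys that appear in Solution 3 below (they may differ between library versions; check the Jdbc2S README), wired to a Parquet sink on S3 as the question requires:

import org.apache.spark.sql.SparkSession

object Jdbc2sToParquet extends App {
  val spark = SparkSession.builder().appName("jdbc2s-to-parquet").getOrCreate()

  // Format name and option keys assumed from the usage shown in Solution 3.
  val stream = spark.readStream
    .format("jdbc-streaming-v1")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    .option("dbtable", "myschema.mytable")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .option("offsetField", "updated_at") // column used to track streaming progress
    .load()

  // Standard Structured Streaming file sink: Parquet on S3 with a checkpoint location.
  stream.writeStream
    .format("parquet")
    .option("path", "s3a://my-bucket/my-table-parquet/")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/my-table/")
    .start()
    .awaitTermination()
}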
Solution 3:[3]
Jdbc2S works for me using PySpark, with a few changes in JDBCStreamingSourceV1.scala to fit the Python keyword convention, such as:
object JDBCStreamingSourceV1 {
  val CONFIG_OFFSET_FIELD = "offsetfield"
  val CONFIG_START_OFFSET = "startoffset"
  val CONFIG_OFFSET_FIELD_DATE_FORMAT = "offsetfielddateformat"
}
Then, finally:
def df_readstream(dbtable, offsetfield):
    df = spark.readStream.format("jdbc-streaming-v1") \
        .options(url=url,
                 driver='oracle.jdbc.driver.OracleDriver',
                 dbtable=dbtable,
                 user=user,
                 offsetField=offsetfield,
                 password=password).load()
    return df
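The returned streaming DataFrame can then be written out with a standard writeStream call (for example, the Parquet file sink with a path on S3 and a checkpointLocation, as sketched under Solution 2), which covers the Parquet-to-S3 part of the original question.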
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | sutugin
Solution 2 | Felipe Martins Melo
Solution 3 | losforword