Spark read from S3 working, but I am unable to write using the same session [duplicate]

I am using a pyspark test script to read and write files to S3. Here is how I initialize the spark-session:

import findspark
findspark.init()  # put the Spark installation on sys.path before importing pyspark

import boto3
from pyspark.sql import SparkSession

# NOTE: `session` was not defined in the original snippet; it is assumed here to
# be a boto3 session supplying the AWS credentials.
session = boto3.Session()

# The fs.s3a.* keys are given the spark.hadoop. prefix so they reliably
# propagate into the Hadoop configuration.
spark = SparkSession \
    .builder \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.hadoop.fs.s3a.access.key", session.get_credentials().access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", session.get_credentials().secret_key) \
    .master("local") \
    .getOrCreate()

I have Spark 3.0.3 pre-built for Apache Hadoop 3.2 and later, so I haven't installed Hadoop independently (I understand this build of Spark does not need it).
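As a sanity check, you can ask the running session which Hadoop version the Spark build actually bundles; a minimal sketch, assuming the SparkSession above is active (spark.sparkContext._jvm is py4j's gateway into the driver JVM):

# Sketch: print the Hadoop version bundled with this Spark build, via
# Hadoop's own VersionInfo class.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
# e.g. prints '3.2.0' for a "pre-built for Hadoop 3.2" distribution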

I have downloaded the following JAR files into Spark's jars folder:

With this configuration, the following code works when I read a file from S3 using the S3A filesystem:

s3read_path = 's3a://' + path_to_file_in_s3  # bucket/key of the Parquet file, e.g. 'my-bucket/data/file.parquet'
df = spark.read.parquet(s3read_path)
df.count()

This returns the expected row count, so reading works. However, when I try to write a different DataFrame to the same location, I keep getting an error. This is the code:

# Rewrite the s3:// URI as s3a:// and append the target file name
key_name = 's3a://' + key_name.replace('s3://', '') + file
df.repartition(1).write.parquet(key_name, mode='overwrite')

which results in the following error:

22/02/22 17:22:00 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
        at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
        at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:160)
        at org.apache.hadoop.util.DiskChecker.checkDirInternal(DiskChecker.java:100)
        at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:77)
        at org.apache.hadoop.util.BasicDiskValidator.checkStatus(BasicDiskValidator.java:32)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:331)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:394)
        at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:477)
        at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:213)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:575)
        at org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
        at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
        at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:781)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
        at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
22/02/22 17:22:00 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, AT-5CG9513X0N.mshome.net, executor driver): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

22/02/22 17:22:00 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1 times; aborting job
22/02/22 17:22:00 ERROR FileFormatWriter: Aborting job ccd33513-a067-4468-9706-99c136e45805.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, AT-5CG9513X0N.mshome.net, executor driver): java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
        at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:847)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Can anyone please help me? I suspect it is some kind of incompatibility between the JAR versions, but I have not been able to find the proper configuration.

Thanks in advance!



Solution 1:[1]

That's the issue:

UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

From the javadoc of UnsatisfiedLinkError:

Thrown if the Java Virtual Machine cannot find an appropriate native-language definition of a method declared native.

In other words, the JVM could not find a native implementation for a method declared native, which points to a classpath with missing or mismatched libraries. Note that the failing method lives in NativeIO$Windows, Hadoop's native Windows I/O layer, and your stack trace shows it is hit only on the write path (S3ABlockOutputStream buffers data to a local temp file before uploading, which triggers the native disk checks); that is why reading works while writing fails.
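If you want to see exactly what the driver JVM has on its classpath, you can query it from PySpark; a minimal sketch, assuming an active SparkSession named spark:

# Sketch: print the driver JVM's classpath to spot missing or duplicated
# Hadoop/AWS JARs. `_jvm` is py4j's gateway into the driver JVM.
print(spark.sparkContext._jvm.java.lang.System.getProperty("java.class.path"))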

Given that you downloaded the JAR files into the Spark jars folder yourself, you may well have introduced a class incompatibility between those JARs and the Hadoop 3.2 classes bundled with Spark.

I'd remove those two JARs and look for a proven way to write to an S3A filesystem; one such approach is sketched below.
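For instance, rather than copying JARs by hand, you can let Spark resolve a hadoop-aws artifact that matches the bundled Hadoop at startup via spark.jars.packages; a minimal sketch, where the version 3.2.0 is an assumption based on the "pre-built for Hadoop 3.2" distribution mentioned in the question (hadoop-aws pulls in a compatible aws-java-sdk-bundle transitively):

# Sketch: have Spark resolve hadoop-aws (plus its matching AWS SDK bundle)
# through Ivy instead of dropping JARs into the jars/ folder by hand.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
    .master("local") \
    .getOrCreate()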

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Jacek Laskowski