spark write parquet not writing any files, only _success

Suggestion : 1

One possibility is that you looked at the wrong directory. The default file system Spark uses is specified by the fs.defaultFS Hadoop configuration option. For example, if you have something like

<property>
   <name>fs.defaultFS</name>
   <value>file:///tmp</value>
</property>

in your core-site.xml, then Spark writes to your local file system instead of HDFS if you omit the URL scheme of the output path. Vice versa, if it's

<property>
   <name>fs.defaultFS</name>
   <value>hdfs://some-host:9000</value>
</property>
then an output path with no scheme ends up on HDFS at some-host:9000 instead, so the files may simply be in a different file system than the one you checked.

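If that is the case, the quickest way to rule it out is to pass an explicit URI scheme on the output path, so the target does not depend on fs.defaultFS at all. A minimal sketch, with hypothetical paths:

// An explicit scheme pins the target file system regardless of fs.defaultFS
df.write.parquet("hdfs://some-host:9000/tmp/output") // always HDFS
df.write.parquet("file:///tmp/output") // always the local file system
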
Suggestion : 2

March 28, 2020 • Apache Spark SQL • Bartosz Konieczny

The physical write also starts with a setup stage, during which nothing happens for the FileOutputCommitter called under the hood by SQLHadoopMapReduceCommitProtocol. Later, the real writing begins by creating an appropriate instance of FileFormatDataWriter, which in our case, i.e. when neither partitions nor buckets are used, will be SingleDirectoryDataWriter. It starts by creating a new file for the task in an attempt_ directory. Data is first physically written to that attempt file and, once the task succeeds, it is committed from commitTask(taskContext: TaskAttemptContext). This commit consists of renaming attempt_ files into task_ files by delegating the call to SparkHadoopMapRedUtil.commitTask which, in its turn, calls FileOutputCommitter's commitTask(TaskAttemptContext context, Path taskAttemptPath) method:

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
        committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
  }

  // FileOutputCommitter
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
  throws IOException {
     // ...
     if (!fs.rename(taskAttemptPath, committedTaskPath)) {
        throw new IOException("Could not rename " + taskAttemptPath + " to " +
           committedTaskPath);
     }
     // ...
  }

The whole operation of converting attempt files into task files looks like this (S3 tests):

# before task commit
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000000_0/part-00000-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000001_1/part-00001-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000002_2/part-00002-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000003_3/part-00003-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000004_4/part-00004-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/_temporary/attempt_20200209133656_0000_m_000005_5/part-00005-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json

# VS

# after task commit
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000000/part-00000-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000001/part-00001-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000002/part-00002-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000003/part-00003-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000004/part-00004-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000005/part-00005-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json

I took a snapshot of S3 during the job commit with aws s3 ls s3://test-bucket/tmp/test-output-files/ --recursive --human-readable, where some of the task files had already been promoted to the output directory while the others still remained in their task_ directories:

2020-02-09 13:40:35    0 Bytes  tmp/test-output-files/_temporary/0/_temporary/
2020-02-09 13:41:15    0 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000000/
2020-02-09 13:41:21    0 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000001/
2020-02-09 13:41:24    0 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000002/
2020-02-09 13:41:27    0 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000003/
2020-02-09 13:40:34   27 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000004/part-00004-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
2020-02-09 13:40:35   27 Bytes  tmp/test-output-files/_temporary/0/task_20200209133656_0000_m_000005/part-00005-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
2020-02-09 13:41:14   27 Bytes  tmp/test-output-files/part-00000-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
2020-02-09 13:41:21   27 Bytes  tmp/test-output-files/part-00001-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
2020-02-09 13:41:24   27 Bytes  tmp/test-output-files/part-00002-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
2020-02-09 13:41:27   27 Bytes  tmp/test-output-files/part-00003-d5cca9d3-361a-4505-953b-f0d641d2fb7a-c000.json
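
For context, the listings above come from a small test job that writes six JSON part files under tmp/test-output-files on S3. A minimal sketch of such a write, assuming a SparkSession already configured for s3a access (the DataFrame content itself is an assumption, not taken from the test):

// Produces 6 part files, each written through the attempt_/task_ flow above
spark.range(6)
   .repartition(6)
   .write
   .json("s3a://test-bucket/tmp/test-output-files")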

Suggestion : 3

When you write a Spark DataFrame, it creates a directory and saves all the part files inside it. Sometimes you don't want a directory; you just want a single data file (CSV, JSON, Parquet, Avro, etc.) with the name specified in the path. In this quick article, I will explain how to save a Spark DataFrame into a CSV file without a directory. The example below creates an address directory containing a part-000* file along with _SUCCESS and hidden CRC files. Unfortunately, Spark doesn't support creating a data file without a folder; however, you can use the Hadoop file system library to achieve this.

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
   .master("local[3]")
   .appName("SparkByExamples.com")
   .getOrCreate()

val df = spark.read.option("header", true).csv("address.csv")
df.coalesce(1).write.csv("address")

Now, let's use the Hadoop FileSystem API to copy the part-0000* file from the directory to the desired location with a new file name, and then remove the directory.

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy the actual part file out of the directory and rename it to a custom name
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

val srcPath = new Path("c:/tmp/address")
val destPath = new Path("c:/tmp/address_merged.csv")
// Pick the single part-0000*.csv file produced by coalesce(1)
val srcFile = FileUtil.listFiles(new File("c:/tmp/address"))
   .filter(f => f.getPath.endsWith(".csv"))(0)
// Copy the CSV file outside of the directory and rename it to the desired file name
FileUtil.copy(srcFile, hdfs, destPath, true, hadoopConfig)
// Remove the hidden CRC file created by the copy above
hdfs.delete(new Path("c:/tmp/.address_merged.csv.crc"), true)
// Remove the directory created by df.write()
hdfs.delete(srcPath, true)
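
Since the question at the top of this page is about Parquet, the same pattern applies to Parquet output as well. A minimal sketch reusing the hdfs and hadoopConfig values from above (the paths are hypothetical):

// Write a single Parquet part file into a temporary directory
df.coalesce(1).write.parquet("c:/tmp/address_parquet")

// Promote the part file to the desired name and drop the directory
val parquetFile = FileUtil.listFiles(new File("c:/tmp/address_parquet"))
   .filter(f => f.getPath.endsWith(".parquet"))(0)
FileUtil.copy(parquetFile, hdfs, new Path("c:/tmp/address_merged.parquet"), true, hadoopConfig)
hdfs.delete(new Path("c:/tmp/address_parquet"), true)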