programing

saveAsTextFile 출력을 여러 파일로 분할하지 않는 방법은 무엇입니까?

nasanasas 2020. 10. 21. 08:09
반응형

saveAsTextFile 출력을 여러 파일로 분할하지 않는 방법은 무엇입니까?


Spark에서 Scala를 사용할 때를 사용하여 결과를 덤프 할 때마다 saveAsTextFile출력이 여러 부분으로 분할되는 것 같습니다. 매개 변수 (경로)를 전달하고 있습니다.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. 출력 수는 사용하는 감속기 수와 일치합니까?
  2. 이것은 출력이 압축되었음을 의미합니까?
  3. bash를 사용하여 출력을 함께 결합 할 수 있다는 것을 알고 있지만 분할하지 않고 단일 텍스트 파일에 출력을 저장하는 옵션이 있습니까 ?? API 문서를 살펴 보았지만 이에 대해 많이 언급하지 않았습니다.

여러 파일로 저장하는 이유는 계산이 분산되기 때문입니다. 출력이 충분히 작아서 한 대의 컴퓨터에 맞출 수 있다고 생각되면 다음을 사용하여 프로그램을 종료 할 수 있습니다.

val arr = year.collect()

그런 다음 결과 배열을 파일로 저장합니다. 또 다른 방법은 사용자 지정 파티 partitionBy셔 너를 사용하여 병렬화가 발생하지 않기 때문에 모든 것이 하나의 파티션으로 이동하도록 만드는 것입니다.

이 파일이 필요한 경우로 저장 될 saveAsTextFile당신이 사용할 수 있습니다 coalesce(1,true).saveAsTextFile(). 이것은 기본적으로 계산을 수행 한 다음 하나의 파티션으로 통합하는 것을 의미합니다. shuffle 인수를 true로 설정하여 repartition(1)래퍼로 사용할 수도 있습니다 coalesce. RDD.scala 의 소스를 살펴보면 제가이 대부분을 알아 방법이 있습니다. 살펴 보셔야 합니다.


더 큰 데이터 세트로 작업하는 경우 :

  • rdd.collect()이 경우 에는 드라이버에서 모든 데이터를 수집 하므로 Array메모리에서 가장 쉽게 얻을 수있는 방법 이므로 사용해서는 안됩니다 .

  • rdd.coalesce(1).saveAsTextFile() 또한 데이터가 저장되는 단일 노드에서 수행되기 위해 업스트림 단계의 병렬 처리가 손실되므로 사용해서는 안됩니다.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() 업스트림 작업의 처리를 병렬로 유지 한 다음 하나의 노드에 대해서만 셔플을 수행 하므로 가장 간단한 옵션입니다 ( rdd.repartition(1).saveAsTextFile()정확한 동의어 임).

  • rdd.saveAsSingleTextFile()아래에 제공된대로 추가로 의 병렬 처리 속성을 유지하면서 특정 이름 의 단일 파일에 rdd를 저장할 수 있습니다 rdd.coalesce(1, shuffle = true).saveAsTextFile().


함께 불편할 수 있습니다 뭔가 rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")실제로 그 경로에있는 파일을 생성하는 것입니다 path/to/file.txt/part-00000하지를 path/to/file.txt.

다음 솔루션 rdd.saveAsSingleTextFile("path/to/file.txt")은 실제로 경로가 다음과 같은 파일을 생성합니다 path/to/file.txt.

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

다음과 같이 사용할 수 있습니다.

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

이 스 니펫 :

  • 먼저 데이터를 하나의 파일에 저장하지 않으려는 것처럼 rdd.saveAsTextFile("path/to/file.txt")임시 폴더에 rdd path/to/file.txt.tmp를 저장합니다 (업스트림 작업의 처리를 병렬로 유지).

  • 그리고에만 사용 하둡 파일 시스템 API를 , 우리는 계속 병합 ( FileUtil.copyMerge()우리의 최종 출력 하나의 파일을 만들 수있는 다양한 출력 파일의) path/to/file.txt.


You could call coalesce(1) and then saveAsTextFile() - but it might be a bad idea if you have a lot of data. Separate files per split are generated just like in Hadoop in order to let separate mappers and reducers write to different files. Having a single output file is only a good idea if you have very little data, in which case you could do collect() as well, as @aaronman said.


As others have mentioned, you can collect or coalesce your data set to force Spark to produce a single file. But this also limits the number of Spark tasks that can work on your dataset in parallel. I prefer to let it create a hundred files in the output HDFS directory, then use hadoop fs -getmerge /hdfs/dir /local/file.txt to extract the results into a single file in the local filesystem. This makes the most sense when your output is a relatively small report, of course.


You can call repartition() and follow this way:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

enter image description here


You will be able to do it in the next version of Spark, in the current version 1.0.0 it's not possible unless you do it manually somehow, for example, like you mentioned, with a bash script call.


I also want to mention that the documentation clearly states that users should be careful when calling coalesce with a real small number of partitions . this can cause upstream partitions to inherit this number of partitions.

I would not recommend using coalesce(1) unless really required.


In Spark 1.6.1 the format is as shown below. It creates a single output file.It is best practice to use it if the output is small enough to handle.Basically what it does is that it returns a new RDD that is reduced into numPartitions partitions.If you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/")

Here's my answer to output a single file. I just added coalesce(1)

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Code:

year.coalesce(1).saveAsTextFile("year")

참고URL : https://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file

반응형