reading parquet file with array<map<string,string>> column


I'm running into pyarrow.lib.ArrowNotImplementedError: Reading lists of structs from Parquet files not yet supported when I try to read the file with pandas; however, when I read it with pyspark and then convert to pandas, the data at least loads:

import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.read.load(path)
pdf = df.toPandas()

and the offending field is now rendered as pyspark Row objects, which offer some structured access, but you would probably have to write custom pandas functions to extract data from them (see the sketch after the output below):

>>> pdf["user"][0]["sessions"][0]["views"]
[Row(is_search=True, price=None, search_string='ABC', segment='listing', time=1571250719.393951),
 Row(is_search=True, price=None, search_string='ZYX', segment='homepage', time=1571250791.588197),
 Row(is_search=True, price=None, search_string='XYZ', segment='listing', time=1571250824.106184)]
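
A minimal sketch of such a flattening helper, assuming the user -> sessions -> views layout shown above (the field names come from this output; adjust them to your schema). pyspark Row objects support access by field name and conversion to plain dicts via Row.asDict():

import pandas as pd

def flatten_views(pdf):
    # walk user -> sessions -> views and collect one record per view
    records = []
    for _, row in pdf.iterrows():
        user = row["user"]
        for session in user["sessions"]:
            for view in session["views"]:
                records.append(view.asDict())
    return pd.DataFrame(records)

views_pdf = flatten_views(pdf)
print(views_pdf[["segment", "search_string", "time"]].head())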

Suggestion : 2

In this article, I will explain how to explode array (or list) and map DataFrame columns to rows using the different Spark explode functions (explode, explode_outer, posexplode, posexplode_outer), with Scala examples. The Spark SQL explode function is used to create or split an array or map DataFrame column into rows. Spark defines several flavors of this function: explode_outer to handle null and empty input, posexplode to explode with the position of each element, and posexplode_outer to combine both behaviors. The explode_outer(e: Column) function creates a row for each element in the array or map column; unlike explode, if the array or map is null or empty, explode_outer returns null rather than dropping the row. By the end you will have seen how to convert array and map DataFrame columns to rows using explode and posexplode and their respective outer variants, and the differences between these functions.

import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{explode, explode_outer}
import org.apache.spark.sql.types._

val arrayData = Seq(
   Row("James", List("Java", "Scala"), Map("hair" - > "black", "eye" - > "brown")),
   Row("Michael", List("Spark", "Java", null), Map("hair" - > "brown", "eye" - > null)),
   Row("Robert", List("CSharp", ""), Map("hair" - > "red", "eye" - > "")),
   Row("Washington", null, null),
   Row("Jefferson", List(), Map())
)

val arraySchema = new StructType()
   .add("name", StringType)
   .add("knownLanguages", ArrayType(StringType))
   .add("properties", MapType(StringType, StringType))

val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
df.printSchema()
df.show(false)

explode – array column example

df.select($ "name", explode($ "knownLanguages"))
   .show(false)
Outputs:
+-------+------+
|name   |col   |
+-------+------+
|James  |Java  |
|James  |Scala |
|Michael|Spark |
|Michael|Java  |
|Michael|null  |
|Robert |CSharp|
|Robert |      |
+-------+------+

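explode – map column example

Exploding the properties map column works the same way; the call would presumably be:

df.select($"name", explode($"properties"))
   .show(false)
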
Outputs:

+-------+----+-----+
|name   |key |value|
+-------+----+-----+
|James  |hair|black|
|James  |eye |brown|
|Michael|hair|brown|
|Michael|eye |null |
|Robert |hair|red  |
|Robert |eye |     |
+-------+----+-----+

explode_outer – array example

df.select($ "name", explode_outer($ "knownLanguages"))
   .show(false)
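
Since the question itself is in Python, here is a hedged PySpark sketch of the same explode variants, assuming a DataFrame df with the same name, knownLanguages and properties columns as above:

from pyspark.sql import functions as F

# explode drops rows whose array is null or empty
df.select("name", F.explode("knownLanguages").alias("language")).show(truncate=False)

# explode_outer keeps those rows and emits null instead
df.select("name", F.explode_outer("knownLanguages").alias("language")).show(truncate=False)

# posexplode also returns the position of each element
df.select("name", F.posexplode("knownLanguages")).show(truncate=False)

# exploding a map column yields key and value columns
df.select("name", F.explode("properties")).show(truncate=False)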

Suggestion : 3

Created 10-11-2019 06:32 AM

DESCRIBE struct_demo;
+-------------------+--------------------------+
| name              | type                     |
+-------------------+--------------------------+
| id                | bigint                   |
| name              | string                   |
| employee_info     | struct<                  |
|                   |   employer:string,       |
|                   |   id:bigint,             |
|                   |   address:string         |
|                   | >                        |
| places_lived      | array<struct<            |
|                   |   street:string,         |
|                   |   city:string,           |
|                   |   country:string         |
|                   | >>                       |
| memorable_moments | map<string,struct<       |
|                   |   year:int,              |
|                   |   place:string,          |
|                   |   details:string         |
|                   | >>                       |
| current_address   | struct<                  |
|                   |   street_address:struct< |
|                   |     street_number:int,   |
|                   |     street_name:string,  |
|                   |     street_type:string   |
|                   |   >,                     |
|                   |   country:string,        |
|                   |   postal_code:string     |
|                   | >                        |
+-------------------+--------------------------+

There are two ways to produce Parquet files with complex-typed columns like this:
  1. Create a temporary table with values, then transform it to the Parquet complex types with Hive; please see our documentation for sample queries: Constructing Parquet Files with Complex Columns Using Hive.
  2. Use an INSERT INTO ... SELECT <values> query to insert records one by one; reference queries can be found in the description of IMPALA-3938. Please note that this generates a separate file for each record, so the files occasionally need to be compacted.

CREATE TABLE struct_demo
(
  id BIGINT,
  name STRING,

  -- A STRUCT as a top-level column. Demonstrates how the table ID column
  -- and the ID field within the STRUCT can coexist without a name conflict.
  employee_info STRUCT <employer: STRING, id: BIGINT, address: STRING>,

  -- A STRUCT as the element type of an ARRAY.
  places_lived ARRAY <STRUCT <street: STRING, city: STRING, country: STRING>>,

  -- A STRUCT as the value portion of the key-value pairs in a MAP.
  memorable_moments MAP <STRING, STRUCT <year: INT, place: STRING, details: STRING>>,

  -- A STRUCT where one of the fields is another STRUCT.
  current_address STRUCT <street_address: STRUCT <street_number: INT, street_name: STRING, street_type: STRING>, country: STRING, postal_code: STRING>
)
STORED AS PARQUET;

Suggestion : 4

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files and automatically preserves the schema of the original data; when reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons. By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths, so the partitioning columns appear in the schema of the returned DataFrame. With the LEGACY rebase mode, Spark rebases dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar to the Proleptic Gregorian calendar when reading Parquet files. Spark SQL also caches Parquet metadata for better performance; when Hive metastore Parquet table conversion is enabled, metadata of the converted tables is cached as well, and if those tables are updated by Hive or other external tools you need to refresh them manually to ensure consistent metadata.

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
df <- read.df("examples/src/main/resources/people.json", "json")

# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
##   name
## 1 Justin

# We can also run custom R UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) {
  cbind(paste("Name:", p$name))
}, schema)
for (teenName in collect(teenNames)$name) {
  cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS(
   path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
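
The gender=/country= directories become partition columns when the root path is read back. A minimal PySpark sketch of producing and reading such a layout (the column names and /tmp/table path are illustrative, not taken from the excerpt):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

people = spark.createDataFrame(
    [("Justin", "male", "US"), ("Ann", "female", "CN")],
    ["name", "gender", "country"],
)

# partitionBy writes the gender=.../country=... directory tree shown above
people.write.mode("overwrite").partitionBy("gender", "country").parquet("/tmp/table")

# reading the root path recovers gender and country as partition columns
spark.read.parquet("/tmp/table").printSchema()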

Suggestion : 5

DataFrameReader is a fluent API for describing the input data source that will be used to "load" data from an external data source (e.g. files, tables, JDBC or a Dataset[String]). It can load datasets from a Dataset[String] (with each line being a complete "file") using the format-specific csv and json operators. schema allows for specifying the schema of the data source that the DataFrameReader is about to read a dataset from; the schema is set when the DataFrameReader is requested to set a schema, to load data from an external data source, in loadV1Source (when creating a DataSource), and when loading data using the json and csv file formats.

import org.apache.spark.sql.SparkSession
assert(spark.isInstanceOf[SparkSession])

import org.apache.spark.sql.DataFrameReader
val reader = spark.read
assert(reader.isInstanceOf[DataFrameReader])
csv(csvDataset: Dataset[String]): DataFrame
csv(path: String): DataFrame
csv(paths: String*): DataFrame
format(source: String): DataFrameReader
jdbc(
   url: String,
   table: String,
   predicates: Array[String],
   connectionProperties: Properties): DataFrame
jdbc(
   url: String,
   table: String,
   properties: Properties): DataFrame
jdbc(
   url: String,
   table: String,
   columnName: String,
   lowerBound: Long,
   upperBound: Long,
   numPartitions: Int,
   connectionProperties: Properties): DataFrame
json(jsonDataset: Dataset[String]): DataFrame
json(path: String): DataFrame
json(paths: String*): DataFrame
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
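
Tying this back to the original question: schema lets you hand the reader an explicit schema instead of relying on inference. A hedged PySpark sketch for a table with an array<map<string,string>> column (the id/views field names are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# explicit schema for a table with an array<map<string,string>> column
schema = StructType([
    StructField("id", StringType()),
    StructField("views", ArrayType(MapType(StringType(), StringType()))),
])

df = spark.read.schema(schema).parquet("path/to/table")
df.printSchema()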