implicit schema for pandas_udf in pyspark?

Suggestion : 1

Based on Sanxofon's comment, I got an idea of how to implement this myself:

from pyspark.sql.types import *

mapping = {
   "float64": DoubleType,
   "object": StringType,
   "int64": IntegerType
}
# Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
   column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
   schema = StructType(column_types)
   return schema

What I do is get a sample pandas DataFrame, pass it to the function, and build the schema from what it returns:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

If the internal process is somehow based on code generation, your best option is to integrate both the logic and the schema generation. For example:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def describe(cols, fun):
   schema = StructType([StructField(c, DoubleType()) for c in cols])

   @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
   def _(df):
      return df[cols].agg([fun])

   return _

df = spark.createDataFrame([(1, 2.0, 1.0, 3.0), (1, 4.0, 2.0, 5.0)], ("id", "x", "y", "z"))

df.groupBy("id").apply(describe(["x"], "mean")).show()
# +---+
# |  x|
# +---+
# |3.0|
# +---+

   df.groupBy("id").apply(describe(["x", "y"], "mean")).show()
# + -- - + -- - +
   # | x | y |
   # + -- - + -- - +
   # | 3.0 | 1.5 |
   # + -- - + -- - +
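As a side note, the PandasUDFType.GROUPED_MAP decorator form used above is deprecated in Spark 3.x in favor of groupBy(...).applyInPandas(...). A minimal sketch of the same describe(["x", "y"], "mean") aggregation under that API, assuming Spark 3.0 or later:

from pyspark.sql.types import StructType, StructField, DoubleType

cols = ["x", "y"]
schema = StructType([StructField(c, DoubleType()) for c in cols])

def mean_of(pdf):
   # pdf is the pandas DataFrame holding one group's rows.
   return pdf[cols].agg(["mean"])

df.groupBy("id").applyInPandas(mean_of, schema=schema).show()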

Slightly modifying @Thomas's answer, I did the following. Since df.dtypes returns a list of tuples (at least in the latest pandas version) and not a dictionary, I replaced str(dfp.dtypes[key]) with dict(df.dtypes)[key]:

def udf_schema_from_pandas(df):
   column_types = [StructField(key, mapping[dict(df.dtypes)[key]]()) for key in df.columns]
   schema = StructType(column_types)
   return schema
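
The three-entry mapping used above is deliberately incomplete; a fuller version might look like the sketch below. The dtype strings and type choices are assumptions to be adapted to your data (note that int64 maps more faithfully to LongType than to IntegerType, which can overflow):

from pyspark.sql.types import (
   BooleanType, DoubleType, FloatType, IntegerType,
   LongType, StringType, TimestampType
)

# Hypothetical extended mapping from pandas dtype strings to Spark types.
mapping = {
   "object": StringType,
   "int32": IntegerType,
   "int64": LongType,           # avoids the overflow IntegerType could hit
   "float32": FloatType,
   "float64": DoubleType,
   "bool": BooleanType,
   "datetime64[ns]": TimestampType,
}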

Suggestion : 2

The approach above seems to work for me. The problem is that it is somewhat recursive (you need to run the function to get the schema, and you need the schema to define it as a UDF). I solved this by creating a "wrapper" UDF that simply passes the dataframe through, as sketched below. Note, however, that there is no built-in option for this: the schema must be known statically before any component is evaluated, so any form of inference based on the actual data is simply not on the table.
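
A minimal sketch of that wrapper idea, assuming the Spark 2.3/2.4 GROUPED_MAP decorator API and reusing the df_total, my_UDF_function and createUDFSchemaFromPandas names from the first suggestion; the grouping column "key" is likewise an assumption:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Infer the schema once from a small sample of the data...
sample_pdf = df_total.limit(100).toPandas()
schema = createUDFSchemaFromPandas(my_UDF_function(sample_pdf))

# ...then wrap the same pandas function in a GROUPED_MAP UDF that
# simply passes each group's dataframe through.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def wrapper_udf(pdf):
   return my_UDF_function(pdf)

result = df_total.groupBy("key").apply(wrapper_udf)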

This answer nicely explains how to use pyspark's groupby and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually, as shown in this part of the example:

from pyspark.sql.types import *

schema = StructType([
   StructField("key", StringType()),
   StructField("avg_min", DoubleType())
])

Suggestion : 3

For a complete list of the types of operations that can be performed on a DataFrame, refer to the API Documentation. The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame. Additionally, the implicit conversions now only augment RDDs that are composed of Products (i.e., case classes or tuples) with a toDF method, instead of applying automatically.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
   .builder()
   .appName("Spark SQL basic example")
   .config("spark.some.config.option", "some-value")
   .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
   .builder()
   .appName("Java Spark SQL basic example")
   .config("spark.some.config.option", "some-value")
   .getOrCreate();

from pyspark.sql import SparkSession

spark = SparkSession\
   .builder\
   .appName("Python Spark SQL basic example")\
   .config("spark.some.config.option", "some-value")\
   .getOrCreate()

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
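
To illustrate the sql function mentioned in the excerpt above, here is a minimal PySpark sketch building on the df already loaded from people.json; the view name "people" is an assumption:

# Register the DataFrame as a temporary view, then query it with SQL.
df.createOrReplaceTempView("people")

sql_df = spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL")
sql_df.show()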

Suggestion : 4

Topics covered in this excerpt: creating a Spark DataFrame with an implicit or explicit schema (plus an alternative way to specify a schema), DataFrame conversions such as DataFrame to RDD via rdd(), and persisting.

[1]:
from pyspark.sql import SparkSession
[2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
[3]:
spark = (
   SparkSession.builder
   .master("local")
   .appName("BIOS-823")
   .config("spark.executor.cores", 4)
   .getOrCreate()
)
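
Since the topic list above mentions implicit versus explicit schemas, here is a short sketch of both, reusing the T alias from cell [2]; the column names and sample rows are made up for illustration:

# Implicit schema: Spark infers column types from the Python objects.
df_implicit = spark.createDataFrame(
   [("alice", 1, 2.0), ("bob", 2, 3.0)],
   ["name", "count", "score"],
)

# Explicit schema: types are declared up front with StructType.
schema = T.StructType([
   T.StructField("name", T.StringType()),
   T.StructField("count", T.LongType()),
   T.StructField("score", T.DoubleType()),
])
df_explicit = spark.createDataFrame(
   [("alice", 1, 2.0), ("bob", 2, 3.0)],
   schema=schema,
)

df_explicit.printSchema()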

Suggestion : 5

From the Spark SQL migration guide: upgrading from Spark SQL 1.0-1.2 to 1.3 (rename of SchemaRDD to DataFrame, unification of the Java and Scala APIs, isolation of implicit conversions, UDF registration moved to sqlContext.udf) and from 1.3 to 1.4 (DataFrame data reader/writer interface, DataFrame.groupBy retains grouping columns, behavior change on DataFrame.withColumn).

./sbin/start-thriftserver.sh \
   --conf spark.sql.hive.thriftServer.singleSession=true \
   ...
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($ "department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

sqlContext.udf.register("strLen", (s: String) => s.length())

sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
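
For completeness, a Python sketch of the same registration, assuming a Spark 2.x+ session where spark.udf.register is available:

from pyspark.sql.types import IntegerType

# Register a Python UDF under the name "strLen" so it can be used in SQL.
spark.udf.register("strLen", lambda s: len(s), IntegerType())

spark.sql("SELECT strLen('hello')").show()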