This answer nicely explains how to use PySpark's groupBy and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually as shown in this part of the example:
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
Based on Sanxofon's comment, I got an idea on how to implement this myself:
from pyspark.sql.types import *

mapping = {
    "float64": DoubleType,
    "object": StringType,
    "int64": IntegerType,
}  # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
    column_types = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
    schema = StructType(column_types)
    return schema
What I do is get a sample pandas df, pass it to the function, and see what it returns:
dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

This seems to work for me. The problem is that it is kind of recursive (you need to define the function to get the schema, and have the schema to define it as a UDF). I solved this by creating a "wrapper" UDF that simply passes the dataframe.
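To make the wrapper idea concrete, here is a minimal sketch of the wiring, assuming the df_total and my_UDF_function names from above; the "id" grouping column is an assumption for illustration, and it targets the Spark 2.x GROUPED_MAP pandas_udf API:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Sample a little data on the driver, run the pandas logic once, infer the schema,
# then register a thin wrapper as the actual UDF.
sample_pdf = df_total.limit(100).toPandas()
inferred_schema = createUDFSchemaFromPandas(my_UDF_function(sample_pdf))

@pandas_udf(inferred_schema, PandasUDFType.GROUPED_MAP)
def wrapper_udf(pdf):
    # The wrapper only forwards each group's pandas DataFrame to the real logic.
    return my_UDF_function(pdf)

result = df_total.groupBy("id").apply(wrapper_udf)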
Unfortunately there is no such option. The schema must be known statically before any component is evaluated, so any form of inference based on actual data is simply not on the table. If the internal process is somehow based on code generation, your best option is to integrate both logic and schema generation. For example:
from pyspark.sql.functions import pandas_udf, PandasUDFType

def describe(cols, fun):
    schema = StructType([StructField(c, DoubleType()) for c in cols])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _(df):
        return df[cols].agg([fun])
    return _
df = spark.createDataFrame([(1, 2.0, 1.0, 3.0), (1, 4.0, 2.0, 5.0)], ("id", "x", "y", "z"))
df.groupBy("id").apply(describe(["x"], "mean")).show()
# +---+
# |  x|
# +---+
# |3.0|
# +---+
df.groupBy("id").apply(describe(["x", "y"], "mean")).show()
# +---+---+
# |  x|  y|
# +---+---+
# |3.0|1.5|
# +---+---+
Slightly modifying @Thomas's answer, I did the following. Since df.dtypes returns a list of tuples (at least in the latest pandas version) and not a dictionary, I replaced str(dfp.dtypes[key]) with dict(df.dtypes)[key]:
def udf_schema_from_pandas(df):
    column_types = [StructField(key, mapping[dict(df.dtypes)[key]]()) for key in df.columns]
    schema = StructType(column_types)
    return schema
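As a quick, hedged sanity check of that point (the toy columns below are made up, and an active SparkSession named spark is assumed): a pandas DataFrame's dtypes is a Series keyed by column, while a Spark DataFrame's dtypes is a list of (column, type) tuples, so whichever frame you pass in, the keys of mapping must match the type strings you actually get back.

import pandas as pd

pdf = pd.DataFrame({"x": [1.0], "n": [1], "s": ["a"]})
print(dict(pdf.dtypes))       # pandas: {'x': dtype('float64'), 'n': dtype('int64'), 's': dtype('object')}
print(str(pdf.dtypes["x"]))   # 'float64' - the string form used as a key in `mapping`

sdf = spark.createDataFrame(pdf)
print(sdf.dtypes)             # Spark: a list of tuples, e.g. [('x', 'double'), ('n', 'bigint'), ('s', 'string')]
print(dict(sdf.dtypes)["x"])  # 'double'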
For a complete list of the types of operations that can be performed on a DataFrame, refer to the API Documentation. The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame (a short PySpark sketch of this follows the examples below). As of Spark 1.3, the implicit conversions only augment RDDs that are composed of Products (i.e., case classes or tuples) with a toDF method, instead of applying automatically.
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("Python Spark SQL basic example")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
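As noted earlier, the sql function on a SparkSession lets you run SQL programmatically and get a DataFrame back. A minimal PySpark sketch, assuming the same people.json example file and an active session named spark:

# Register the DataFrame as a temporary view, then query it with SQL.
df = spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")

sql_df = spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL")
sql_df.show()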
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
spark = (
SparkSession.builder
.master("local")
.appName("BIOS-823")
.config("spark.executor.cores", 4)
.getOrCreate()
)
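With the session above, a DataFrame can be created either by letting Spark infer the schema from the data or by declaring it explicitly with the T types imported earlier. A minimal sketch; the column names and rows are illustrative only:

data = [(1, "Alice", 2.0), (2, "Bob", 3.5)]

# Implicit schema: Spark infers column types from the Python objects.
df_inferred = spark.createDataFrame(data, ["id", "name", "score"])

# Explicit schema: declared up front with StructType/StructField.
schema = T.StructType([
    T.StructField("id", T.LongType(), False),
    T.StructField("name", T.StringType(), True),
    T.StructField("score", T.DoubleType(), True),
])
df_explicit = spark.createDataFrame(data, schema)
df_explicit.printSchema()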
To run the Thrift JDBC/ODBC server in single-session mode, start it with:
./sbin/start-thriftserver.sh \
  --conf spark.sql.hive.thriftServer.singleSession=true \
  ...
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($ "department", max("age"), sum("expense"))
// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))
// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));
// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));
// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");
import pyspark.sql.functions as func
# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))
# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))
# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
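On Spark 2.x and later, where you typically hold a SparkSession rather than a SQLContext, the same switch can be set through the session's runtime configuration; a brief sketch assuming a session named spark:

# Equivalent setting via the SparkSession runtime config (Spark 2.x+).
spark.conf.set("spark.sql.retainGroupColumns", "false")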
sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);
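For completeness, a hedged PySpark equivalent of the registration above, assuming a Spark 2.x+ session named spark (older releases exposed this as sqlContext.registerFunction):

from pyspark.sql.types import IntegerType

# Register a Python function as a SQL UDF, analogous to the Scala/Java strLen above.
spark.udf.register("strLen", lambda s: len(s), IntegerType())
spark.sql("SELECT strLen('Spark SQL')").show()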