Assuming that the data frame is sorted to match the order of values in the array, you can zip the RDDs and rebuild the data frame as follows:
n = sparkdf.rdd.getNumPartitions()

# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20, 20, 20, 20]), n).map(int)

def process(pair):
    # pair is (Row, new value); return the row's fields plus the new column
    row, value = pair
    d = row.asDict()
    d["new_col"] = value
    return d

rdd = (sparkdf
    .rdd              # Extract RDD
    .zip(new_col)     # Zip with new col
    .map(process))    # Add new column

sqlContext.createDataFrame(rdd)  # Rebuild data frame
You can also use joins:
new_col = sqlContext.createDataFrame(
    list(zip(range(1, 5), [20] * 4)), ("rn", "new_col"))

sparkdf.registerTempTable("df")

# Make sure we have a specific order and add a row number
sparkdf_indexed = sqlContext.sql(
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))
Of course, if all you need is a column with a single constant value, you can simply use lit:
import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))
Syntax to add a constant column:

dataframe.withColumn("column_name", lit(value))

Syntax to add the column only if it does not already exist:

if 'column_name' not in dataframe.columns:
    dataframe.withColumn("column_name", lit(value))
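As a quick illustration of both patterns (note that withColumn() returns a new DataFrame, so the result has to be assigned; the column name "bonus_percent" is just an example, not from the original article):

from pyspark.sql.functions import lit

# Add a constant column unconditionally
dataframe = dataframe.withColumn("bonus_percent", lit(0.3))

# Add the same column only if it is not already present (a no-op here)
if 'bonus_percent' not in dataframe.columns:
    dataframe = dataframe.withColumn("bonus_percent", lit(0.3))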
In this PySpark article, I will explain different ways to add a new column to a DataFrame using withColumn(), select(), and sql(). These include adding a constant column with a default value, deriving a column from another column, adding a column with a NULL/None value, adding a column only when it does not already exist, and adding multiple columns. Most of the examples below use withColumn(), but you can achieve the same results with the select() transformation. If you want to add a known set of columns, you can easily do so by chaining withColumn() or using select(); if you need to add multiple columns after applying some transformations, you can instead use map() (or foldLeft() in Scala).
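As a hedged sketch of the select() alternative (this assumes the example DataFrame df created in the next section):

from pyspark.sql.functions import lit

# Equivalent of withColumn("bonus_percent", lit(0.3)) using select():
# keep all existing columns and append the new one with an alias
df.select("*", lit(0.3).alias("bonus_percent")).show()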
To see all of these with examples, let's first create a PySpark DataFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()

data = [('James', 'Smith', 'M', 3000),
        ('Anna', 'Rose', 'F', 4100),
        ('Robert', 'Williams', 'M', 6200)]
columns = ["firstname", "lastname", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+
# Add new constant column
from pyspark.sql.functions import lit

df.withColumn("bonus_percent", lit(0.3)) \
    .show()

+---------+--------+------+------+-------------+
|firstname|lastname|gender|salary|bonus_percent|
+---------+--------+------+------+-------------+
|    James|   Smith|     M|  3000|          0.3|
|     Anna|    Rose|     F|  4100|          0.3|
|   Robert|Williams|     M|  6200|          0.3|
+---------+--------+------+------+-------------+

# Add new column with NULL
df.withColumn("DEFAULT_COL", lit(None)) \
    .show()
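Note that lit(None) produces a column of null type. If you need the NULL column to carry a specific data type, a minimal sketch (the cast to string here is just an assumed example, not from the original article):

from pyspark.sql.functions import lit

# Add a NULL column and cast it to an explicit type
df.withColumn("DEFAULT_COL", lit(None).cast("string")) \
    .show()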
You can also add a column to a DataFrame based on the value of another existing column; this is the most commonly used approach.
# Add column from existing column
df.withColumn("bonus_amount", df.salary * 0.3) \
    .show()

+---------+--------+------+------+------------+
|firstname|lastname|gender|salary|bonus_amount|
+---------+--------+------+------+------------+
|    James|   Smith|     M|  3000|       900.0|
|     Anna|    Rose|     F|  4100|      1230.0|
|   Robert|Williams|     M|  6200|      1860.0|
+---------+--------+------+------+------------+
# Add column by concatenating existing columns
from pyspark.sql.functions import concat_ws

df.withColumn("name", concat_ws(",", "firstname", "lastname")) \
    .show()

+---------+--------+------+------+---------------+
|firstname|lastname|gender|salary|           name|
+---------+--------+------+------+---------------+
|    James|   Smith|     M|  3000|    James,Smith|
|     Anna|    Rose|     F|  4100|      Anna,Rose|
|   Robert|Williams|     M|  6200|Robert,Williams|
+---------+--------+------+------+---------------+
To add a column only when it does not already exist:

if 'dummy' not in df.columns:
    df = df.withColumn("dummy", lit(None))
I don’t have a real-time scenario for adding multiple columns, so below is just a skeleton of how to use map(). I will update this once I have a Scala example.
# Let's assume df has just 3 columns: c1, c2, c3
def derive_columns(row):
    # Apply transformations on these columns and derive multiple columns,
    # storing the derived values in c5, c6, c7, c8, c9, c10
    ...
    return (row.c1, row.c2, c5, c6, c7, c8, c9, c10)

df2 = df.rdd.map(derive_columns)
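For a known set of columns, a concrete, hedged sketch of adding several columns at once by chaining withColumn() or using a single select() (this reuses the example df from above; the derived column names are just illustrations):

from pyspark.sql.functions import col, lit, concat_ws

# Chained withColumn() calls
df_multi = (df
    .withColumn("bonus_percent", lit(0.3))
    .withColumn("bonus_amount", col("salary") * 0.3)
    .withColumn("name", concat_ws(",", "firstname", "lastname")))
df_multi.show()

# The same result with a single select()
df_multi = df.select(
    "*",
    lit(0.3).alias("bonus_percent"),
    (col("salary") * 0.3).alias("bonus_amount"),
    concat_ws(",", "firstname", "lastname").alias("name"))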
Directly assigning the array as a new column fails; probably a UDF is the way to go, but I don't know how to create a UDF that assigns a different value to each DataFrame row, i.e. one that iterates through new_col. I have looked at other pyspark and pyspark.sql questions but couldn't find a solution, and I need to stay within pyspark.sql, so a Scala solution is not an option.

To get a numpy array back out of the pyspark DataFrame, you can convert it to a pandas DataFrame using toPandas() and then to a numpy array using .values. I haven't tested this, but it should behave the same as plain numpy. Here is the code to create the pyspark.sql DataFrame:
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Set up the SparkContext/SQLContext used below
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = pd.DataFrame(np.array([[1, 2, 3],
                            [4, 5, 6],
                            [7, 8, 9],
                            [10, 11, 12]]),
                  columns=['a', 'b', 'c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)
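Following the toPandas() note above, a minimal sketch of adding the numpy array as a column by round-tripping through pandas (this collects all data to the driver, so it is only reasonable for small DataFrames; the column name new_col is just an illustration):

# Collect to pandas, attach the array as a column, and rebuild the Spark DataFrame
pdf = sparkdf.toPandas()
pdf["new_col"] = np.array([20, 20, 20, 20])
sparkdf_with_col = sqlContext.createDataFrame(pdf)
sparkdf_with_col.show()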
A few relevant notes from the pyspark.sql documentation: SQLContext gets the existing SQLContext or creates a new one with the given SparkContext; pyspark.sql.Column is a column expression in a DataFrame; range() creates a DataFrame with a single pyspark.sql.types.LongType column named id, containing elements in a range from start to end (exclusive) with the given step value; and, most importantly for this problem, the column expression passed to withColumn must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.
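That last restriction is exactly why the zip and join workarounds above are needed. A hedged sketch of what fails (new_col_df is a hypothetical second DataFrame, not from the original question):

# This raises an error: the column belongs to a different DataFrame
new_col_df = sqlContext.createDataFrame([(20,), (20,), (20,), (20,)], ["new_col"])
sparkdf.withColumn("new_col", new_col_df.new_col)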
For reference, the doctest examples from the SparkSession documentation:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

>>> from pyspark.conf import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session...

>>> SparkSession.builder.config("spark.some.config.option", "some-value")
<pyspark.sql.session...

>>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
>>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
True

>>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
>>> s1.conf.get("k1") == s2.conf.get("k1")
True
>>> s1.conf.get("k2") == s2.conf.get("k2")
True

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1='Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name='Alice', age=1)]
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()