I figured it out. Thanks for the help.
def res(row):
    # Compare the _x (old) and _y (new) data types produced by the merge
    if row['data_type_x'] == row['data_type_y']:
        return 'no change'
    elif pd.isnull(row['data_type_x']):
        return 'new attribute'
    elif pd.isnull(row['data_type_y']):
        return 'deleted attribute'
    else:
        # Both sides are present but differ
        return 'datatype change'

pd_merge['result'] = pd_merge.apply(res, axis=1)
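For context, pd_merge is assumed here to be the outer merge of two schema snapshots on the attribute name; the frames and column names below are made up for illustration:

import pandas as pd

old_schema = pd.DataFrame({'attribute': ['id', 'name', 'age'],
                           'data_type': ['int', 'string', 'int']})
new_schema = pd.DataFrame({'attribute': ['id', 'name', 'email'],
                           'data_type': ['bigint', 'string', 'string']})

# An outer merge keeps attributes present in either snapshot; the suffixes
# produce the data_type_x / data_type_y columns used by res() above
pd_merge = old_schema.merge(new_schema, on='attribute', how='outer',
                            suffixes=('_x', '_y'))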
Because you are setting these up as pandas DataFrames and not Spark DataFrames. For joins with pandas DataFrames, you would want to use:

DataFrame_output = DataFrame.join(other, on=None, how='left', lsuffix='', rsuffix='', sort=False)
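A quick illustration with made-up frames: join() matches on the index by default, and the suffixes are needed when column names collide:

import pandas as pd

left = pd.DataFrame({'data_type': ['int', 'string']}, index=['id', 'name'])
right = pd.DataFrame({'data_type': ['bigint', 'string']}, index=['id', 'name'])

# Left join on the index; lsuffix/rsuffix disambiguate the shared column name
joined = left.join(right, how='left', lsuffix='_x', rsuffix='_y')
print(joined)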
Run this to check which kind of DataFrame you are working with:
type(df)
To use withColumn, you would need a Spark DataFrame. If you want to convert a pandas DataFrame to a Spark DataFrame, use this:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
df = spark.createDataFrame(pd_df1)
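Once converted, withColumn works as expected. A minimal sketch (pd_df1 and the bonus column are assumptions for illustration):

import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

# A stand-in pandas DataFrame for pd_df1
pd_df1 = pd.DataFrame({'name': ['James', 'Anna'], 'salary': [3000, 4001]})

df = spark.createDataFrame(pd_df1)
# withColumn is available now that df is a Spark DataFrame
df = df.withColumn('bonus', f.col('salary') * 0.2)
df.show()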
Pyspark issue AttributeError: 'DataFrame' object has no attribute 'saveAsTextFile'

Can someone take a look at the code and let me know where I'm going wrong?

# %%
import findspark
findspark.init('/home/packt/spark-2.1.0-bin-hadoop2.7')
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName('aggs').getOrCreate()
    df = spark.read.csv('/home/packt/Downloads/Spark_DataFrames/sales_info.csv',
                        inferSchema=True, header=True)
    df.createOrReplaceTempView('sales_info')
    example8 = spark.sql("""SELECT * FROM sales_info ORDER BY Sales DESC""")
    example8.saveAsTextFile("juyfd")  # fails: DataFrame has no saveAsTextFile()

main()

# %%
import findspark
findspark.init('/home/packt/spark-2.1.0-bin-hadoop2.7')
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ops').getOrCreate()
df = spark.read.csv('/home/packt/Downloads/Spark_DataFrames/Person_Person.csv',
                    inferSchema=True, header=True)
df.createOrReplaceTempView('Person_Person')
myresults = spark.sql("""SELECT PersonType, COUNT(PersonType) AS `Person Count`
                         FROM Person_Person GROUP BY PersonType""")
result = myresults.collect()
result.saveAsTextFile("test")  # fails: collect() returns a plain Python list

As the error message states, the object, either a DataFrame (first snippet) or the plain Python list returned by collect() (second snippet), does not have a saveAsTextFile() method; saveAsTextFile() is defined on RDDs.
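One way to fix both snippets, as a minimal sketch (the output paths are placeholders): either write the DataFrame out through the DataFrame writer API, or drop down to the underlying RDD, which does have saveAsTextFile().

# Option 1: the DataFrame writer API
example8.write.csv("sales_sorted_csv", header=True)

# Option 2: convert to an RDD first; saveAsTextFile() lives on RDDs
example8.rdd.map(lambda row: ','.join(str(c) for c in row)) \
    .saveAsTextFile("sales_sorted_txt")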
Problem: In PySpark I am getting the error AttributeError: 'DataFrame' object has no attribute 'map' when I use the map() transformation on a DataFrame.
df2 = df.map(lambda x: [x[0], x[1]])
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 1401, in __getattr__ "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame'
object has no attribute 'map'
A PySpark DataFrame doesn't have a map() transformation; map() lives on RDDs, which is why you get this error. So first convert the PySpark DataFrame to an RDD using df.rdd, apply the map() transformation (which returns an RDD), and then convert the RDD back to a DataFrame. Let's see this with an example.
data = [('James', 3000), ('Anna', 4001), ('Robert', 6200)]
df = spark.createDataFrame(data, ["name", "salary"])
df.show()

# Convert the DataFrame to an RDD
rdd = df.rdd
print(rdd.collect())

# Apply the map() transformation
rdd2 = df.rdd.map(lambda x: [x[0], x[1] * 20 / 100])
print(rdd2.collect())

# Convert the RDD back to a DataFrame
df2 = rdd2.toDF(["name", "bonus"])
df2.show()
The syntax you are using is for a pandas DataFrame. To achieve this with a Spark DataFrame, you should use the withColumn() method. This works well for a wide range of well-defined DataFrame functions, but it is a little more complicated for user-defined mapping functions. In your specific case, you want to use a dictionary to translate the values of your DataFrame.
The new column is expected to be a transformation of an existing column, which can be done with a lookup in a dictionary/hashmap.
# Loading data
df = sqlContext.read.format(...).load(train_df_path)

# Instantiating the map
some_map = {
    'a': 0,
    'b': 1,
    'c': 1,
}

# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
Which leads to the following error:
AttributeError Traceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
25 df= train_df
26
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
962 if name not in self.columns:
963 raise AttributeError(
--> 964 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
965 jc = self._jdf.apply(name)
966 return Column(jc)
AttributeError: 'DataFrame' object has no attribute 'apply'
In order to define a udf, you need to specify the output data type. For instance, if you wanted to apply a function my_func that returned a string, you could create a udf as follows:
import pyspark.sql.functions as f
from pyspark.sql.types import StringType

my_udf = f.udf(my_func, StringType())
Then you can use my_udf to create a new column like:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
Another option is to use select:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
Notice the use of dict.get(), because you want your udf to be robust to bad inputs. Built that way (IntegerType is an assumption here, chosen because the map values are integers), the udf and the new column look like:

from pyspark.sql.types import IntegerType

some_map_udf = f.udf(lambda x: some_map.get(x), IntegerType())
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
The trick is to iterate over the items in some_map to create a list of pyspark.sql.functions.when() functions.
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name=a) THEN 0 END>,
# Column<CASE WHEN (some_column_name=c) THEN 1 END>,
# Column<CASE WHEN (some_column_name=b) THEN 1 END>]
You are selecting columns from a DataFrame and you get an error message.
ERROR: AttributeError: 'function' object has no attribute '_get_object_id' in job
This sample code uses summary as a column name and generates the error message when run.
df = spark.createDataFrame([1, 2], "int").toDF("id")
df.show()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df1 = spark.createDataFrame(
[(10, ), (11, ), (13, )],
StructType([StructField("summary", IntegerType(), True)]))
df1.show()
ResultDf = df1.join(df, df1.summary == df.id, "inner").select(df.id, df1.summary)
ResultDf.show()
If you must use protected keywords, you should use bracket based column access when selecting columns from a DataFrame. Do not use dot notation when selecting columns that use protected keywords.
ResultDf = df1.join(df, df1["summary"] == df.id, "inner").select(df.id, df1["summary"])
AttributeError: 'DataFrame' object has no attribute 'str'
Get a Series from a DataFrame
import pandas as pd
data = [
['Jim', 21],
['Patrice', 45],
['Louise', 19]
]
df = pd.DataFrame(data, columns = ['Name', 'Age'])
names = df['Name']
type(df)
type(names)
pandas.core.frame.DataFrame
pandas.core.series.Series
We can access the str attribute with the names variable but not the df variable.
# names is a Series, so the .str accessor works
names = names.str.replace('Patrice', 'Ulysses')
print(names)

# df is a DataFrame, so this raises AttributeError: 'DataFrame' object has no attribute 'str'
df.str.replace('Patrice', 'Ulysses')
print(df)
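The usual fix, sketched on the same toy data: select the column as a Series before using .str, or use DataFrame.replace() if the whole frame should be searched:

# Operate on the Series inside the DataFrame
df['Name'] = df['Name'].str.replace('Patrice', 'Ulysses')

# Or replace matching values across the whole DataFrame
df = df.replace('Patrice', 'Ulysses')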
margherita, £7.99
pepperoni, £8.99
four cheeses, £10.99
funghi, £8.99
tartufo, £14.99
porcino, £11.75
vegetarian, £10.99
We will read the CSV into a DataFrame using pandas.read_csv and then attempt to extract a specific pizza based on its name.
import pandas as pd
df = pd.read_csv('new_pizzas.csv')
df