import sys
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
# create dynamic frame for namefile table
column_name_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
database = "sampledb",
table_name = 'sample1' + 'namefile')
# create dynamic frame for datafile table
userdata_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
database = 'sampledb',
table_name = 'sample1')
# Generate the applymapping script dynamically
# and apply it on our dynamicframe data file
mapping = []
for x in range(0, len(userdata_dynamicframe.schema().fields)):
    mapping.append((
        userdata_dynamicframe.schema().fields[x].name,
        column_name_dynamicframe.schema().fields[x].name
    ))
userdata_dynamicframe = userdata_dynamicframe.apply_mapping(mapping)
bucket_name = "your_bucket_name"
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = userdata_dynamicframe,
    connection_type = "s3",
    connection_options = {
        "path": f"s3://{bucket_name}/Sample1/Sample1FileRenamed"
    },
    format = "orc",
    transformation_ctx = "datasink4"
)
job.commit()
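For illustration, here is a minimal sketch of the mapping the loop above builds, using hypothetical column names (col0, col1, first_name, and last_name are invented). The documented four-element tuple form (source column, source type, target column, target type) is shown, which also lets you pin the types explicitly:

# hypothetical example: if the data file had string columns col0 and col1,
# and the namefile table had columns first_name and last_name, the
# equivalent explicit mapping would be:
mapping = [
    ("col0", "string", "first_name", "string"),
    ("col1", "string", "last_name", "string"),
]
userdata_dynamicframe = userdata_dynamicframe.apply_mapping(mapping)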
If the old name has dots in it, RenameField will not work unless you place back-ticks around it (``). For example, to replace this.old.name with thisNewName, you would call RenameField as follows:
newDyF = RenameField(oldDyF, "`this.old.name`", "thisNewName")
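Without the back-ticks, the dotted name is read as struct nesting (a field name inside this.old) rather than as a literal top-level column, so the rename never matches anything. A quick way to confirm the rename took effect, as a sketch using the frame from the example above:

# inspect the resulting schema; "thisNewName" should appear where
# "this.old.name" used to be
newDyF.printSchema()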
This example simplifies the names of fields in DynamicFrames created by the Relationalize transform, and then drops the added index and id fields.
dyf_renameField_1 = RenameField.apply(dyf_flattened, "`customers.val.address`", "address")
dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")
dyf_dropfields_rf = DropFields.apply(frame = dyf_renameField_2, paths = ["index", "id"])
dyf_dropfields_rf.toDF().show()

+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+
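For context, the dyf_flattened input above would typically come from a Relationalize step similar to this sketch (the source frame, staging path, and root table name are assumptions):

from awsglue.transforms import Relationalize

# flatten the nested customer records; Relationalize returns a collection
# of frames keyed by table name, with the top level under the name we pass
dfc = Relationalize.apply(frame = dyf_customers, staging_path = "s3://my-bucket/tmp/", name = "root")
dyf_flattened = dfc.select("root")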
The question specifically asks about renaming:
1) The first approach is around getting the column names from the df extracted from the dynamic df:
relationalize1 = Relationalize.apply(frame = datasource0, transformation_ctx = "relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
    relationalize1 = RenameField.apply(
        frame = relationalize1,
        old_name = "`" + field.name + "`",
        new_name = field.name.replace(".", "_"),
        transformation_ctx = "renamefield_" + field.name
    )
2) The second approach: (a) Convert to DataFrame. (b) Create a new_columns array with the desired column names in the same order as old_columns. (c) Overwrite and persist new_columns using functools.reduce() and pyspark.withColumnRenamed(). (d) Convert back to DynamicFrame.

from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce

JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)

# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
    connection_type = "s3",
    format = "csv",
    connection_options = {"paths": ["s3://path/to/source/file.csv"]},
    format_options = {"withHeader": True, "separator": chr(44)},  # comma delimited
)

# (a) Convert to DataFrame
df = datasource.toDF()

# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
    field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]

# (c) Overwrite and persist `new_columns`
df = reduce(
    lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    df,
)

# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")

# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
    frame = datasource,
    connection_type = "s3",
    connection_options = {"path": "s3://path/to/target/prefix/"},
    format = "parquet",
)
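To see what step (c) does in isolation, here is a toy sketch (column names invented) showing how reduce threads the DataFrame through successive withColumnRenamed calls:

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["Col One", "col.two"])

old_columns = df.schema.names          # ['Col One', 'col.two']
new_columns = ["col_one", "col_two"]

# each iteration renames one column and hands the result to the next
df = reduce(
    lambda acc, idx: acc.withColumnRenamed(old_columns[idx], new_columns[idx]),
    range(len(old_columns)),
    df,
)
print(df.columns)  # ['col_one', 'col_two']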
Maybe:
from awsglue.transforms import ApplyMapping

# construct renaming mapping for ApplyMapping; fields left out of the
# mapping would be dropped, so every field gets an entry
mappings = list()
for field in df.schema.fields:
    dtype = field.dataType.typeName()
    if '.' in field.name:
        mappings.append((field.name, dtype, field.name.replace('.', '_'), dtype))
    else:
        mappings.append((field.name, dtype, field.name, dtype))

# apply mapping (note: ApplyMapping operates on a DynamicFrame; here dyf is
# the DynamicFrame that df = dyf.toDF() was taken from)
renamed = ApplyMapping.apply(frame = dyf, mappings = mappings)
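As a design note, this keeps the job to a single transform instead of one RenameField transform per dotted column as in the loop shown earlier. A quick hedged check of the result:

# dotted names should now appear with underscores
renamed.printSchema()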
I load JSON data and use the relationalize method on the dynamic dataframe to flatten the otherwise nested JSON object, saving it into Parquet format. The problem is that once it is saved into Parquet format for faster Athena queries, the column names contain dots, which is against the Athena SQL query syntax, and thus I am unable to make column-specific queries.

In order to tackle this problem I also rename the column names in the Glue job to exclude the dots and put underscores instead. My question is: which approach of the two would be better and why? (Efficiency: memory? execution speed on nodes? etc.)

Also, given the horrible AWS Glue documentation I could not come up with a dynamic-frame-only solution. I have problems getting the column names in a dynamic fashion, thus I am utilizing toDF().
Table: parquet_table
root
 |-- name: string
 |-- url: string
 |-- sample.key: string
Query:
SELECT "sample.key"
FROM "parquet_table"
limit 10;
SELECT * FROM "parquet_table"
WHERE "sample.key"
LIKE 'sample%'
limit 10;
I am still interested in removing the dots from column names and thus would like to know what would be a good approach to renaming multiple columns in AWS Glue. I changed my approach to first converting the DynamicFrame to a PySpark DataFrame and then using a piece of code that I found on Stack Overflow.
new_column_name_list = list(map(lambda x: x.replace(".", "_"), df_relationalized.columns))
df_renamed = df_relationalized.toDF(*new_column_name_list)
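To keep working in Glue after this rename, the DataFrame can be converted back into a DynamicFrame; a minimal sketch (the glueContext name is assumed from earlier in the job):

from awsglue.dynamicframe import DynamicFrame

# wrap the renamed DataFrame back into a DynamicFrame for further Glue transforms
dyf_renamed = DynamicFrame.fromDF(df_renamed, glueContext, "dyf_renamed")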