Question: How can I delete the parquet folder corresponding to a particular month? What would be the right way to delete the parquet data for a particular month?
I have monthly Revenue data for the last 5 years and I am storing the DataFrames for the respective months in parquet format in append mode, partitioned by the month column. Here is the pseudo-code below:
def Revenue(filename):
    df = spark.read.load(filename)
    .
    .
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')
One way would be to load all these parquet files into a big df, then use a .where() clause to filter out that particular month, and then save it back in parquet format, partitioned by month, in overwrite mode, like this:
# If we want to remove data from Feb, 2015
from pyspark.sql.functions import col, lit

df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')
Spark supports deleting a partition, both data and metadata. Quoting the Scala code comment:
/**
* Drop Partition in ALTER TABLE: to drop a particular partition for a table.
*
* This removes the data and metadata for this partition.
* The data is actually moved to the .Trash/Current directory if Trash is configured,
* unless 'purge' is true, but the metadata is completely lost.
* An error message will be issued if the partition does not exist, unless 'ifExists' is true.
* Note: purge is always false when the target is a view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* }}}
*/
From PySpark, we could run the SQL using the syntax shown above. Sample:
spark.read.format('parquet').load('Revenue.parquet').registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")
The statement below will only delete the metadata related to the partition:
ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date = "2019-08-22");
If you want to delete the data as well, you need to set the TBLPROPERTIES of your Hive external table to FALSE. This turns your Hive table into a managed table:
alter table db.yourtable set TBLPROPERTIES('EXTERNAL' = 'FALSE');
You can set it back to an external table afterwards:
alter table db.yourtable set TBLPROPERTIES('EXTERNAL' = 'TRUE');
I am sure there must be some way to do this. I ended up using Python: I defined the function below in PySpark and it did the job.
query = """hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE"); ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");'"""
def delete_partition():
    print("I am here")
    import subprocess
    import sys
    p = subprocess.Popen(query, shell=True, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        print(stderr)
        sys.exit(1)

>>> delete_partition()
This will delete both the metadata and the data. Note: I have tested this with a Hive ORC external table, which is partitioned on loaded_date:
# Partition Information
# col_name     data_type     comment
loaded_date    string
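For completeness, here is a minimal sketch of the same sequence issued through spark.sql instead of shelling out to the Hive CLI. This is my own assumption rather than part of the tested answer above; it presumes Spark is running with Hive support against the same metastore, and db.yourtable and the loaded_date value are placeholders for your own table and partition:
from pyspark.sql import SparkSession

# Hedged sketch: drop a partition together with its data via spark.sql.
# Assumes a Hive-enabled SparkSession; 'db.yourtable' and the loaded_date
# value are placeholders for your own table and partition.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Temporarily mark the external table as managed so the data is removed too.
spark.sql("ALTER TABLE db.yourtable SET TBLPROPERTIES('EXTERNAL' = 'FALSE')")
spark.sql("ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date = '2019-08-22')")
# Restore the external flag afterwards.
spark.sql("ALTER TABLE db.yourtable SET TBLPROPERTIES('EXTERNAL' = 'TRUE')")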
Impala allows you to create, manage, and query Parquet tables. Parquet is a column-oriented binary file format intended to be highly efficient for the types of large-scale queries. One related tip: increase the transceivers value for HDFS, sometimes spelled xcievers (sic); the property value in the hdfs-site.xml configuration file is dfs.datanode.max.transfer.threads. For example, if you were loading 12 years of data partitioned by year, month, and day, even a value of 4096 might not be high enough.
To create a table in the Parquet format, use the STORED AS PARQUET clause in the CREATE TABLE statement. For example:
CREATE TABLE parquet_table_name(x INT, y STRING) STORED AS PARQUET;
Or, to clone the column names and data types of an existing table, use LIKE with the STORED AS PARQUET clause. For example:
CREATE TABLE parquet_table_name LIKE other_table_name STORED AS PARQUET;
You can derive column definitions from a raw Parquet data file, even without an existing Impala table. For example, you can create an external table pointing to an HDFS directory, and base the column definitions on one of the files in that directory:
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET
LOCATION '/user/etl/destination';
In this example, the new table is partitioned by year, month, and day. These partition key columns are not part of the data file, so you specify them in the CREATE TABLE statement:
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
PARTITION(year INT, month TINYINT, day TINYINT)
STORED AS PARQUET;
You load data into a Parquet table with INSERT statements and query it like any other table; because Parquet is column-oriented, queries that read only a few columns are more efficient than full SELECT * scans:
INSERT OVERWRITE TABLE parquet_table_name SELECT * FROM other_table_name;
SELECT AVG(income) FROM census_data WHERE state = 'CA';
SELECT * FROM census_data;
For example, if a table is partitioned by columns YEAR, MONTH, and DAY, then WHERE clauses such as WHERE year = 2013, WHERE year < 2010, or WHERE year BETWEEN 1995 AND 1998 allow Impala to skip the data files in all partitions outside the specified range. Likewise, WHERE year = 2013 AND month BETWEEN 1 AND 3 could prune even more partitions, reading the data files for only a portion of one year. Partitioning suits data that already passes through an extract, transform, and load (ETL) pipeline: the values of the partitioning columns are stripped from the original data files and represented by directory names, so loading data into a partitioned table involves some sort of transformation or preprocessing. The data type of the partition columns does not have a significant effect on the storage required, because the values from those columns are not stored in the data files; rather, they are represented as strings inside HDFS directory names.
Specifying all the partition columns in a SQL statement is called static partitioning, because the statement affects a single predictable partition. For example, you use static partitioning with an ALTER TABLE statement that affects only one partition, or with an INSERT statement that inserts all values into the same partition:
insert into t1 partition(x = 10, y = 'a') select c1 from some_other_table;
When you specify some partition key columns in an INSERT statement, but leave out the values, Impala determines which partition to insert into. This technique is called dynamic partitioning:
insert into t1 partition(x, y = 'b') select c1, c2 from some_other_table;
-- Create new partition if necessary based on variable year, month, and day; insert a single value.
insert into weather partition(year, month, day) select 'cloudy', 2014, 4, 21;

-- Create new partition if necessary for specified year and month but variable day; insert a single value.
insert into weather partition(year = 2014, month = 04, day) select 'sunny', 22;
To check the effectiveness of partition pruning for a query, check the EXPLAIN output for the query before running it. For example, this example shows a table with 3 partitions, where the query only reads 1 of them. The notation #partitions=1/3 in the EXPLAIN plan confirms that Impala can do the appropriate partition pruning.
[localhost:21000] > insert into census partition(year = 2010) values('Smith'), ('Jones');
[localhost:21000] > insert into census partition(year = 2011) values('Smith'), ('Jones'), ('Doe');
[localhost:21000] > insert into census partition(year = 2012) values('Smith'), ('Doe');
[localhost:21000] > select name from census where year = 2010;
+-------+
| name  |
+-------+
| Smith |
| Jones |
+-------+
[localhost:21000] > explain select name from census where year = 2010;
+------------------------------------------------------------------+
| Explain String                                                    |
+------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                   |
|   PARTITION: UNPARTITIONED                                        |
|                                                                   |
|   1:EXCHANGE                                                      |
|                                                                   |
| PLAN FRAGMENT 1                                                   |
|   PARTITION: RANDOM                                               |
|                                                                   |
|   STREAM DATA SINK                                                |
|     EXCHANGE ID: 1                                                |
|     UNPARTITIONED                                                 |
|                                                                   |
|   0:SCAN HDFS                                                     |
|      table=predicate_propagation.census #partitions=1/3 size=12B  |
+------------------------------------------------------------------+
Impala can even do partition pruning in cases where the partition key column is not directly compared to a constant, by applying the transitive property to other parts of the WHERE clause. This technique is known as predicate propagation, and is available in Impala 1.2.2 and later. In this example, the census table includes another column indicating when the data was collected, which happens in 10-year intervals. Even though the query does not compare the partition key column (YEAR) to a constant value, Impala can deduce that only the partition YEAR=2010 is required, and again only reads 1 out of 3 partitions.
[localhost:21000] > drop table census;
[localhost:21000] > create table census(name string, census_year int) partitioned by(year int);
[localhost:21000] > insert into census partition(year = 2010) values('Smith', 2010), ('Jones', 2010);
[localhost:21000] > insert into census partition(year = 2011) values('Smith', 2020), ('Jones', 2020), ('Doe', 2020);
[localhost:21000] > insert into census partition(year = 2012) values('Smith', 2020), ('Doe', 2020);
[localhost:21000] > select name from census where year = census_year and census_year = 2010;
+-------+
| name  |
+-------+
| Smith |
| Jones |
+-------+
[localhost:21000] > explain select name from census where year = census_year and census_year = 2010;
+------------------------------------------------------------------+
| Explain String                                                    |
+------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                   |
|   PARTITION: UNPARTITIONED                                        |
|                                                                   |
|   1:EXCHANGE                                                      |
|                                                                   |
| PLAN FRAGMENT 1                                                   |
|   PARTITION: RANDOM                                               |
|                                                                   |
|   STREAM DATA SINK                                                |
|     EXCHANGE ID: 1                                                |
|     UNPARTITIONED                                                 |
|                                                                   |
|   0:SCAN HDFS                                                     |
|      table=predicate_propagation.census #partitions=1/3 size=22B  |
|      predicates: census_year = 2010, year = census_year           |
+------------------------------------------------------------------+
For example, if partition key columns are compared to literal values in a WHERE clause, Impala can perform static partition pruning during the planning phase to only read the relevant partitions:
-- The query only needs to read 3 partitions whose key values are known ahead of time.
-- That's static partition pruning.
SELECT COUNT(*) FROM sales_table WHERE year IN (2005, 2010, 2015);
Dynamic partition pruning involves using information only available at run time, such as the result of a subquery. The following example shows simple dynamic partition pruning.
CREATE TABLE yy(s STRING) PARTITIONED BY(year INT);
INSERT INTO yy PARTITION(year) VALUES('1999', 1999), ('2000', 2000),
('2001', 2001), ('2010', 2010), ('2018', 2018);
COMPUTE STATS yy;
CREATE TABLE yy2(s STRING, year INT);
INSERT INTO yy2 VALUES('1999', 1999), ('2000', 2000), ('2001', 2001);
COMPUTE STATS yy2;
-- The following query reads an unknown number of partitions, whose key values
-- are only known at run time. The runtime filters line shows the
-- information used in query fragment 02 to decide which partitions to skip.
EXPLAIN SELECT s FROM yy WHERE year IN (SELECT year FROM yy2);
+--------------------------------------------+
| PLAN-ROOT SINK                             |
| |                                          |
| 04:EXCHANGE [UNPARTITIONED]                |
| |                                          |
| 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]   |
| |  hash predicates: year = year            |
| |  runtime filters: RF000 <- year          |
| |                                          |
| |--03:EXCHANGE [BROADCAST]                 |
| |  |                                       |
| |  01:SCAN HDFS [default.yy2]              |
| |     partitions=1/1 files=1 size=620B     |
| |                                          |
| 00:SCAN HDFS [default.yy]                  |
|    partitions=5/5 files=5 size=1.71KB      |
|    runtime filters: RF000 -> year          |
+--------------------------------------------+
SELECT s FROM yy WHERE year IN (SELECT year FROM yy2);
-- Returns 3 rows from yy
PROFILE;
The Filter summary in the PROFILE output shows that the scan node filtered out files based on the runtime filter used for dynamic partition pruning.
Filter 0 (1.00 MB):
 - Files processed: 3
 - Files rejected: 1 (1)
 - Files total: 3 (3)
For example, here is how you might switch from text to Parquet data as you receive data for different years:
[localhost:21000] > create table census(name string) partitioned by(year smallint);
[localhost:21000] > alter table census add partition(year = 2012); -- Text format;
[localhost:21000] > alter table census add partition(year = 2013); -- Text format switches to Parquet before data loaded;
[localhost:21000] > alter table census partition(year = 2013) set fileformat parquet;
[localhost:21000] > insert into census partition(year = 2012) values('Smith'), ('Jones'), ('Lee'), ('Singh');
[localhost:21000] > insert into census partition(year = 2013) values('Flores'), ('Bogomolov'), ('Cooper'), ('Appiah');
pyarrow.dataset supports several partitioning schemes:
"FilenamePartitioning": this scheme expects the partitions to have filenames containing the field values separated by "_". For example, given schema<year:int16, month:int8, day:int8>, a possible partition filename "2009_11_part-0.parquet" would be parsed to ("year" == 2009 and "month" == 11).
"DirectoryPartitioning": this scheme expects one segment in the file path for each field in the specified schema (all fields are required to be present). For example, given schema<year:int16, month:int8>, the path "/2009/11" would be parsed to ("year" == 2009 and "month" == 11).
"HivePartitioning": a scheme for "/$key=$value/" nested directories as found in Apache Hive. This is a multi-level, directory-based partitioning scheme. Data is partitioned by static values of a particular column in the schema. Partition keys are represented in the form $key=$value in directory names. Field order is ignored, as are missing or unrecognized field names. For example, given schema<year:int16, month:int8, day:int8>, a possible path would be "/year=2009/month=11/day=15" (but the field order does not need to match).
The schema argument describes the partitions present in the file path. If not specified, and field_names and/or flavor are specified, the schema will be inferred from the file path (and a PartitioningFactory is returned).
>>> import pyarrow as pa
>>> import pyarrow.dataset as ds
>>> part = ds.partitioning(pa.schema([("year", pa.int16()),
...                                   ("month", pa.string())]))
>>> part = ds.partitioning(field_names=["year", "month"])
>>> part = ds.partitioning(
...     pa.schema([
...         ("year", pa.int16()),
...         ("month", pa.dictionary(pa.int8(), pa.string()))
...     ]),
...     dictionaries={
...         "month": pa.array(["January", "February", "March"]),
...     })
>>> part = ds.partitioning(
...     pa.schema([
...         ("year", pa.int16()),
...         ("month", pa.dictionary(pa.int8(), pa.string()))
...     ]),
...     dictionaries="infer")
>>> part = ds.partitioning(
...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
...     flavor="hive")
>>> part = ds.partitioning(flavor="hive")
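As a hedged usage sketch (not part of the pyarrow documentation excerpted above), a partitioning object, or simply the "hive" flavor, can be passed to ds.dataset() to read a Hive-style partitioned Parquet directory and filter on the partition column at scan time; the /path/Revenue directory and the month value below are placeholders borrowed from the question:
>>> # Hedged sketch: read a Hive-partitioned Parquet directory and skip one month.
>>> # '/path/Revenue' and the month value are placeholders, not from the docs above.
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("/path/Revenue", format="parquet", partitioning="hive")
>>> table = dataset.to_table(filter=ds.field("month") != "2015-02-01")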