Python 2.7 is installed by default on HDInsight 3.0 and later. Apache Hive can use this version of Python for stream processing, passing data between Hive and the UDF through STDOUT and STDIN. During stream processing, a single line contains all the values for a record with a tab character between each value, so string.split(line, "\t") can be used to split the input at each tab and return just the fields.

Python can be used as a UDF from Hive through the HiveQL TRANSFORM statement. The walkthrough below invokes a file named hiveudf.py stored in the default Azure Storage account for the cluster.
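The walkthrough does not reproduce hiveudf.py itself. A minimal sketch that is consistent with the TRANSFORM query and the sample output shown later (it joins devicemake and devicemodel into a label and emits an MD5 hash of that label) could look like the following; treat it as an illustration rather than the exact original file:

#!/usr/bin/env python
import sys
import hashlib

# Each input line from Hive is a tab-delimited record: clientid, devicemake, devicemodel
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    clientid, devicemake, devicemodel = line.split('\t')
    # Build a human-readable label from make and model, then hash it
    phone_label = devicemake + ' ' + devicemodel
    phone_hash = hashlib.md5(phone_label.encode('utf-8')).hexdigest()
    # Emit the transformed record back to Hive on standard output
    print('\t'.join([clientid, phone_label, phone_hash]))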
Use scp to copy the file to your HDInsight cluster. Edit and enter the command below:
scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Use SSH to connect to the cluster. Edit and enter the command below:
ssh sshuser@mycluster-ssh.azurehdinsight.net
From the SSH session, add the Python files uploaded previously to the storage for the cluster.
hdfs dfs -put hiveudf.py /hiveudf.py
To connect to Hive, use the following command from your open SSH session:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
Enter the following query at the 0: jdbc:hive2://headnodehost:10001/> prompt:
add file wasbs:///hiveudf.py;
SELECT TRANSFORM(clientid, devicemake, devicemodel)
USING 'python hiveudf.py'
AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
After entering the last line, the job should start. Once the job completes, it returns output similar to the following example:
100041  RIM 9650            d476f3687700442549a83fac4560c51c
100041  RIM 9650            d476f3687700442549a83fac4560c51c
100042  Apple iPhone 4.2.x  375ad9a0ddc4351536804f1d5d0ea9b9
100042  Apple iPhone 4.2.x  375ad9a0ddc4351536804f1d5d0ea9b9
100042  Apple iPhone 4.2.x  375ad9a0ddc4351536804f1d5d0ea9b9
A colleague recently asked me how to create a custom function for Hive using Python. You can create a function in pretty much any language and plug it into your Hive query using the Hive TRANSFORM clause. TRANSFORM lets you add your own mappers and/or reducers to process the data. The example in this article is working code that I wrote a few years ago, using an early version of Hive, to demonstrate how to add a custom function.

In earlier versions of Hive we had to implement our own functions to hash sensitive data for PII compliance. Beginning with Hive 1.3, the SHA2 UDF was added to calculate a hash using SHA-224, SHA-256, SHA-384, or SHA-512. In the example below I create a custom UDF in Python to calculate the SHA-256 hash of a social security number. Keep in mind that when I did this there were no out-of-the-box Hive UDFs available for this. This example only demonstrates how to write your own custom functions for Hive using Python.

First, we need to write some Python code that will read each record passed in from Hive and process the data. Save this to a file:
#!/usr/local/bin/python
import hashlib
import sys

# we are receiving each record passed in from Hive via standard input
# By default, columns will be transformed to STRING and delimited by TAB
# Also, by default, NULL values will be converted to the literal string \N to differentiate them from empty strings
for line in sys.stdin:
    line = line.strip()
    (customer_no, ssn, plan, join_date, status, balance, region) = line.split('\t')
    # hash the social security number and emit all the fields to standard out
    x = hashlib.sha256(str(ssn))
    ssn = x.hexdigest()
    print '\t'.join([str(customer_no), str(ssn), plan, str(join_date), status, str(balance), region])
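The script above targets Python 2 (note the print statement and that hashlib.sha256 is given a str directly). If the python interpreter on your cluster nodes is Python 3, a minimal sketch of the equivalent loop encodes the value to bytes before hashing and uses the print function:

import hashlib
import sys

for line in sys.stdin:
    line = line.strip()
    customer_no, ssn, plan, join_date, status, balance, region = line.split('\t')
    # hashlib.sha256 requires bytes in Python 3, so encode before hashing
    ssn = hashlib.sha256(ssn.encode('utf-8')).hexdigest()
    print('\t'.join([customer_no, ssn, plan, join_date, status, balance, region]))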
Now you can call the above Python code from your HiveQL:

ADD FILE /path-to-my-script/my_python_code.py;

CREATE VIEW customer_data_mart_view.v_customer_balance AS
SELECT
  TRANSFORM(customer_no, ssn, plan, join_date, status, balance, region)
USING '/path-to-my-script/my_python_code.py'
AS customer_no, ssn, plan, join_date, status, balance, region
FROM customer_data_mart.customer_details;
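You can also sanity-check the script locally before running it through Hive by piping a fake, tab-separated record through it (the sample values here are invented for illustration):

echo -e '1001\t123-45-6789\tgold\t2015-06-01\tactive\t250.00\teast' | python /path-to-my-script/my_python_code.py

The output should be the same record with the ssn field replaced by its 64-character SHA-256 hex digest.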
One thing I wish I had known when starting with Python UDFs is that you can write to stderr to assist in debugging, and then look in the YARN ResourceManager (RM) for the logs.
import sys
sys.stderr.write('>>>> Read a line \n' + line + '\n')
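A slightly fuller sketch of that idea, assuming you want to log each record and skip malformed ones rather than fail the whole task:

import sys

for line in sys.stdin:
    line = line.strip()
    # Anything written to stderr ends up in the task logs, not in the query output
    sys.stderr.write('>>>> Read a line\n' + line + '\n')
    fields = line.split('\t')
    if len(fields) != 7:
        sys.stderr.write('>>>> Skipping malformed record\n')
        continue
    print('\t'.join(fields))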
The following script reads tab-separated first and last names from standard input, capitalizes the first letter of each (an INITCAP operation), and writes the result back to standard output. Save it as initCap.py:

import sys

for line in sys.stdin:
    line = line.strip('\n\r')
    fname, lname = line.split('\t')
    firstname = fname.title()
    lastname = lname.title()
    print '\t'.join([firstname, lastname])
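You can try the script locally before registering it in Hive, using one of the sample values from the query further below:

echo -e 'vithal\tsamp' | python initCap.py

This prints Vithal and Samp, capitalized and separated by a tab.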
Using the ADD FILE command available in the Hive command line interface (CLI), we can add the Python script to Hive's resources. Once this is done, we need not alias the script, as it can be referred to directly by its name:
0: jdbc:hive2://> add FILE /home/cloudera/python_scripts/initCap.py;
Below is an example of using the Python UDF with the TRANSFORM command.
SELECT TRANSFORM(col1)
USING 'python /home/cloudera/python_scripts/initCap.py'
AS (col_names)
FROM (
    SELECT 'vithal\tsamp' AS col1
    UNION ALL
    SELECT 'Ram\tkumar' AS col1
    UNION ALL
    SELECT 'sam\tKumar' AS col1
) AS a;
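Since initCap.py emits two tab-separated fields, you can also declare both output columns explicitly instead of a single col_names column; a sketch along the same lines:

SELECT TRANSFORM(col1)
USING 'python /home/cloudera/python_scripts/initCap.py'
AS (firstname string, lastname string)
FROM (
    SELECT 'vithal\tsamp' AS col1
) AS a;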
The following snippet creates and queries a table through a Python wrapper class (Hive() here is assumed to be such a helper, with write, read, and fetchall methods):

hive = Hive()

sql = '''
CREATE TABLE mydb.mytable (
    a INT,
    b STRING,
    c STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
'''
hive.write(sql)

sql = 'SELECT a, b, c FROM mydb.mytable'
hive.read(sql)
rows = hive.fetchall()

Sample rows, each an id paired with a JSON string describing a car:
1, '{"make": "honda", "price": 1000}'
2, '{"make": "ford", "price": 2000}'
3, '{"make": "ford"}'
4, '{"make": "tesla", "price": 3000}'
5, '{"make": "honda", "price": 2000}'
6, '{"make": "ford", "price": 4000}'
This package provides a single function, make_udf, for working with Hive UDFs (user-defined functions) written in Python. Some test code is included, along with several example UDF modules that are used in the tests; you are encouraged to run these tests in your own Hive setup to confirm that things work as expected. Install it with pip:

$ pip install hive-udf

Suppose you have a Hive UDF as the Python module mypackage.udfs.udf. Insert this UDF into your HiveQL statement as follows:

from hive_udf import make_udf
import mypackage.udfs.udf

s = make_udf(mypackage.udfs.udf)

sql = f"""
SELECT TRANSFORM (...input_columns...)
USING '{s}'
AS (...output_columns...)
FROM {db_name}.{table_name}
WHERE ...
This section shows how to write a Python UDF for Hive, register it, and use it in a query. In general, your UDF can take any number of parameters; that handling is done inside the Python script. To call the Python UDF, you use TRANSFORM to pass data into Python with Hadoop Streaming.
For comparison, when writing a UDF for Pig, Pig needs to know the output data type, and the fastest way to tell it is the @outputSchema decorator. Since Java is the main language for Hadoop and Java is type-dependent, the UDF needs to have its output data type defined. The from pig_util import outputSchema line in the script below is what makes it work with C Python instead of Jython.
from pig_util import outputSchema

@outputSchema("chararray")
def capitalize(input):
    return input.upper()
Hive UDFs written in Python, on the other hand, use Hadoop Streaming, which allows any executable script to be used as a mapper or reducer in a MapReduce job. To create a Hive UDF, the Python script must read its input from standard input and write its output to standard output, so the sys module is always imported. In fact, this is the only restriction when you write a Hive UDF.
#!/usr/bin/python
import sys

def capitalize(input):
    return input.upper()

for line in sys.stdin:
    line = line.strip()
    print capitalize(line)
For Hive, you can simply do an ADD FILE like this, which adds the script to Hive's resources and makes it ready to use:

hive (default)> add FILE /path/to/your/directory/hive_udf_name.py;
Added resources: [/path/to/your/directory/hive_udf_name.py]
Depending on the number of columns in the output returned from the UDF, you will need to specify the new column names in the AS clause. In the example above I only return one column. In case you want to return more than one column, you can emit a tab-delimited line like the following:
print '\t'.join([str(col_1), str(col_2)])
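The matching TRANSFORM call then names both columns in the AS clause; a sketch, where the table and column names are assumptions for illustration:

SELECT TRANSFORM(input_col)
USING 'python hive_udf_name.py'
AS (col_1 string, col_2 string)
FROM my_table;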