How to extract the query result from Hive job output logs using DataprocHiveOperator?
I am trying to build a data migration pipeline using Airflow; the source is a Hive table on a Dataproc cluster and the destination is BigQuery. I'm using DataprocHiveOperator to get both the schema and the data from the source. This operator uses the Dataproc REST APIs internally to submit and execute the job on a Dataproc cluster we specify. The output is written to a file in Google Cloud Storage as part of the job logs. I need only the result of the query from these logs.
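For context, a minimal sketch of how such a job might be submitted from a DAG with the contrib-era DataProcHiveOperator (the task id, cluster name, and connection id here are placeholders, not part of my actual pipeline):

from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator

# Submit a Hive query to an existing Dataproc cluster; the driver output,
# including the query result, lands in a GCS file as part of the job logs.
hive_task = DataProcHiveOperator(
    task_id='run_hive_query',
    query='SHOW CREATE TABLE my_tbl;',
    cluster_name='prod-metastore-test-cluster1',  # placeholder cluster
    gcp_conn_id='google_cloud_default',
)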
As of now, I have modified the gcp_dataproc_hook.py code to return the output to the calling method, by downloading the contents of the output file as a string via the driverOutputResourceUri parameter. The return type of this output is a Pandas DataFrame (which can be changed to any other type as convenient), but it includes the complete logs; I still have to extract the query result from them.
Here is the code snippet I added to gcp_dataproc_hook.py to return the output logs of a submitted query:
import io

import pandas as pd
from google.cloud import storage

# Download the driver output file from GCS
def getOutput(self, project, output_bucket, output_path):
    client = storage.Client(project=self.project_id)
    bucket = client.get_bucket(output_bucket)
    # Dataproc writes the driver output in numbered chunks; read the first one
    output_blob = '/'.join(output_path) + '.000000000'
    return bucket.blob(output_blob).download_as_string()

# Get the logs, including the query output
def getQueryResult(self):
    result = self.job_output
    # driverOutputResourceUri has the form gs://<bucket>/<path>/driveroutput
    uri_parts = result['driverOutputResourceUri'].split('/')
    output = self.getOutput(result['reference']['projectId'], uri_parts[2], uri_parts[3:])
    # Split the raw log bytes into DataFrame rows
    df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
    return df
Here is a sample query I'm trying to execute:
SHOW CREATE TABLE my_tbl;
The output log looks like this:
Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0 Connected to: Apache Hive (version 2.3.5)
1 Driver: Hive JDBC (version 2.3.5)
2 Transaction isolation: TRANSACTION_REPEATABLE_...
3 . . . . . . . . . . . . . . . . . . . . . . .>...
4 | createtab_stmt ...
5 +---------------------------------------------...
6 | CREATE TABLE `my_tbl`( ...
7 | `col1` string, ...
8 | `col2` bigint, ...
9 | `col3` string, ...
.. ...
141 | `coln` string) ...
142 | ROW FORMAT SERDE ...
143 | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde...
144 | STORED AS INPUTFORMAT ...
145 | 'org.apache.hadoop.hive.ql.io.orc.OrcInput...
146 | OUTPUTFORMAT ...
147 | 'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...
148 | LOCATION ...
149 | 'gs://my_hive_data_bucket/tmp/base_table/my_tbl...
150 | TBLPROPERTIES ( ...
151 | 'transient_lastDdlTime'='1566842329') ...
152 +---------------------------------------------...
153 143 rows selected (0.154 seconds)
154 Beeline version 2.3.5 by Apache Hive
155 Closing: 0: jdbc:hive2://prod-metastore-test-c...
The expected output should be like this:
CREATE TABLE `my_tbl`(
`col1` string,
`col2` bigint,
`col3` string,
..
`coln` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
'transient_lastDdlTime'='1566842329')
Please suggest a way I could get close to a solution.
Solution 1:[1]
In Dataproc, Hive queries run through Beeline instead of the deprecated Hive CLI, which is why the default formatting is different. Beeline formats output for human readability, in the fancy-bordered table format, rather than in something more easily parseable.
Fortunately, there are Beeline options that can make the format pretty close to what the old Hive CLI produced. You can create an initialization action and attach it to the cluster at creation time by specifying init_actions_uris in your Airflow operator. Create a file with the following contents:
#!/bin/bash
sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline
Then upload that file to GCS, e.g. as gs://some-gcs-bucket/beeline-legacyfmt.sh, and set that GCS URI as an init action for your Dataproc clusters. It applies the needed command-line options to beeline by default: any Dataproc Hive job you submit will then emit raw TSV in "tsv2" format and "silent" mode, with no extraneous log statements.
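For reference, here is a minimal sketch of wiring that init action into cluster creation with the contrib-era DataprocClusterCreateOperator (the project, zone, worker count, and cluster name are placeholders):

from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

# Create the cluster with the Beeline-formatting init action applied;
# the script URI points at the file uploaded above.
create_cluster = DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    cluster_name='hive-migration-cluster',  # placeholder name
    project_id='my-project',                # placeholder project
    num_workers=2,
    zone='us-central1-a',                   # placeholder zone
    init_actions_uris=['gs://some-gcs-bucket/beeline-legacyfmt.sh'],
)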
Solution 2:[2]
A late report, for those who find this question:
I recently went through this process, and found that the only way to make this work reliably is to write the output of the query into a storage folder, then read it back from the files in that folder (a sketch of the read-back follows the query below). Example:
INSERT OVERWRITE DIRECTORY "gs://bucket/path/"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
SELECT ...
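You can then read the exported files back, e.g. with pandas. A minimal sketch, assuming google-cloud-storage is installed and using the placeholder bucket/path from the query above (Hive may split the output across several files, so read them all):

import io

import pandas as pd
from google.cloud import storage

# List every file Hive wrote under the export path and concatenate them.
# INSERT OVERWRITE DIRECTORY does not write a header row, so supply your
# own column names as needed.
client = storage.Client(project='my-project')  # placeholder project
blobs = client.list_blobs('bucket', prefix='path/')
frames = [
    pd.read_csv(io.BytesIO(blob.download_as_string()), sep='\t', header=None)
    for blob in blobs
]
df = pd.concat(frames, ignore_index=True)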
Reading and parsing from the job log was not a reliable design, and the format changed months after I deployed the first version. The job log is for people to read, not for computers to read.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Dennis Huo
Solution 2 | Jack Parsons