Pyspark读取hive表数据进行处理,将结果插入到hive及hbase

首页 / 新闻资讯 / 正文


项目需要使用pyspark将hive数据进行处理插入到hive表及hbase中,刚开始接触spark,记录了学习的过程帮助初学者。


1.创建参数初始化方法ReadConfig();

读取key:value格式的配置文件,获取“表名”、“列名”、“日期”等相关的参数传递给数据处理类。
配置文件格式如下:
source_table:test_a #源表
target_table:test_b #目标表
columns:A,B,C #列名
base_hdfs_dir:/warehouse/tablespace/managed/hive/test.db/test_a/ #源表hdfs路径
inner_tb_loc:/warehouse/tablespace/managed/hive/test.db/test_b/ #目标表hdfs路径

2.创建一个数据处理类DataDeal():

包括sparksession的初始化方法:CreateSparkSession(self,SessionName);
包括hive数据读取方法:HiveExtenalRead(self);
数据处理方法:InnerTbDeal(self,DataFrame);
插入到hive方法:JsonToHive(self, DataFrame);
插入到hbase方法:JsonToHbaseShc(self, DataFrame);

DataDeal类的初始化

    def __init__(self,source_tb,target_tb,data_dt,column,tb_loc):         self.source_tb = source_tb         self.target_tb = target_tb         self.data_dt = data_dt         self.column=column         self.tb_loc=tb_loc 

1.初始化sparksession

    def CreateSparkSession(self,SessionName):         spark = SparkSession.builder.appName(SessionName).config('spark.executor.memory',                                                              '10g').config('hive.exec.dynamic.partition.mode','nonstrict').config('hive.exec.dynamic.partition','true').enableHiveSupport().getOrCreate()         spark.sparkContext.setLogLevel("WARN")         return spark 

2.读取hive数据

# 读取hive外部表,插入到hive内部表中,后续处理速度快     def HiveExtenalRead(self):         time_start = time.time()         sc = self.CreateSparkSession("HiveExtenalRead")         sql="select "+self.column+" from "+self.source_tb +' where data_dt='+self.data_dt         df=sc.sql(sql)         inner_tb=self.target_tb+"_inner"         #写入指定目录下         df.write.orc(self.tb_loc,'overwrite')         return df 

3.数据处理

def InnerTbDeal(self,DataFrame):         sc = self.CreateSparkSession("InnerTbDeal")         DataFrame.createOrReplaceTempView('InnerTbDeal')         col=DataFrame.columns         strcol = 'select A,B,C from InnerTbDeal'         dt=sc.sql(strcol)         df.show(20) 		return dt 

4.插入到hive

    def JsonToHive(self, DataFrame):         sc = self.CreateSparkSession("JsonToHive")         json_tb_loc=self.tb_loc         DataFrame.write.orc(json_tb_loc, 'overwrite')         return 0 

5.插入到hbase

使用开源shc框架,搜索shc框架下载一份复制到spark各节点就行了。

# Shc框架写入到到hbase     def JsonToHbaseShc(self, DataFrame):         sc = self.CreateSparkSession("JsonToHbaseShc")         table = self.target_tb         cataconf="""{             "table":{"namespace":"名字空间", "name":\""""+表名+"""\"},             "rowkey":"key",             "columns":{                 "A":{"cf":"rowkey", "col":"key", "type":"string"},                 "B":{"cf":"jk_data", "col":"B", "type":"string"},                 "C":{"cf":"jk_data", "col":"C", "type":"string"}             }         }"""         catalog = ''.join(cataconf.split())         data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'         DataFrame.write.options(catalog=catalog,newTable="5").format(data_source_format).save()         return 0 

主函数及参数处理

参数处理方法

def ReadConfig(TbName):     dict = {}     config_loc='./Config/'+TbName+'.ini'     config = open(config_loc, 'r')     for line in config:         if line.find(':') > 0:             strs = line.replace('\n', '').split(':')             dict[strs[0]] = strs[1]     return dict 

主函数

if __name__=='__main__':     #获取输入参数     prop = ReadConfig(sys.argv[1])     data_dt=sys.argv[2]     #内部表hdfs路径     tb_loc=prop['base_hdfs_dir']+prop['inner_tb_loc']+'data_dt='+data_dt     s1=DataDeal(prop['source_table'],prop['target_table'],data_dt,prop['columns'],tb_loc)     dt=s1.HiveExtenalRead()     dt2=s1.InnerTbDeal(dt)     s1.JsonToHive(dt2)     s1.JsonToHbaseShc(dt2) 

结果是简单的,过程是复杂的。