项目需要使用pyspark将hive数据进行处理插入到hive表及hbase中,刚开始接触spark,记录了学习的过程帮助初学者。
读取key:value格式的配置文件,获取“表名”、“列名”、“日期”等相关的参数传递给数据处理类。
配置文件格式如下:
source_table:test_a #源表
target_table:test_b #目标表
columns:A,B,C #列名
base_hdfs_dir:/warehouse/tablespace/managed/hive/test.db/test_a/ #源表hdfs路径
inner_tb_loc:/warehouse/tablespace/managed/hive/test.db/test_b/ #目标表hdfs路径
包括sparksession的初始化方法:CreateSparkSession(self,SessionName);
包括hive数据读取方法:HiveExtenalRead(self);
数据处理方法:InnerTbDeal(self,DataFrame);
插入到hive方法:JsonToHive(self, DataFrame);
插入到hbase方法:JsonToHbaseShc(self, DataFrame);
DataDeal类的初始化
def __init__(self,source_tb,target_tb,data_dt,column,tb_loc): self.source_tb = source_tb self.target_tb = target_tb self.data_dt = data_dt self.column=column self.tb_loc=tb_loc
def CreateSparkSession(self,SessionName): spark = SparkSession.builder.appName(SessionName).config('spark.executor.memory', '10g').config('hive.exec.dynamic.partition.mode','nonstrict').config('hive.exec.dynamic.partition','true').enableHiveSupport().getOrCreate() spark.sparkContext.setLogLevel("WARN") return spark
# 读取hive外部表,插入到hive内部表中,后续处理速度快 def HiveExtenalRead(self): time_start = time.time() sc = self.CreateSparkSession("HiveExtenalRead") sql="select "+self.column+" from "+self.source_tb +' where data_dt='+self.data_dt df=sc.sql(sql) inner_tb=self.target_tb+"_inner" #写入指定目录下 df.write.orc(self.tb_loc,'overwrite') return df
def InnerTbDeal(self,DataFrame): sc = self.CreateSparkSession("InnerTbDeal") DataFrame.createOrReplaceTempView('InnerTbDeal') col=DataFrame.columns strcol = 'select A,B,C from InnerTbDeal' dt=sc.sql(strcol) df.show(20) return dt
def JsonToHive(self, DataFrame): sc = self.CreateSparkSession("JsonToHive") json_tb_loc=self.tb_loc DataFrame.write.orc(json_tb_loc, 'overwrite') return 0
使用开源shc框架,搜索shc框架下载一份复制到spark各节点就行了。
# Shc框架写入到到hbase def JsonToHbaseShc(self, DataFrame): sc = self.CreateSparkSession("JsonToHbaseShc") table = self.target_tb cataconf="""{ "table":{"namespace":"名字空间", "name":\""""+表名+"""\"}, "rowkey":"key", "columns":{ "A":{"cf":"rowkey", "col":"key", "type":"string"}, "B":{"cf":"jk_data", "col":"B", "type":"string"}, "C":{"cf":"jk_data", "col":"C", "type":"string"} } }""" catalog = ''.join(cataconf.split()) data_source_format = 'org.apache.spark.sql.execution.datasources.hbase' DataFrame.write.options(catalog=catalog,newTable="5").format(data_source_format).save() return 0
参数处理方法
def ReadConfig(TbName): dict = {} config_loc='./Config/'+TbName+'.ini' config = open(config_loc, 'r') for line in config: if line.find(':') > 0: strs = line.replace('\n', '').split(':') dict[strs[0]] = strs[1] return dict
主函数
if __name__=='__main__': #获取输入参数 prop = ReadConfig(sys.argv[1]) data_dt=sys.argv[2] #内部表hdfs路径 tb_loc=prop['base_hdfs_dir']+prop['inner_tb_loc']+'data_dt='+data_dt s1=DataDeal(prop['source_table'],prop['target_table'],data_dt,prop['columns'],tb_loc) dt=s1.HiveExtenalRead() dt2=s1.InnerTbDeal(dt) s1.JsonToHive(dt2) s1.JsonToHbaseShc(dt2)
结果是简单的,过程是复杂的。