用python实现hive函数过程 由于hive_sql无法处理一些复杂的语句比如递归以及缺乏对函数、过程、块的支持,因此需要使用其他的程序来辅助改造,推荐使用python,python有很多开源的库,使用起来很方便而且比较容易上手; 以下是一个使用python实现机构递归的例子作为python应用hive开发的参考:Python操作hive主要用到两个库:pyhive、pandas,Pyhive用于连接到hive,数据,操作数据;Pandas用于数据分析与处理。 1、python处理 #加载包 from pyhive import hive import pandas as pd #定义连接,连接到hive con = hive.connect(host=’*.*.*.*’,port=10000,auth=’KERBEROS’,kerberos_service_name=”*”) #定义游标 cursor=con.cursor() cursor.execute(‘use *’) cursor.execute(‘select org_id from organization’) # fetchall所有数据,还有一个fetchone每次一条数据 vdit=cursor.fetchall() cursor.execute(“select org_id,org_name,org_subtype,parent_org_id,org_level,index_code,status_cd from organization”) result=cursor.fetchall() #表头 index = cursor.description title = list() for i in range(len(index)): #select * from的时候 #title.append(index[i][0].split(‘.’)[1]) ##指定字段 title.append(index[i][0]) vdit_org=pd.DataFrame(result,columns=title) #DataFrame #创建空DataFrame(初始化DataFrame) rows=pd.DataFrame(columns=[‘org_id’,’org_name’,’org_subtype’,’parent_org_id’,’org_level’,’index_code’,’status_cd’,’own_org_id’]) #递归函数 def recursive_org(v_org_id): #检索 vidx_tmp=vdit_org[vdit_org.org_id == v_org_id] #重置索引,索引是连接关联字段 vidx=vidx_tmp.set_index(‘status_cd’,drop=False) global rows if len(vidx)>0: row=pd.concat([vidx,vtail],axis=1) rows=pd.concat([rows,row],axis=0) p_org_id=vidx.loc[‘1000’][3] recursive_org(p_org_id) else: return for i in vdit: v_id=i[0] vtail=pd.DataFrame([[i[0]]],columns=[‘own_org_id’],index=[‘1000’]) recursive_org(v_id) #导出数据到本地路径 rows.to_csv(‘d_organization.csv’,index=None,header=False,sep=’|’) #关闭游标,关闭数据库连接 cursor.close() con.close() 2、将python生成的数据上传到hdfs hadoop fs -put d_organization.csv hdfs://test/test/test/hive/test/python 3、hive中建表,并载入数据 CREATE EXTERNAL TABLE if not exists `tmp_1`( `org_id` decimal(21,0), `org_name` varchar(1000), `org_subtype` varchar(80), `parent_org_id` decimal(21,0), `org_level` decimal(21,0), `index_code` varchar(1000), `status_cd` varchar(80), `own_org_id` decimal(21,0)) ROW FORMAT SERDE ’org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’ WITH SERDEPROPERTIES ( ’field.delim’=’|’, ’serialization.format’=’|’, ’serialization.null.format’=”) STORED AS INPUTFORMAT ’org.apache.hadoop.mapred.TextInputFormat’ OUTPUTFORMAT ’org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat’ LOCATION ’hdfs://test/test/test/hive/test/python/tmp_1′; LOAD DATA INPATH ‘hdfs://test/test/test/hive/test/python/d_organization.csv’ OVERWRITE INTO TABLE tmp_1; 注意:如果有大量数据需要处理可以使用pandas来处理,如果只是简单的判定然后执行一个语句可以直接使用pyhive来执行语句就行: 比如:判定表记录数是否大于0,大于0就执行一个语句 cursor.execute(‘select count(1) x from rb_tmp’) result=cursor.fetchall() cnt= result[0] if cnt>0: cursor.execute(‘create table yb_tmp as select * from rb_tmp’)
2024最新激活全家桶教程,稳定运行到2099年,请移步至置顶文章:https://sigusoft.com/99576.html
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。 文章由激活谷谷主-小谷整理,转载请注明出处:https://sigusoft.com/84553.html