DataFrame을 Hive에 직접 저장하는 방법은 무엇입니까?
DataFrame
Spark에서 Hive에 직접 저장할 수 있습니까?
변환 DataFrame
을 시도한 Rdd
다음 텍스트 파일로 저장 한 다음 하이브에로드했습니다. 하지만 dataframe
벌집에 직접 저장할 수 있는지 궁금합니다.
메모리 내 임시 테이블을 생성하고 sqlContext를 사용하여 하이브 테이블에 저장할 수 있습니다.
데이터 프레임이 myDf라고 가정 해 보겠습니다. 다음을 사용하여 하나의 임시 테이블을 만들 수 있습니다.
myDf.createOrReplaceTempView("mytempTable")
그런 다음 간단한 하이브 문을 사용하여 테이블을 만들고 임시 테이블에서 데이터를 덤프 할 수 있습니다.
sqlContext.sql("create table mytable as select * from mytempTable");
사용 DataFrameWriter.saveAsTable
. ( df.write.saveAsTable(...)
) Spark SQL 및 DataFrame 가이드를 참조하십시오 .
df.write.saveAsTable(...)
Spark 2.0 문서에서 더 이상 사용되지 않는 항목이 표시 되지 않습니다. Amazon EMR에서 우리를 위해 일했습니다. 우리는 S3의 데이터를 데이터 프레임으로 읽고, 처리하고, 결과에서 테이블을 만들고, MicroStrategy로 읽을 수있었습니다. Vinays 답변도 작동했습니다.
HiveContext를 가지고 / 생성해야합니다.
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
그런 다음 데이터 프레임을 직접 저장하거나 하이브 테이블로 저장할 열을 선택합니다.
df는 데이터 프레임입니다.
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
또는
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
또는
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveMode는 Append / Ignore / Overwrite / ErrorIfExists입니다.
여기에 Spark Documentation의 HiveContext에 대한 정의를 추가했습니다.
기본 SQLContext 외에도 기본 SQLContext에서 제공하는 기능의 상위 집합을 제공하는 HiveContext를 만들 수도 있습니다. 추가 기능에는보다 완전한 HiveQL 파서를 사용하여 쿼리를 작성하는 기능, Hive UDF에 대한 액세스 및 Hive 테이블에서 데이터를 읽는 기능이 포함됩니다. HiveContext를 사용하기 위해 기존 Hive 설정이 필요하지 않으며 SQLContext에 사용 가능한 모든 데이터 소스를 계속 사용할 수 있습니다. HiveContext는 기본 Spark 빌드에 Hive의 모든 종속성을 포함하지 않도록 개별적으로 만 패키징됩니다.
Spark 버전 1.6.2에서 "dbName.tableName"을 사용하면 다음 오류가 발생합니다.
org.apache.spark.sql.AnalysisException : 임시 테이블에는 데이터베이스 이름 또는 기타 규정자를 지정할 수 없습니다. 테이블 이름에 마침표 (.)가있는 경우 백틱 ()으로 테이블 이름을 인용하십시오 .`
Hive에 저장하는 것은 write()
SQLContext의 메소드를 사용 하는 것입니다.
df.write.saveAsTable(tableName)
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String) 참조
Spark 2.2에서 : DataFrame 대신 DataSet을 사용합니다.
다음은 parquet 파일에서 Hive 테이블을 만드는 PySpark 버전입니다. 추론 된 스키마를 사용하여 Parquet 파일을 생성했으며 이제 정의를 Hive 메타 스토어에 푸시하려고 할 수 있습니다. 정의를 Hive 메타 스토어뿐만 아니라 AWS Glue 또는 AWS Athena와 같은 시스템으로 푸시 할 수도 있습니다. 여기에서는 spark.sql을 사용하여 영구 테이블을 푸시 / 생성합니다.
# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")
cols = df.dtypes
buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')
keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print ("size----------",sizeof)
count=1;
for eachvalue in keyanddatatypes:
print count,sizeof,eachvalue
if count == sizeof:
total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
else:
total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
buf.append(total)
count = count + 1
buf.append(' )')
buf.append(' STORED as parquet ')
buf.append("LOCATION")
buf.append("'")
buf.append('s3://my-location/data/')
buf.append("'")
buf.append("'")
##partition by pt
tabledef = ''.join(buf)
print "---------print definition ---------"
print tabledef
## create a table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef);
Hive 외부 테이블의 경우 PySpark에서이 함수를 사용합니다.
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
print("Saving result in {}.{}".format(database, table_name))
output_schema = "," \
.join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
.replace("StringType", "STRING") \
.replace("IntegerType", "INT") \
.replace("DateType", "DATE") \
.replace("LongType", "INT") \
.replace("TimestampType", "INT") \
.replace("BooleanType", "BOOLEAN") \
.replace("FloatType", "FLOAT")\
.replace("DoubleType","FLOAT")
output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)
sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
.format(database, table_name, output_schema, save_format, database, table_name)
sparkSession.sql(query)
dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
참고 URL : https://stackoverflow.com/questions/30664008/how-to-save-dataframe-directly-to-hive
'Program Tip' 카테고리의 다른 글
std :: async를 사용해야하는 이유는 무엇입니까? (0) | 2020.11.23 |
---|---|
UIWebView와 동일한 배율로 렌더링하기 위해 콘텐츠 크기 조정에서 WKWebView 억제 (0) | 2020.11.23 |
디렉토리가 git 제어하에 있는지 확인 (0) | 2020.11.23 |
세션 변수로 배열 (0) | 2020.11.23 |
MySQL 용 이스케이프 문자열 Python (0) | 2020.11.23 |