```python
#!coding=utf8
from pyspark.sql import SparkSession, DataFrame


def main_func(input0, spark, sc):
    output_tb = "your_test_x"
    print(f"write table {output_tb}")
    input0.write.mode("overwrite").saveAsTable(output_tb)


def run(inputs: dict):
    # Build a Hive-enabled session pointing at the remote metastore.
    spark = (SparkSession.builder
             .config('hive.metastore.uris', 'thrift://192.168.199.27:9083')
             .enableHiveSupport()
             .getOrCreate())
    # Read each input table into a DataFrame, then pass the DataFrames
    # to main_func as keyword arguments.
    param_dict = preprocess(input_infos=inputs, ss=spark)
    main_func(**param_dict, spark=spark, sc=spark.sparkContext)


def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
    print(f"read table {tb_name}")
    return ss.sql(f'select * from {tb_name}')


def write_table(df: DataFrame, tb_name: str):
    df.write.mode("overwrite").saveAsTable(tb_name)


def preprocess(input_infos: dict, ss: SparkSession) -> dict:
    # Map each parameter name to the DataFrame read from its table.
    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}


if __name__ == '__main__':
    run(inputs={'input0': "my_test_p"})
```
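Since `preprocess` turns every entry of `inputs` into one keyword argument of `main_func`, the same driver can hand several tables to a job at once. A minimal sketch, assuming a hypothetical second table `my_test_q` and a matching `input1` parameter on `main_func`:

```python
# Hypothetical: main_func must declare an input1 parameter as well,
# e.g. def main_func(input0, input1, spark, sc): ...
run(inputs={'input0': "my_test_p", 'input1': "my_test_q"})
```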