#!coding=utf8
"""PySpark job: read input Hive tables, then write input0 to a fixed output table."""

from pyspark.sql import SparkSession, DataFrame


def main_func(input0, spark, sc):
    """Write ``input0`` to the hard-coded output table.

    Args:
        input0: DataFrame produced by ``preprocess`` from the ``inputs`` dict.
        spark: Active SparkSession (unused here, kept for job-framework signature).
        sc: SparkContext (unused here, kept for job-framework signature).
    """
    output_tb = "your_test_x"
    # Fixed: the original message said "read table" while this step writes it.
    print(f"write table {output_tb}")
    # Reuse the module's write helper instead of duplicating the write logic.
    write_table(df=input0, tb_name=output_tb)


def run(inputs: dict):
    """Entry point: build a Hive-enabled session, load inputs, run main_func.

    Args:
        inputs: Mapping of parameter name -> Hive table name to read.
    """
    spark = (
        SparkSession.builder
        .config('hive.metastore.uris', 'thrift://192.168.199.27:9083')
        .enableHiveSupport()
        .getOrCreate()
    )
    param_dict = preprocess(input_infos=inputs, ss=spark)
    main_func(**param_dict, spark=spark, sc=spark.sparkContext)


def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
    """Return the full contents of Hive table ``tb_name`` as a DataFrame."""
    print(f"read table {tb_name}")
    # NOTE(review): tb_name is interpolated into SQL; assumed to come from
    # trusted job configuration, not end-user input.
    return ss.sql(f'select * from {tb_name}')


def write_table(df: DataFrame, tb_name: str):
    """Overwrite Hive table ``tb_name`` with the contents of ``df``."""
    df.write.mode("overwrite").saveAsTable(tb_name)


def preprocess(input_infos: dict, ss: SparkSession) -> dict:
    """Read every configured input table; return {param_name: DataFrame}."""
    return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}


if __name__ == '__main__':
    run(inputs={'input0': "my_test_p"})