# data_transfer.py
#!coding=utf8
from pyspark.sql import SparkSession, DataFrame
  3. def main_func(input0, spark, sc):
  4. output_tb = "your_test_x"
  5. print(f"read table {output_tb}")
  6. input0.write.mode("overwrite").saveAsTable(output_tb)
  7. def run(inputs: dict):
  8. spark = SparkSession.builder.config('hive.metastore.uris',
  9. 'thrift://192.168.199.27:9083').enableHiveSupport().getOrCreate()
  10. param_dict = preprocess(input_infos=inputs, ss=spark)
  11. main_func(**param_dict, spark=spark, sc=spark.sparkContext)
  12. def read_table(ss: SparkSession, tb_name: str) -> DataFrame:
  13. print(f"read table {tb_name}")
  14. return ss.sql(f'select * from {tb_name}')
  15. def write_table(df: DataFrame, tb_name: str):
  16. df.write.mode("overwrite").saveAsTable(tb_name)
  17. def preprocess(input_infos: dict, ss: SparkSession) -> dict:
  18. return {k: read_table(ss=ss, tb_name=v) for k, v in input_infos.items()}
  19. if __name__ == '__main__':
  20. run(inputs={'input0': "my_test_p"})