12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- from kubernetes.client import api_client
- from kubernetes.client.apis import core_v1_api
- from kubernetes import client, config
- class KubernetesTools(object):
- def __init__(self):
- self.k8s_url = "https://192.168.199.109:6443" # todo 改成配置项
- def get_token(self):
- """
- 获取token
- :return:
- """
- return "eyJhbGciOiJSUzI1NiIsImtpZCI6IjhTMXExTDJkUXBtMWZnWHE3SGgwVGtlMjdYZlAtQTlHYWdXSWlybmY4VEkifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlcm5ldGVzLWRhc2hib2FyZCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTc1bWo1Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJiZWI1MjhkNC1jMmUxLTRiMjgtYTYyNC1iYWI5NjgzNjg2NTAiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZXJuZXRlcy1kYXNoYm9hcmQ6YWRtaW4tdXNlciJ9.a9TBKjgcEVSI6pDvqqfhq8NBJFcSCD4xEGI9Ne_KXfURXODUAmiM8Ld480BZudFo-RKnaWxZXUUUgQCcqH7pjsV-YrB27Lh4aySjaNfxXt-euecCF17XheP_g7K1p3XP1eS07FP-8KgHeo_p9TZK8PotR9zux5SsExuOAd7VUCLvUeWllPkLrsN6woTbDWnJTNCqfwtP04BOiDCiQpOwqfd8S1bby1lwWWI6pxM0cK1_lPQc6YydB-nB4mQRNe4qCwVx_qG_NTzSiI9uel1m0OVIZg2ungsjTzSInSTyaWjJ8V0T8uovDNTZv0_aC4DHEW7X14xtLY9UiZNy50Nebg"
- # with open('k8s-token.txt', 'r') as file:
- # Token = file.read().strip('\n')
- # return Token
- def get_api(self):
- """
- 获取API的CoreV1Api版本对象
- :return:
- """
- configuration = client.Configuration()
- configuration.host = self.k8s_url
- configuration.verify_ssl = False
- configuration.api_key = {"authorization": "Bearer " + self.get_token()}
- client1 = api_client.ApiClient(configuration=configuration)
- api = core_v1_api.CoreV1Api(client1)
- return api
- def get_namespace_list(self):
- """
- 获取命名空间列表
- :return:
- """
- api = self.get_api()
- namespace_list = []
- for ns in api.list_namespace().items:
- # print(ns.metadata.name)
- namespace_list.append(ns.metadata.name)
- return namespace_list
- def get_pod_info(self, namespaces, pod_name):
- """
- 查看pod信息
- :param namespaces: 命令空间,比如:default
- :param pod_name: pod完整名称,比如:flaskapp-1-5d96dbf59b-lhmp8
- :return:
- """
- api = self.get_api()
- # 示例参数
- namespaces = "airflow"
- pod_name = "python-task-1664728976-3572760bf5614ca8889930ec0af323fb"
- resp = api.read_namespaced_pod(namespace=namespaces, name=pod_name)
- # 详细信息
- print(resp)
- # def get_pod_logs(self, namespaces, pod_name, tail_lines=200):
- # """
- # 查看pod日志
- # :param namespaces: 命令空间,比如:default
- # :param pod_name: pod完整名称,比如:flaskapp-1-5d96dbf59b-lhmp8
- # :return:
- # """
- # api = self.get_api()
- # # 示例参数
- # # namespaces = "airflow"
- # # pod_name = "flaskapp-1-5d96dbf59b-lhmp8"
- # """
- # pretty美化输出
- # tail_lines=200输出最近200行
- # """
- # log_content = api.read_namespaced_pod_log(pod_name, namespaces, pretty=True, tail_lines=200)
- # return log_content
- def get_pod_logs(self, namespaces, labels: dict, tail_lines=10000):
- """
- 查看pod日志
- :param namespaces: 命令空间,比如:default
- :param pod_name: pod完整名称,比如:flaskapp-1-5d96dbf59b-lhmp8
- :return:
- """
- api = self.get_api()
- rets = api.list_namespaced_pod(namespaces, label_selector=','.join([f'{k}={v}' for k, v in labels.items()]),
- watch=False)
- if len(rets.items) > 0:
- pod_name = rets.items[0].to_dict()['metadata']['name']
- log_content = api.read_namespaced_pod_log(pod_name, namespaces, pretty=True, tail_lines=tail_lines)
- return log_content
- else:
- return None
|