python脚本管理jenkins 知识

zhuguojun6 2月前 121

使用 Python 脚本管理 Jenkins,个人开发,仅供参考,谢谢。


登录类(jk_auth.py):

import jenkins
class auth(object):
    """Base class that keeps the Jenkins connection details and a client.

    Attributes:
        url: Jenkins server URL, stored exactly as passed in.
        username: Account name handed to the client.
        password: Password handed to the client.
        server: ``jenkins.Jenkins`` client built from the three values above.
    """

    def __init__(self, url, username, password):
        # Keep the raw values; subclasses reuse them when they build
        # command lines for external tools.
        self.url, self.username, self.password = url, username, password
        self.server = jenkins.Jenkins(url, username, password)


证书管理(jk_credentials.py):

import os
import wget
import time
from jk_auth import *
 
 
class credential(auth):
    """Create and delete Jenkins credentials, system-wide or inside a folder.

    A ``folder_name`` of ``'.'`` selects the system credential store, which is
    managed by running ``jenkins-cli.jar``; any other value is handled through
    the python-jenkins client in ``self.server``.
    """

    # Template file used for each supported ``id_type``.
    _TEMPLATE_FILES = {
        'ssh_private': 'Credentials_SSH_PrivateKey.xml',
        'user_pass': 'Credentials_Username_with_password.xml',
        'secret_txt': 'Credentials_SecretText.xml',
    }
    # Closing tag in front of which ``content`` is inserted, per ``id_type``.
    _CONTENT_END_TAGS = {
        'ssh_private': '</privateKey>',
        'user_pass': '</password>',
        'secret_txt': '</secret>',
    }

    def __init__(self, url, username, password):
        # The base class stores url/username/password and builds self.server.
        auth.__init__(self, url, username, password)

    def _fetch_cli_jar(self):
        """Download a fresh ``jenkins-cli.jar`` from the server; return its path."""
        # Any previous copy is removed first, presumably so that the download
        # always lands under exactly this file name.
        if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
        return wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')

    def _run_cli(self, cli_args, stdin=None):
        """Run ``jenkins-cli.jar`` with ``cli_args`` and wait for it to finish.

        Args:
            cli_args: CLI command and its arguments, one list item per argument.
            stdin: Optional open file fed to the process as standard input.

        Returns:
            True when the process exited with status 0, otherwise False.
        """
        # Local import so this class does not depend on an extra file-level import.
        import subprocess

        cli_jar = self._fetch_cli_jar()
        # An argument list (no shell) keeps ids, URLs and passwords that
        # contain spaces or shell metacharacters from being reinterpreted.
        # NOTE: the password is still visible in the process argument list.
        cmd = ['java', '-jar', str(cli_jar), '-s', str(self.url)]
        cmd += [str(arg) for arg in cli_args]
        cmd += ['--username', str(self.username), '--password', str(self.password)]
        try:
            completed = subprocess.run(cmd, stdin=stdin)
        except OSError as err:
            # e.g. no ``java`` executable on PATH; report instead of crashing.
            print(f"can not run jenkins-cli: {err}")
            return False
        return completed.returncode == 0

    def delete_cre(self, id, folder_name):
        """Delete credential ``id`` from ``folder_name``.

        Args:
            id: Identifier of the credential to delete.
            folder_name: Folder holding the credential; ``'.'`` means the
                system credential store.

        Returns:
            Always the empty string.
        """
        if folder_name == '.':
            deleted = self._run_cli(
                ['delete-credentials', 'system::system::jenkins', '_', id])
            if deleted:
                print(f"credential {folder_name}/{id} is absent !")
            else:
                print(f"delete credential {folder_name}/{id} FAILED !")
        elif self.server.credential_exists(id, folder_name):
            res = self.server.delete_credential(id, folder_name)
            # NOTE(review): python-jenkins' delete_credential appears to return
            # None; only print a body when a readable response came back.
            if hasattr(res, 'read'):
                print(res.read())
            print(f"delete credential {folder_name}/{id} ok !")
            print(f"credential {folder_name}/{id} is absent !")
        return ''

    def create_cre(self, id, folder_name, template_path, id_type, content, username=''):
        """Create a credential from one of the XML templates in ``template_path``.

        Args:
            id: Identifier written into the template's ``<id>`` line.
            folder_name: Target folder; ``'.'`` means the system credential store.
            template_path: Directory that contains the credential templates.
            id_type: ``'ssh_private'``, ``'user_pass'`` or ``'secret_txt'``.
            content: Secret text placed on its own line right before the
                closing ``</privateKey>``, ``</password>`` or ``</secret>`` line.
            username: Written into the ``<username>`` line (not used for
                ``'secret_txt'``).

        Returns:
            Always the empty string.

        Raises:
            SystemExit: With code -1 when ``id_type`` is not supported.
        """
        template_file = self._TEMPLATE_FILES.get(id_type)
        if template_file is None:
            print("credential type error !")
            raise SystemExit(-1)

        with open(f'{template_path}/{template_file}', 'r', encoding='UTF-8') as template:
            template_content = template.read().split('\n')

        end_tag = self._CONTENT_END_TAGS[id_type]
        has_username = id_type != 'secret_txt'
        my_cre = []
        for line in template_content:
            # Whole lines are replaced, so any indentation of the template
            # line is dropped for <id> and <username>.
            if '<id>' in line:
                line = f'<id>{id}</id>'
            elif has_username and '<username>' in line:
                line = f'<username>{username}</username>'
            elif end_tag in line:
                line = content + '\n' + line
            my_cre.append(line)

        if folder_name == '.':
            xml_path = f"{template_path}/credential_sys_{id}.xml"
            # Written as UTF-8 to match how the template was read; mode 'w'
            # replaces any file left over from an earlier run.
            with open(xml_path, 'w', encoding='UTF-8') as cre_file:
                cre_file.writelines(entry + "\n" for entry in my_cre)
            try:
                # The CLI command reads the credential XML from standard input.
                with open(xml_path, 'rb') as xml_in:
                    created = self._run_cli(
                        ['create-credentials-by-xml', 'system::system::jenkins', '_'],
                        stdin=xml_in)
            finally:
                # The file holds the secret; remove it even if the CLI failed.
                os.remove(xml_path)
        else:
            # Lines are joined without separators here, as before.
            self.server.create_credential(folder_name, ''.join(my_cre))
            created = True

        if created:
            print(f"create credential {folder_name}/{id} SUCCESS!")
        else:
            print(f"create credential {folder_name}/{id} FAILED!")
        return ''
任务管理(jk_jobs.py):
import os
import time
import jenkins
from jk_auth import *
 
 
class jobs(auth):
    """Create folders and jobs on Jenkins and start, watch or stop builds.

    Folder arguments use ``'/'``-separated names; ``'.'`` (or an empty
    string) stands for the Jenkins root.
    """

    # ``result`` values of a finished build (the Jenkins ``Result`` constants).
    # While none of these is reported the build is treated as still running.
    _FINAL_RESULTS = ('SUCCESS', 'UNSTABLE', 'FAILURE', 'NOT_BUILT', 'ABORTED')

    def __init__(self, url, username, password):
        # The base class stores url/username/password and builds self.server.
        auth.__init__(self, url, username, password)

    @staticmethod
    def _locate_job(job_name, folder_name):
        """Return ``(url_path, full_name)`` for a job.

        ``url_path`` is the ``/job/<folder>/job/<name>`` part of the job URL,
        ``full_name`` is the ``folder/name`` form passed to python-jenkins.
        """
        if folder_name in ('', '.'):
            return f'/job/{job_name}', f'{job_name}'
        folder_url = ''.join(f'/job/{part}' for part in str(folder_name).split('/'))
        return f'{folder_url}/job/{job_name}', f'{folder_name}/{job_name}'

    def _curl(self, target_url, options=(), capture=False):
        """Run ``curl`` against ``target_url`` with this object's credentials.

        Args:
            target_url: URL to request.
            options: Extra curl command-line arguments.
            capture: When True, collect curl's standard output instead of
                letting it go to the terminal.

        Returns:
            The captured output as text when ``capture`` is True, else ``''``.
        """
        # Local import so this class does not depend on an extra file-level import.
        import subprocess

        # An argument list (no shell) keeps parameters and passwords that
        # contain quotes or shell metacharacters from being reinterpreted.
        # NOTE: the password is still visible in the process argument list.
        cmd = ['curl', *[str(opt) for opt in options],
               '--user', f'{self.username}:{self.password}', str(target_url)]
        try:
            done = subprocess.run(
                cmd,
                stdout=subprocess.PIPE if capture else None,
                universal_newlines=True,
            )
        except OSError as err:
            # e.g. no ``curl`` executable on PATH; report instead of crashing.
            print(f"can not run curl: {err}")
            return ''
        return done.stdout if capture else ''

    def create_folder(self, folder_fullname):
        """Create every missing level of the folder path ``folder_fullname``.

        Each level is printed, then either reported as present or created
        from ``jenkins.EMPTY_FOLDER_XML``.
        """
        if folder_fullname in ('', '.'):
            print("folder system is present.")
            return

        existing = {job['fullname'] for job in self.server.get_all_jobs()}
        folder_path = ''
        for part in str(folder_fullname).split("/"):
            folder_path = f'{folder_path}/{part}'
            current = folder_path.strip('/')
            print(current)
            if current in existing:
                print(f"folder {current} is present.")
            else:
                self.server.create_job(current, jenkins.EMPTY_FOLDER_XML)
                print(f"folder {current} create SUCCESS.")

    def create_job(self, template_path, job_name, groovy_list=None, parameters=None, folder_name=''):
        """(Re)create ``job_name`` from ``<template_path>/jenkins_templates.xml``.

        An existing job with the same full name is deleted first.

        Args:
            template_path: Directory that contains ``jenkins_templates.xml``.
            job_name: Name of the job, without folder.
            groovy_list: Groovy file names; a ``load '<file>'`` line is put in
                front of every template line that contains ``template``.
            parameters: Names of string parameters, inserted in front of the
                template's ``/parameterDefinitions`` line.
            folder_name: Folder of the job; ``'.'`` or ``''`` means the root.
        """
        groovy_list = [] if groovy_list is None else groovy_list
        parameters = [] if parameters is None else parameters
        full_name = self._locate_job(job_name, folder_name)[1]

        # delete old job
        if any(job['fullname'] == full_name for job in self.server.get_all_jobs()):
            self.server.delete_job(full_name)
            print(f"delete job {full_name} SUCCESS !")

        with open(f'{template_path}/jenkins_templates.xml', 'r', encoding='UTF-8') as template:
            template_content = template.read().split('\n')

        my_job = []
        for line in template_content:
            if '/parameterDefinitions' in line:
                for para_name in parameters:
                    para = f"""
                    <hudson.model.StringParameterDefinition>
                    <name>{para_name}</name>
                    <description> {para_name} </description>
                    <defaultValue>  </defaultValue>
                    </hudson.model.StringParameterDefinition> 
                    """
                    # Each definition is prepended, so they end up in
                    # reverse order of ``parameters``.
                    line = para + '\n' + line
            if 'template' in line:
                for groovy_file in groovy_list:
                    line = "    load '" + groovy_file + "'" + '\n' + line
            my_job.append(line)
        job_config = '\n'.join(my_job)

        # create job
        self.server.create_job(full_name, job_config)
        print(f"create job {full_name} SUCCESS !")

    def run_job(self, job_name, run_para='', folder_name=''):
        """Trigger a parameterized build, wait for it and print its console log.

        Args:
            job_name: Name of the job, without folder.
            run_para: Request body posted to ``buildWithParameters``.
            folder_name: Folder of the job; ``'.'`` or ``''`` means the root.
        """
        url_path, full_name = self._locate_job(job_name, folder_name)

        # run job
        self._curl(f'{self.url}{url_path}/buildWithParameters',
                   ['-s', '-X', 'POST', '-d', run_para])
        time.sleep(2)
        print(full_name)
        # NOTE(review): this takes the newest build two seconds after the
        # trigger; if the new build has not started yet this is an older
        # build, and a job without builds raises IndexError here.
        last_build_number = self.server.get_job_info(full_name)['builds'][0]['number']

        # poll the build until it reports a final result
        while True:
            build_info = self.server.get_build_info(full_name, last_build_number)
            if build_info['result'] in self._FINAL_RESULTS:
                break
            print(f"job {full_name} buildID #{last_build_number} status is running !")
            time.sleep(2)
        print(f"run job {full_name} buildID #{last_build_number} status is {build_info['result']} !")

        # fetch the console log of the build that was just watched
        console_out = self._curl(
            f'{self.url}{url_path}/{last_build_number}/consoleText', capture=True)
        print(console_out)

    def stop_job(self, job_name, folder_name=''):
        """Keep posting ``stop`` to the newest build until it has a final result.

        Args:
            job_name: Name of the job, without folder.
            folder_name: Folder of the job; ``'.'`` or ``''`` means the root.
        """
        url_path, full_name = self._locate_job(job_name, folder_name)

        last_build_number = self.server.get_job_info(full_name)['builds'][0]['number']
        stop_url = f'{self.url}{url_path}/{last_build_number}/stop'
        # Show the request being sent, with the password masked.
        print(f'curl -s -X POST --user {self.username}:****** {stop_url}')

        while True:
            build_info = self.server.get_build_info(full_name, last_build_number)
            if build_info['result'] in self._FINAL_RESULTS:
                break
            print(f"job {full_name} buildID #{last_build_number} status is running !")
            # Output of the stop request is collected and discarded.
            self._curl(stop_url, ['-s', '-X', 'POST'], capture=True)
            time.sleep(2)
        print(f"job {full_name} buildID #{last_build_number} stop finished !")
        print(f"job {full_name} buildID #{last_build_number} status is {build_info['result']} !")


节点管理(jk_node.py):

import os
import wget
import jenkins
from jk_auth import auth
 
 
class node(auth):
    """Create and delete Jenkins agent nodes."""

    def __init__(self, url, username, password):
        # The base class stores url/username/password and builds self.server.
        auth.__init__(self, url, username, password)

    def delete_node(self, node_name):
        """Delete ``node_name`` through ``jenkins-cli.jar`` if the node exists.

        Args:
            node_name: Name of the node; ``'master'`` is rejected.

        Raises:
            SystemExit: With code 1 when ``node_name`` is ``'master'``.
        """
        # Local import so this class does not depend on an extra file-level import.
        import subprocess

        if node_name == 'master':
            print("node name can not use 'master'!")
            raise SystemExit(1)

        # delete node
        if os.path.exists('jenkins-cli.jar'):
            os.remove('jenkins-cli.jar')
        if not any(entry['name'] == node_name for entry in self.server.get_nodes()):
            return

        cli_jar = wget.download(f'{self.url}/jnlpJars/jenkins-cli.jar', out='jenkins-cli.jar')
        # An argument list (no shell) keeps node names and passwords that
        # contain spaces or shell metacharacters from being reinterpreted.
        # NOTE: the password is still visible in the process argument list.
        cmd = [
            'java', '-jar', str(cli_jar),
            '-auth', f'{self.username}:{self.password}',
            '-s', str(self.url),
            'delete-node', str(node_name),
        ]
        try:
            subprocess.run(cmd)
        except OSError as err:
            # e.g. no ``java`` executable on PATH; report instead of crashing.
            print(f"can not run jenkins-cli: {err}")

    def create_node(self, node_name, port, username, cre_id, host):
        """Create an SSH-launched node called ``node_name``.

        Args:
            node_name: Name of the new node, also used as its description.
            port: Passed to the launcher as ``port``.
            username: Passed to the launcher as ``username``.
            cre_id: Passed to the launcher as ``credentialsId``.
            host: Passed to the launcher as ``host``.

        Returns:
            A confirmation message containing the node name.

        Raises:
            SystemExit: With code -1 when a node of that name already exists.
        """
        if any(entry['name'] == node_name for entry in self.server.get_nodes()):
            print(f"node name {node_name} is  present, plase delete after !")
            raise SystemExit(-1)

        # create node
        params = {
            'port': port,
            'username': username,
            'credentialsId': cre_id,
            'host': host,
        }
        self.server.create_node(
            node_name,
            nodeDescription=node_name,
            remoteFS='~/',
            labels='',
            exclusive=True,
            launcher=jenkins.LAUNCHER_SSH,
            launcher_params=params,
        )
        return "create node " + node_name + " ok !"


最后于 2月前 被luyaops编辑 ,原因:
最新回复 (0)
    • 运维开源项目互助社区
      2
        立即登录 立即注册 
返回