Batch-Migrating Intranet GitLab Repositories to CodeArts Repo

 

Background

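The existing repository import feature of CodeArts Repo only supports migration between public-network services. It offers no fast way to migrate from a customer's self-hosted code hosting platform on an intranet. This article therefore provides a script that batch-migrates repositories from an intranet code hosting platform to Repo.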

 

Configuring the SSH Public Key for Accessing CodeArts Repo

Before batch-migrating GitLab repositories to CodeArts Repo, install the Git Bash client and add your locally generated SSH public key to CodeArts Repo. The steps are as follows:

 

1. Run Git Bash and check whether an SSH key has already been generated locally.

For the RSA algorithm, run the following command in Git Bash:

cat ~/.ssh/id_rsa.pub

 

For the ED25519 algorithm, run the following command in Git Bash:

 

cat ~/.ssh/id_ed25519.pub

 

If "No such file or directory" is displayed, no SSH key has been generated on this computer. Continue with step 2.

If a string starting with ssh-rsa or ssh-ed25519 is returned, an SSH key has already been generated on this computer. To use the existing key, skip to step 3. To generate a new key, continue with step 2.
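The check in step 1 can also be scripted. The following is a minimal sketch, assuming the default file names id_rsa.pub and id_ed25519.pub used in this article; the function name find_public_keys is hypothetical and not part of any tool mentioned here.

```python
from pathlib import Path

# Public-key file names that step 1 inspects with `cat`.
KEY_FILES = ("id_rsa.pub", "id_ed25519.pub")


def find_public_keys(ssh_dir):
    """Return the files in ssh_dir that look like SSH public keys."""
    found = []
    for name in KEY_FILES:
        path = Path(ssh_dir) / name
        if not path.is_file():
            continue  # same situation as "No such file or directory"
        text = path.read_text(encoding="utf-8", errors="replace").strip()
        # A usable public key starts with its algorithm name.
        if text.startswith(("ssh-rsa ", "ssh-ed25519 ")):
            found.append(path)
    return found
```

Calling find_public_keys(Path.home() / ".ssh") returns an empty list when no key exists yet, in which case you would continue with step 2.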

2. Generate an SSH key. For the RSA algorithm, run the following command in Git Bash:

ssh-keygen -t rsa -b 4096 -C your_email@example.com

 

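Here, -t rsa specifies an RSA key, -b 4096 sets the key length in bits (an RSA key of this length is more secure than a shorter one), and -C your_email@example.com adds a comment to the generated public key file so that you can tell what the key pair is for.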

 

For the ED25519 algorithm, run the following command in Git Bash:

 

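ssh-keygen -t ed25519 -C your_email@example.com

 

Here, -t ed25519 specifies an ED25519 key, and -C your_email@example.com adds a comment to the generated public key file so that you can tell what the key pair is for. ED25519 keys have a fixed length, so no -b option is needed (ssh-keygen ignores -b for this key type).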

 

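After entering the key generation command, press Enter to accept the default location. The private key is stored in ~/.ssh/id_rsa or ~/.ssh/id_ed25519 by default, and the corresponding public key file is ~/.ssh/id_rsa.pub or ~/.ssh/id_ed25519.pub.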

 

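3. Copy the SSH public key to the clipboard. Run the command for your operating system. If you generated an ED25519 key, replace id_rsa.pub with id_ed25519.pub in the commands below.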

Windows:

clip < ~/.ssh/id_rsa.pub

 

Mac:

pbcopy < ~/.ssh/id_rsa.pub

 

Linux (xclip required):

xclip -sel clip < ~/.ssh/id_rsa.pub

 

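4. Log in and go to the Repo repository list page. Click your nickname in the upper right corner and choose "Personal Settings" > "Code Hosting" > "SSH Keys" to open the SSH key configuration page.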

Alternatively, click "Set My SSH Keys" in the upper right corner of the Repo repository list page to open the same page.

 

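5. Enter a name for the new key in "Title", paste the SSH public key copied in step 3 into "Key", and click OK. The key has been configured when the page shows the message "The key has been set successfully. Click Return Now; the page redirects automatically after 3 seconds of inactivity."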

Batch-Migrating Intranet GitLab Repositories to CodeArts Repo

1. Download Python 3 from the official Python website and install it.

2. Log in to GitLab and obtain a private_token: in "User Settings", choose "Access Tokens" > "Add new token".

3. Generate an SSH public key locally and add it to both GitLab and CodeArts Repo. For CodeArts Repo, follow the SSH public key configuration steps in the previous section.

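4. Debug the IAM API for obtaining a user token with a password, and use the username and password of your Huawei Cloud account to obtain a user token. To see how to fill in the parameters, click "Request Example" on the right of the API debugging page. After filling in the parameters, click "Debug", then copy the returned user token and save it locally.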
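For orientation, the request body that the debugging page asks you to fill in has roughly the shape sketched below. This is an illustration only, assuming the Keystone-style password body used by the IAM token API; the function name build_token_request and all values are hypothetical, and the request example on the debugging page remains the authoritative reference for field names.

```python
import json


def build_token_request(user_name, password, domain_name, project_name):
    """Build a JSON body for a password-based user token request (assumed shape)."""
    return {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "name": user_name,        # IAM user name
                        "password": password,     # IAM user password
                        "domain": {"name": domain_name},  # account name
                    }
                },
            },
            # Scope of the token; a project (region) name is assumed here.
            "scope": {"project": {"name": project_name}},
        }
    }


body = build_token_request("iam-user", "********", "account-name", "region-name")
print(json.dumps(body, indent=4))
```

Whichever way you obtain the token, the value you save is what goes into x_auth_token in the next step.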

5. Configure the "config.json" file with the user token you obtained. source_host_url is the API address of your intranet GitLab, and repo_api_prefix is the OpenAPI address of CodeArts Repo.

{
    "source_host_url": "http://{source_host}/api/v4/projects?simple=true",
    "private_token": "private_token obtained from GitLab",
    "repo_api_prefix": "https://${open_api}",
    "x_auth_token": "user token"
}
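Because the migration script reads these four keys without checking them, a quick sanity check before running it can save a failed run. The sketch below is a hypothetical helper (check_config is not part of the script); its two format checks follow from how the script builds URLs: it appends "&page=..." to source_host_url and "/v1/..." to repo_api_prefix.

```python
REQUIRED_KEYS = ("source_host_url", "private_token", "repo_api_prefix", "x_auth_token")


def check_config(config):
    """Return a list of problems found in a config.json dictionary."""
    problems = []
    for key in REQUIRED_KEYS:
        if not str(config.get(key, "")).strip():
            problems.append("missing or empty: %s" % key)
    # The script appends "&page=...&per_page=...", so a query string must already exist.
    if "?" not in str(config.get("source_host_url", "")):
        problems.append("source_host_url has no query string such as ?simple=true")
    # The script formats "<prefix>/v1/repositories", so a trailing slash would double up.
    if str(config.get("repo_api_prefix", "")).endswith("/"):
        problems.append("repo_api_prefix should not end with '/'")
    return problems
```

An empty list means the file has the shape the script expects; it does not verify that the tokens are valid.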

 

6. Log in to the CodeArts home page, create a project, and save its project ID.

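7. Configure the "plan.json" file with the project ID. The example below configures the migration of two repositories; adjust it to your needs. The first element of each entry is the path_with_namespace of the source GitLab project. g1/g2/g3 is the repository group path; any group that has not been created on the page beforehand is created automatically from this configuration.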

[
    ["path_with_namespace", "project ID", "g1/g2/g3/target-repo-name-1"],
    ["path_with_namespace", "project ID", "g1/g2/g3/target-repo-name-2"]
]
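The third element of each entry is split into repository groups and a repository name. The sketch below mirrors the rule that read_migrate_plan in the migration script applies (at most three group levels followed by the repository name); split_target is a hypothetical stand-alone helper for trying out paths before a run.

```python
def split_target(target_path):
    """Split 'g1/g2/g3/name' into (list of groups, repository name)."""
    parts = target_path.split("/")
    # The script accepts 0 to 3 group levels plus the repository name.
    if len(parts) > 4:
        raise ValueError("group level support 0 to 3: %s" % target_path)
    return parts[:-1], parts[-1]


print(split_target("g1/g2/g3/target-repo-name-1"))
print(split_target("target-repo-name-2"))
```

A path without any slash places the repository directly in the project, with no repository group.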

 

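Note:

To create a repository group manually, go to the CodeArts Repo home page, click the drop-down next to "New Repository", and choose "New Repository Group".

A repository name must start with a letter, digit, or underscore, and may contain letters, digits, hyphens, underscores, and periods. It cannot end with .git, .atom, or a period.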
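The naming rule above can be checked before the migration starts. This is a minimal sketch of that rule as stated in the note; is_valid_repo_name is a hypothetical helper, and any additional limits the service may enforce (for example on length) are not covered.

```python
import re

# First character: letter, digit, or underscore.
# Remaining characters: letters, digits, hyphen, underscore, period.
NAME_PATTERN = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.-]*")
FORBIDDEN_SUFFIXES = (".git", ".atom", ".")


def is_valid_repo_name(name):
    """Return True if name satisfies the repository naming rule in the note."""
    if NAME_PATTERN.fullmatch(name) is None:
        return False
    return not name.endswith(FORBIDDEN_SUFFIXES)
```

Running this over the last path segment of every plan.json entry flags names that the repository creation API would be expected to reject.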

8. In your local Python environment, create the migrate_to_repo.py file with the following content.

#!/usr/bin/python

# -*- coding: UTF-8 -*-

import json

import logging

import os

import subprocess

import time

import urllib.parse

import urllib.request

from logging import handlers

 

# Whether to skip creation when a repository with the same name already exists

SKIP_SAME_NAME_REPO = True

 

STATUS_OK = 200

STATUS_CREATED = 201

STATUS_INTERNAL_SERVER_ERROR = 500

STATUS_NOT_FOUND = 404

HTTP_METHOD_POST = "POST"

CODE_UTF8 = 'utf-8'

FILE_SOURCE_REPO_INFO = 'source_repos.json'

FILE_TARGET_REPO_INFO = 'target_repos.json'

FILE_CONFIG = 'config.json'

FILE_PLAN = 'plan.json'

FILE_LOG = 'migrate.log'

X_AUTH_TOKEN = 'x-auth-token'

 

 

class Logger(object):

    def __init__(self, filename):

        format_str = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

        self.logger = logging.getLogger(filename)

        self.logger.setLevel(logging.INFO)

        sh = logging.StreamHandler()

        sh.setFormatter(format_str)

        th = handlers.TimedRotatingFileHandler(filename=filename, when='D', backupCount=3, encoding=CODE_UTF8)

        th.setFormatter(format_str)

        self.logger.addHandler(sh)

        self.logger.addHandler(th)

 

 

log = Logger(FILE_LOG)

 

 

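def make_request(url, data=None, headers=None, method='GET'):

    # Copy the headers so that neither the caller's dict nor a shared default is mutated.

    headers = dict(headers or {})

    headers["Content-Type"] = 'application/json'

    headers['Accept-Charset'] = CODE_UTF8

    params = json.dumps(data if data is not None else {})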

    params = bytes(params, 'utf8')

    try:

        import ssl

        # WARNING: this disables TLS certificate verification for every HTTPS request in this process.

        ssl._create_default_https_context = ssl._create_unverified_context

        request = urllib.request.Request(url, data=params, headers=headers, method=method)

        r = urllib.request.urlopen(request)

        if r.status != STATUS_OK and r.status != STATUS_CREATED:

            log.logger.error('request error: ' + str(r.status))

            return r.status, ""

    except urllib.request.HTTPError as e:

        log.logger.error('request with code: ' + str(e.code))

        msg = str(e.read().decode(CODE_UTF8))

        log.logger.error('request error: ' + msg)

        return STATUS_INTERNAL_SERVER_ERROR, msg

    content = r.read().decode(CODE_UTF8)

    return STATUS_OK, content

 

 

def read_migrate_plan():

    log.logger.info('read_migrate_plan start')

    with open(FILE_PLAN, 'r') as f:

        migrate_plans = json.load(f)

    plans = []

    for m_plan in migrate_plans:

        if len(m_plan) != 3:

            log.logger.error("line format not match \"source_path_with_namespace\",\"project_id\",\"target_namespace\"")

            return STATUS_INTERNAL_SERVER_ERROR, []

        namespace = m_plan[2].split("/")

        if len(namespace) < 1 or len(namespace) > 4:

            log.logger.error("group level support 0 to 3")

            return STATUS_INTERNAL_SERVER_ERROR, []

        l = len(namespace)

        plan = {

            "path_with_namespace": m_plan[0],

            "project_id": m_plan[1],

            "groups": namespace[0:l - 1],

            "repo_name": namespace[l - 1]

        }

        plans.append(plan)

    return STATUS_OK, plans

 

 

def get_repo_by_plan(namespace, repos):

    if namespace not in repos:

        log.logger.info("%s not found in gitlab, skip" % namespace)

        return STATUS_NOT_FOUND, {}

 

    repo = repos[namespace]

    return STATUS_OK, repo

 

 

def repo_info_from_source(config):

    if os.path.exists(FILE_SOURCE_REPO_INFO):

        log.logger.info('get_repos skip: %s already exist' % FILE_SOURCE_REPO_INFO)

        return STATUS_OK

 

    log.logger.info('get_repos start')

    headers = {'PRIVATE-TOKEN': config['private_token']}

    url = config['source_host_url']

    per_page = 100

    page = 1

    data = {}

 

    while True:

        url_with_page = "%s&page=%s&per_page=%s" % (url, page, per_page)

        status, content = make_request(url_with_page, headers=headers)

        if status != STATUS_OK:

            return status

        repos = json.loads(content)

        for repo in repos:

            namespace = repo['path_with_namespace']

            repo_info = {'name': repo['name'], 'id': repo['id'], 'path_with_namespace': namespace,

                         'ssh_url': repo['ssh_url_to_repo']}

            data[namespace] = repo_info

        if len(repos) < per_page:

            break

        page = page + 1

 

    with open(FILE_SOURCE_REPO_INFO, 'w') as f:

        json.dump(data, f, indent=4)

    log.logger.info('get_repos end with %s' % len(data))

    return STATUS_OK

 

 

def get_repo_dir(repo):

    return "repo_%s" % repo['id']

 

 

def exec_cmd(cmd, ssh_url, dir_name):

    log.logger.info("will exec %s %s" % (cmd, ssh_url))

    pr = subprocess.Popen(cmd + " " + ssh_url, cwd=dir_name, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    (out, error) = pr.communicate()

    log.logger.info("stdout of %s is:%s" % (cmd, str(out)))

    log.logger.info("stderr of %s is:%s" % (cmd, str(error)))

    if pr.returncode != 0 or "Error" in str(error) or "err" in str(error) or "failed" in str(error):

        log.logger.error("%s failed" % cmd)

        return STATUS_INTERNAL_SERVER_ERROR

    return STATUS_OK

 

 

def clone_from_source(config, plans):

    log.logger.info('clone_repos start')

    with open(FILE_SOURCE_REPO_INFO, 'r') as f:

        repos = json.load(f)

    for plan in plans:

        status, repo = get_repo_by_plan(plan["path_with_namespace"], repos)

        if status == STATUS_NOT_FOUND:

            return status

 

        name = repo["name"]

        dir_name = get_repo_dir(repo)

        folder = os.path.exists(dir_name)

        if folder:

            log.logger.info("skip clone " + name)

            continue

        os.makedirs(dir_name)

        status = exec_cmd("git clone --mirror", repo['ssh_url'], dir_name)

        if status != STATUS_OK:

            return status

    log.logger.info('clone_repos end')

    return STATUS_OK

 

 

def get_groups(config, project_id):

    log.logger.info('get_groups start')

    headers = {X_AUTH_TOKEN: config['x_auth_token']}

    api_prefix = config['repo_api_prefix']

    limit = 100

    offset = 0

    data = {}

    while True:

        url_with_page = "%s/v4/%s/manageable-groups?offset=%s&limit=%s" % (api_prefix, project_id, offset, limit)

        status, content = make_request(url_with_page, headers=headers)

        if status != STATUS_OK:

            return status, dict()

        rows = json.loads(content)

        for row in rows:

            full_name = row['full_name']

            data[full_name] = row

        if len(rows) < limit:

            break

        offset = offset + len(rows)

    log.logger.info('get_groups end with %s' % len(data))

    return STATUS_OK, data

 

 

def create_group(config, project_id, name, parent, has_parent):

    log.logger.info('create_group start')

    headers = {X_AUTH_TOKEN: config['x_auth_token']}

    api_prefix = config['repo_api_prefix']

    data = {

        'name': name,

        'visibility': 'private',

        'description': ''

    }

    if has_parent:

        data['parent_id'] = parent['id']

 

    url = "%s/v4/%s/groups" % (api_prefix, project_id)

    status, content = make_request(url, data=data, headers=headers, method='POST')

    if status != STATUS_OK:

        log.logger.error('create_group error: %s', str(status))

        return status

    return STATUS_OK

 

 

# Create a repository in the specified repository group

def create_repo(config, project_id, name, parent, has_parent):

    log.logger.info('create_repo start')

    headers = {X_AUTH_TOKEN: config['x_auth_token']}

    api_prefix = config['repo_api_prefix']

    data = {

        'name': name,

        'project_uuid': project_id,

        'enable_readme': 0

    }

    if has_parent:

        data['group_id'] = parent['id']

    url = "%s/v1/repositories" % api_prefix

    status, content = make_request(url, data=data, headers=headers, method='POST')

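    # NOTE: this literal must match the duplicate-name message returned by the API verbatim;

    # adjust it if your API returns different text (for example, Simplified Chinese).

    if "同名倉庫或代碼組" in content: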

        log.logger.info("repo %s already exist. %s" % (name, content))

        log.logger.info("skip same name repo %s: %s" % (name, SKIP_SAME_NAME_REPO))

        return check_repo_conflict(config, project_id, parent, name)

    elif status != STATUS_OK:

        log.logger.error('create_repo error: %s', str(status))

        return status, ""

    response = json.loads(content)

    repo_uuid = response["result"]["repository_uuid"]

 

    # Verify the repository after creation, retrying up to three times

    for retry in range(1, 4):

        status, ssh_url = get_repo_detail(config, repo_uuid)

        if status != STATUS_OK:

            if retry == 3:

                return status, ""

            time.sleep(retry * 2)

            continue

        break

 

    return STATUS_OK, ssh_url

 

 

def check_repo_conflict(config, project_id, group, name):

    if not SKIP_SAME_NAME_REPO:

        return STATUS_INTERNAL_SERVER_ERROR, ""

 

    log.logger.info('check_repo_conflict start')

    headers = {X_AUTH_TOKEN: config['x_auth_token']}

    api_prefix = config['repo_api_prefix']

    url_with_page = "%s/v2/projects/%s/repositories?search=%s" % (api_prefix, project_id, name)

    status, content = make_request(url_with_page, headers=headers)

    if status != STATUS_OK:

        return status, ""

    rows = json.loads(content)

    for row in rows["result"]["repositories"]:

        if "full_name" in group and "group_name" in row:

            g = group["full_name"].replace(" ", "")

            if row["group_name"].endswith(g):

                return STATUS_OK, row["ssh_url"]

        elif "full_name" not in group and name == row['repository_name']:

            # Case without a repository group

            return STATUS_OK, row["ssh_url"]

 

    log.logger.info('check_repo_conflict end, failed to find: %s' % name)

    return STATUS_INTERNAL_SERVER_ERROR, ""

 

 

def get_repo_detail(config, repo_uuid):

    log.logger.info('get_repo_detail start')

    headers = {X_AUTH_TOKEN: config['x_auth_token']}

    api_prefix = config['repo_api_prefix']

    url_with_page = "%s/v2/repositories/%s" % (api_prefix, repo_uuid)

    status, content = make_request(url_with_page, headers=headers)

    if status != STATUS_OK:

        return status, ""

    rows = json.loads(content)

    log.logger.info('get_repo_detail end')

    return STATUS_OK, rows["result"]["ssh_url"]

 

 

def process_plan(config, plan):

    # Get the list of repository groups under the project

    project_id = plan["project_id"]

    status, group_dict = get_groups(config, project_id)

    if status != STATUS_OK:

        return status, ""

    group = ""

    last_group = {}

    has_group = False

    for g in plan["groups"]:

        # Check the target repository group; if it exists, move on to the next level

        if group == "":

            group = " %s" % g

        else:

            group = "%s / %s" % (group, g)

        if group in group_dict:

            last_group = group_dict[group]

            has_group = True

            continue

        # If it does not exist, create it and refresh the group list

        status = create_group(config, project_id, g, last_group, has_group)

        if status != STATUS_OK:

            return status, ""

        status, group_dict = get_groups(config, project_id)

        if status != STATUS_OK:

            return status, ""

        last_group = group_dict[group]

        has_group = True

 

    status, ssh_url = create_repo(config, project_id, plan["repo_name"], last_group, has_group)

    if status != STATUS_OK:

        return status, ""

 

    return status, ssh_url

 

 

def create_group_and_repos(config, plans):

    if os.path.exists(FILE_TARGET_REPO_INFO):

        log.logger.info('create_group_and_repos skip: %s already exist' % FILE_TARGET_REPO_INFO)

        return STATUS_OK

 

    log.logger.info('create_group_and_repos start')

    with open(FILE_SOURCE_REPO_INFO, 'r') as f:

        repos = json.load(f)

    target_repo_info = {}

    for plan in plans:

        status, ssh_url = process_plan(config, plan)

        if status != STATUS_OK:

            return status

 

        status, repo = get_repo_by_plan(plan["path_with_namespace"], repos)

        if status == STATUS_NOT_FOUND:

            return status

        repo['codehub_sshUrl'] = ssh_url

        target_repo_info[repo['path_with_namespace']] = repo

 

    with open(FILE_TARGET_REPO_INFO, 'w') as f:

        json.dump(target_repo_info, f, indent=4)

    log.logger.info('create_group_and_repos end')

    return STATUS_OK

 

 

def push_to_target(config, plans):

    log.logger.info('push_repos start')

    with open(FILE_TARGET_REPO_INFO, 'r') as f:

        repos = json.load(f)

    for r in repos:

        repo = repos[r]

        name = repo["name"]

        dir_name = get_repo_dir(repo)

 

        status = exec_cmd("git config remote.origin.url", repo['codehub_sshUrl'], dir_name + "/" + name + ".git")

        if status != STATUS_OK:

            log.logger.error("%s git config failed" % name)

            return

 

        status = exec_cmd("git push --mirror -f", "", dir_name + "/" + name + ".git")

        if status != STATUS_OK:

            log.logger.error("%s git push failed" % name)

            return

    log.logger.info('push_repos end')

 

 

def main():

    with open(FILE_CONFIG, 'r') as f:

        config = json.load(f)

    # read plan

    status, plans = read_migrate_plan()

    if status != STATUS_OK:

        return

    # Fetch the repository list from the self-hosted GitLab and write it to the FILE_SOURCE_REPO_INFO file

    if repo_info_from_source(config) != STATUS_OK:

        return

    # Clone the repositories to the local machine

    status = clone_from_source(config, plans)

    if status != STATUS_OK:

        return

 

    # Call the API to create the repositories and record their addresses in the FILE_TARGET_REPO_INFO file

    if create_group_and_repos(config, plans) != STATUS_OK:

        return

 

    # Pushing uses SSH; configure your SSH key in CodeArts Repo beforehand

    push_to_target(config, plans)

 

 

if __name__ == '__main__':

    main()
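The exec_cmd function in the script judges success by scanning stderr for words such as "err", which can misfire because Git also writes progress messages to stderr. As an alternative, the following sketch judges success by the exit code instead. It is not part of the original script: run_cmd is a hypothetical helper, and it takes the command as an argument list so that no shell is involved.

```python
import subprocess
import sys


def run_cmd(args, cwd=None):
    """Run a command given as an argument list; return (succeeded, stderr text)."""
    result = subprocess.run(args, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Exit code 0 means success, regardless of what was written to stderr.
    return result.returncode == 0, result.stderr.decode("utf-8", errors="replace")


# Stand-in command so the sketch runs without Git installed; in real use the
# argument list would be something like ["git", "clone", "--mirror", ssh_url].
ok, err = run_cmd([sys.executable, "-c", "import sys; sys.exit(3)"])
print("succeeded:", ok)
```

Passing arguments as a list also avoids quoting problems when a repository URL or directory name contains spaces.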

 

9. Run the following command to start the script and complete the batch migration of the repositories.

python migrate_to_repo.py
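After a run, the script leaves target_repos.json next to it, keyed by path_with_namespace and holding both the GitLab address (ssh_url) and the CodeArts Repo address (codehub_sshUrl) of each repository. The sketch below lists those pairs for review; summarize_migration is a hypothetical helper, not part of the script.

```python
import json
import os


def summarize_migration(target_file="target_repos.json"):
    """Return (source SSH URL, CodeArts Repo SSH URL) pairs recorded by the script."""
    if not os.path.exists(target_file):
        return []  # the repository creation step has not completed yet
    with open(target_file, "r", encoding="utf-8") as f:
        repos = json.load(f)
    return [(info["ssh_url"], info["codehub_sshUrl"]) for info in repos.values()]


for source, target in summarize_migration():
    print("%s -> %s" % (source, target))
```

Note that the script skips the GitLab listing when source_repos.json already exists and skips repository creation when target_repos.json already exists, so delete these files before rerunning with a changed plan.json.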