mirror of
https://github.com/aljazceru/turso.git
synced 2025-12-17 08:34:19 +01:00
Fix merge-py.py script to use github CLI and add makefile command
This commit is contained in:
@@ -1,14 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# Copyright 2024 the Limbo authors. All rights reserved. MIT license.
|
||||
# Copyright 2024 the Turso authors. All rights reserved. MIT license.
|
||||
#
|
||||
# A script to merge a pull requests with a nice merge commit.
|
||||
# A script to merge a pull requests with a nice merge commit using GitHub CLI.
|
||||
#
|
||||
# Requirements:
|
||||
#
|
||||
# ```
|
||||
# pip install PyGithub
|
||||
# ```
|
||||
# - GitHub CLI (`gh`) must be installed and authenticated
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -17,13 +14,14 @@ import sys
|
||||
import tempfile
|
||||
import textwrap
|
||||
|
||||
from github import Github
|
||||
|
||||
|
||||
def run_command(command):
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
output, error = process.communicate()
|
||||
return output.decode("utf-8").strip(), error.decode("utf-8").strip(), process.returncode
|
||||
def run_command(command, capture_output=True):
|
||||
if capture_output:
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
|
||||
output, error = process.communicate()
|
||||
return output.decode("utf-8").strip(), error.decode("utf-8").strip(), process.returncode
|
||||
else:
|
||||
return "", "", subprocess.call(command, shell=True)
|
||||
|
||||
|
||||
def load_user_mapping(file_path=".github.json"):
|
||||
@@ -36,43 +34,49 @@ def load_user_mapping(file_path=".github.json"):
|
||||
user_mapping = load_user_mapping()
|
||||
|
||||
|
||||
def get_user_email(g, username):
|
||||
def get_user_email(username):
|
||||
if username in user_mapping:
|
||||
return f"{user_mapping[username]['name']} <{user_mapping[username]['email']}>"
|
||||
|
||||
try:
|
||||
user = g.get_user(username)
|
||||
name = user.name if user.name else username
|
||||
if user.email:
|
||||
return f"{name} <{user.email}>"
|
||||
# Try to get user info from gh CLI
|
||||
output, _, returncode = run_command(f"gh api users/{username}")
|
||||
if returncode == 0:
|
||||
user_data = json.loads(output)
|
||||
name = user_data.get("name", username)
|
||||
email = user_data.get("email")
|
||||
if email:
|
||||
return f"{name} <{email}>"
|
||||
return f"{name} (@{username})"
|
||||
except Exception as e:
|
||||
print(f"Error fetching email for user {username}: {str(e)}")
|
||||
|
||||
# If we couldn't find an email, return a noreply address
|
||||
# Fallback to noreply address
|
||||
return f"{username} <{username}@users.noreply.github.com>"
|
||||
|
||||
|
||||
def get_pr_info(g, repo, pr_number):
|
||||
pr = repo.get_pull(int(pr_number))
|
||||
author = pr.user
|
||||
author_name = author.name if author.name else author.login
|
||||
def get_pr_info(pr_number):
|
||||
output, error, returncode = run_command(
|
||||
f"gh pr view {pr_number} --json number,title,author,headRefName,body,reviews"
|
||||
)
|
||||
if returncode != 0:
|
||||
print(f"Error fetching PR #{pr_number}: {error}")
|
||||
sys.exit(1)
|
||||
|
||||
pr_data = json.loads(output)
|
||||
|
||||
# Get the list of users who reviewed the PR
|
||||
reviewed_by = []
|
||||
reviews = pr.get_reviews()
|
||||
for review in reviews:
|
||||
if review.state == "APPROVED":
|
||||
reviewer = review.user
|
||||
reviewed_by.append(get_user_email(g, reviewer.login))
|
||||
for review in pr_data.get("reviews", []):
|
||||
if review["state"] == "APPROVED":
|
||||
reviewed_by.append(get_user_email(review["author"]["login"]))
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
reviewed_by = list(dict.fromkeys(reviewed_by))
|
||||
|
||||
return {
|
||||
"number": pr.number,
|
||||
"title": pr.title,
|
||||
"author": author_name,
|
||||
"head": pr.head.ref,
|
||||
"head_sha": pr.head.sha,
|
||||
"body": pr.body.strip() if pr.body else "",
|
||||
"number": pr_data["number"],
|
||||
"title": pr_data["title"],
|
||||
"author": pr_data["author"]["login"],
|
||||
"author_name": pr_data["author"].get("name", pr_data["author"]["login"]),
|
||||
"head": pr_data["headRefName"],
|
||||
"body": (pr_data.get("body") or "").strip(),
|
||||
"reviewed_by": reviewed_by,
|
||||
}
|
||||
|
||||
@@ -92,108 +96,131 @@ def wrap_text(text, width=72):
|
||||
return "\n".join(wrapped_lines)
|
||||
|
||||
|
||||
def merge_pr(pr_number, use_api=True):
|
||||
# GitHub authentication
|
||||
token = os.getenv("GITHUB_TOKEN")
|
||||
g = Github(token)
|
||||
def merge_remote(pr_number: int, commit_message: str, commit_title: str):
|
||||
output, error, returncode = run_command(f"gh pr checks {pr_number} --json state")
|
||||
if returncode == 0:
|
||||
checks_data = json.loads(output)
|
||||
if checks_data and any(check.get("state") == "FAILURE" for check in checks_data):
|
||||
print("Warning: Some checks are failing")
|
||||
if input("Do you want to proceed with the merge? (y/N): ").strip().lower() != "y":
|
||||
exit(0)
|
||||
|
||||
# Get the repository
|
||||
repo_name = os.getenv("GITHUB_REPOSITORY")
|
||||
if not repo_name:
|
||||
print("Error: GITHUB_REPOSITORY environment variable not set")
|
||||
# Create a temporary file for the commit message
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as temp_file:
|
||||
temp_file.write(commit_message)
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
try:
|
||||
print(f"\nMerging PR #{pr_number} with custom commit message...")
|
||||
# Use gh pr merge with the commit message file
|
||||
cmd = f'gh pr merge {pr_number} --merge --subject "{commit_title}" --body-file "{temp_file_path}"'
|
||||
output, error, returncode = run_command(cmd, capture_output=False)
|
||||
|
||||
if returncode == 0:
|
||||
print(f"\nPull request #{pr_number} merged successfully!")
|
||||
print(f"\nMerge commit message:\n{commit_message}")
|
||||
else:
|
||||
print(f"Error merging PR: {error}")
|
||||
status_output, _, _ = run_command("gh pr status --json number,mergeable,mergeStateStatus")
|
||||
if status_output:
|
||||
print("\nPR status information:")
|
||||
print(status_output)
|
||||
sys.exit(1)
|
||||
finally:
|
||||
# Clean up the temporary file
|
||||
os.unlink(temp_file_path)
|
||||
|
||||
|
||||
def merge_local(pr_number: int, commit_message: str):
|
||||
current_branch, _, _ = run_command("git branch --show-current")
|
||||
|
||||
print(f"Fetching PR #{pr_number}...")
|
||||
cmd = f"gh pr checkout {pr_number}"
|
||||
_, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error checking out PR: {error}")
|
||||
sys.exit(1)
|
||||
repo = g.get_repo(repo_name)
|
||||
|
||||
# Get PR information
|
||||
pr_info = get_pr_info(g, repo, pr_number)
|
||||
pr_branch, _, _ = run_command("git branch --show-current")
|
||||
|
||||
# Format commit message
|
||||
commit_title = f"Merge '{pr_info['title']}' from {pr_info['author']}"
|
||||
commit_body = wrap_text(pr_info["body"])
|
||||
cmd = "git checkout main"
|
||||
_, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error checking out main branch: {error}")
|
||||
sys.exit(1)
|
||||
|
||||
commit_message = f"{commit_title}\n\n{commit_body}\n"
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
|
||||
temp_file.write(commit_message)
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
# Add Reviewed-by lines
|
||||
for approver in pr_info["reviewed_by"]:
|
||||
commit_message += f"\nReviewed-by: {approver}"
|
||||
|
||||
# Add Closes line
|
||||
commit_message += f"\n\nCloses #{pr_info['number']}"
|
||||
|
||||
if use_api:
|
||||
# Merge using GitHub API
|
||||
try:
|
||||
pr = pr_info["pr_object"]
|
||||
# Check if PR is mergeable
|
||||
if not pr.mergeable:
|
||||
print(f"Error: PR #{pr_number} is not mergeable. State: {pr.mergeable_state}")
|
||||
sys.exit(1)
|
||||
result = pr.merge(
|
||||
commit_title=commit_title,
|
||||
commit_message=commit_message.replace(commit_title + "\n\n", ""),
|
||||
merge_method="merge",
|
||||
)
|
||||
if result.merged:
|
||||
print(f"Pull request #{pr_number} merged successfully via GitHub API!")
|
||||
print(f"Merge commit SHA: {result.sha}")
|
||||
print(f"\nMerge commit message:\n{commit_message}")
|
||||
else:
|
||||
print(f"Error: Failed to merge PR #{pr_number}")
|
||||
print(f"Message: {result.message}")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"Error merging PR via API: {str(e)}")
|
||||
try:
|
||||
# Merge the PR branch with the custom message
|
||||
# Using -F with the full message (title + body)
|
||||
cmd = f"git merge --no-ff {pr_branch} -F {temp_file_path}"
|
||||
output, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error merging PR: {error}")
|
||||
# Try to go back to original branch
|
||||
run_command(f"git checkout {current_branch}")
|
||||
sys.exit(1)
|
||||
|
||||
print("\nPull request merged successfully locally!")
|
||||
print(f"\nMerge commit message:\n{commit_message}")
|
||||
|
||||
finally:
|
||||
# Clean up the temporary file
|
||||
os.unlink(temp_file_path)
|
||||
|
||||
|
||||
def merge_pr(pr_number, use_api=True):
|
||||
"""Merge a pull request with a formatted commit message"""
|
||||
check_gh_auth()
|
||||
|
||||
print(f"Fetching PR #{pr_number}...")
|
||||
pr_info = get_pr_info(pr_number)
|
||||
print(f"PR found: '{pr_info['title']}' by {pr_info['author']}")
|
||||
|
||||
# Format commit message
|
||||
commit_title = f"Merge '{pr_info['title']}' from {pr_info['author_name']}"
|
||||
commit_body = wrap_text(pr_info["body"])
|
||||
|
||||
commit_message_parts = [commit_title]
|
||||
if commit_body:
|
||||
commit_message_parts.append("") # Empty line between title and body
|
||||
commit_message_parts.append(commit_body)
|
||||
if pr_info["reviewed_by"]:
|
||||
commit_message_parts.append("") # Empty line before reviewed-by
|
||||
for approver in pr_info["reviewed_by"]:
|
||||
commit_message_parts.append(f"Reviewed-by: {approver}")
|
||||
commit_message_parts.append("") # Empty line before Closes
|
||||
commit_message_parts.append(f"Closes #{pr_info['number']}")
|
||||
commit_message = "\n".join(commit_message_parts)
|
||||
|
||||
if use_api:
|
||||
# For remote merge, we need to separate title from body
|
||||
commit_body_for_api = "\n".join(commit_message_parts[2:])
|
||||
merge_remote(pr_number, commit_body_for_api, commit_title)
|
||||
else:
|
||||
# Create a temporary file for the commit message
|
||||
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
|
||||
temp_file.write(commit_message)
|
||||
temp_file_path = temp_file.name
|
||||
merge_local(pr_number, commit_message)
|
||||
|
||||
try:
|
||||
# Instead of fetching to a branch, fetch the specific commit
|
||||
cmd = f"git fetch origin pull/{pr_number}/head"
|
||||
output, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error fetching PR: {error}")
|
||||
sys.exit(1)
|
||||
|
||||
# Checkout main branch
|
||||
cmd = "git checkout main"
|
||||
output, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error checking out main branch: {error}")
|
||||
sys.exit(1)
|
||||
|
||||
# Merge using the commit SHA instead of branch name
|
||||
cmd = f"git merge --no-ff {pr_info['head_sha']} -F {temp_file_path}"
|
||||
output, error, returncode = run_command(cmd)
|
||||
if returncode != 0:
|
||||
print(f"Error merging PR: {error}")
|
||||
sys.exit(1)
|
||||
|
||||
print("Pull request merged successfully!")
|
||||
print(f"Merge commit message:\n{commit_message}")
|
||||
print("\nNote: You'll need to push this merge to mark the PR as merged on GitHub")
|
||||
|
||||
finally:
|
||||
# Clean up the temporary file
|
||||
os.unlink(temp_file_path)
|
||||
def check_gh_auth():
|
||||
"""Check if gh CLI is authenticated"""
|
||||
_, _, returncode = run_command("gh auth status")
|
||||
if returncode != 0:
|
||||
print("Error: GitHub CLI is not authenticated. Run 'gh auth login' first.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python merge_pr.py <pr_number>")
|
||||
sys.exit(1)
|
||||
import argparse
|
||||
|
||||
pr_number = sys.argv[1]
|
||||
if not re.match(r"^\d+$", pr_number):
|
||||
parser = argparse.ArgumentParser(description="Merge a pull request with a nice merge commit using GitHub CLI")
|
||||
parser.add_argument("pr_number", type=str, help="Pull request number to merge")
|
||||
parser.add_argument("--local", action="store_true", help="Use local git commands instead of GitHub API")
|
||||
args = parser.parse_args()
|
||||
if not re.match(r"^\d+$", args.pr_number):
|
||||
print("Error: PR number must be a positive integer")
|
||||
sys.exit(1)
|
||||
|
||||
use_api = True
|
||||
if len(sys.argv) == 3 and sys.argv[2] == "--local":
|
||||
use_api = False
|
||||
|
||||
merge_pr(pr_number, use_api)
|
||||
use_api = not args.local
|
||||
merge_pr(args.pr_number, use_api)
|
||||
|
||||
Reference in New Issue
Block a user