Tutorial-Codebase-Knowledge/utils/crawl_github_files.py
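"""Crawl files from a GitHub repository via the GitHub REST API, filtering them
by glob patterns and a maximum file size, and returning their contents in memory."""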
import requests
import base64
import os
import time
import fnmatch
from typing import Union, Set
from urllib.parse import urlparse

def crawl_github_files(
    repo_url,
    token=None,
    max_file_size: int = 1 * 1024 * 1024,  # 1 MB
    use_relative_paths: bool = False,
    include_patterns: Union[str, Set[str]] = None,
    exclude_patterns: Union[str, Set[str]] = None
):
    """
    Crawl files from a specific path in a GitHub repository at a specific commit.

    Args:
        repo_url (str): URL of the GitHub repository with specific path and commit
            (e.g., 'https://github.com/microsoft/autogen/tree/e45a15766746d95f8cfaaa705b0371267bec812e/python/packages/autogen-core/src/autogen_core')
        token (str, optional): GitHub personal access token. Required for private repositories and recommended for public repos to avoid rate limits.
        max_file_size (int, optional): Maximum file size in bytes to download (default: 1 MB).
        use_relative_paths (bool, optional): If True, file paths will be relative to the specified subdirectory.
        include_patterns (str or set of str, optional): Pattern or set of patterns specifying which files to include (e.g., "*.py", {"*.md", "*.txt"}).
            If None, all files are included.
        exclude_patterns (str or set of str, optional): Pattern or set of patterns specifying which files to exclude.
            If None, no files are excluded.

    Returns:
        dict: Dictionary with files and statistics.
    """
    # Convert single pattern to set
    if include_patterns and isinstance(include_patterns, str):
        include_patterns = {include_patterns}
    if exclude_patterns and isinstance(exclude_patterns, str):
        exclude_patterns = {exclude_patterns}

    # Parse GitHub URL to extract owner, repo, commit/branch, and path
    parsed_url = urlparse(repo_url)
    path_parts = parsed_url.path.strip('/').split('/')
    if len(path_parts) < 2:
        raise ValueError(f"Invalid GitHub URL: {repo_url}")

    # Extract the basic components
    owner = path_parts[0]
    repo = path_parts[1]

    # Check if URL contains a specific branch/commit
    if 'tree' in path_parts:
        tree_index = path_parts.index('tree')
        ref = path_parts[tree_index + 1]
        # Combine all parts after the ref as the path
        path_start = tree_index + 2
        specific_path = '/'.join(path_parts[path_start:]) if path_start < len(path_parts) else ""
    else:
        ref = "main"  # Default branch
        specific_path = ""

    # Setup for GitHub API
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"token {token}"

    # Dictionary to store path -> content mapping
    files = {}
    skipped_files = []

    def should_include_file(file_path: str, file_name: str) -> bool:
        """Determine if a file should be included based on patterns.

        Include patterns are matched against the file name; exclude patterns
        are matched against the file path passed in.
        """
        # If no include patterns are specified, include all files
        if not include_patterns:
            include_file = True
        else:
            # Check if file matches any include pattern
            include_file = any(fnmatch.fnmatch(file_name, pattern) for pattern in include_patterns)

        # If exclude patterns are specified, check if file should be excluded
        if exclude_patterns and include_file:
            # Exclude if file matches any exclude pattern
            exclude_file = any(fnmatch.fnmatch(file_path, pattern) for pattern in exclude_patterns)
            return not exclude_file

        return include_file

    def fetch_contents(path):
        """Fetch contents of the repository at a specific path and commit."""
        url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
        params = {"ref": ref}

        response = requests.get(url, headers=headers, params=params)

        # Handle rate limiting: wait until the limit resets, then retry
        if response.status_code == 403 and 'rate limit exceeded' in response.text.lower():
            reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
            wait_time = max(reset_time - time.time(), 0) + 1
            print(f"Rate limit exceeded. Waiting for {wait_time:.0f} seconds...")
            time.sleep(wait_time)
            return fetch_contents(path)

        if response.status_code == 404:
            if not token:
                print("Error 404: Repository not found or is private. If this is a private repository, you need to provide a token.")
            else:
                print(f"Error 404: Path '{path}' not found in repository or insufficient permissions.")
            return

        if response.status_code != 200:
            print(f"Error fetching {path}: {response.status_code} - {response.text}")
            return

        contents = response.json()

        # Handle both single file and directory responses
        if not isinstance(contents, list):
            contents = [contents]
        for item in contents:
            item_path = item["path"]

            # Calculate relative path if requested
            if use_relative_paths and specific_path:
                # Make sure the path is relative to the specified subdirectory
                if item_path.startswith(specific_path):
                    rel_path = item_path[len(specific_path):].lstrip('/')
                else:
                    rel_path = item_path
            else:
                rel_path = item_path

            if item["type"] == "file":
                # Check if file should be included based on patterns
                if not should_include_file(rel_path, item["name"]):
                    print(f"Skipping {rel_path}: Does not match include/exclude patterns")
                    continue

                # Check file size if available
                file_size = item.get("size", 0)
                if file_size > max_file_size:
                    skipped_files.append((item_path, file_size))
                    print(f"Skipping {rel_path}: File size ({file_size} bytes) exceeds limit ({max_file_size} bytes)")
                    continue
                # For files, get raw content
                if "download_url" in item and item["download_url"]:
                    file_url = item["download_url"]
                    file_response = requests.get(file_url, headers=headers)

                    # Final size check in case the content-length header differs from the metadata size
                    content_length = int(file_response.headers.get('content-length', 0))
                    if content_length > max_file_size:
                        skipped_files.append((item_path, content_length))
                        print(f"Skipping {rel_path}: Content length ({content_length} bytes) exceeds limit ({max_file_size} bytes)")
                        continue

                    if file_response.status_code == 200:
                        files[rel_path] = file_response.text
                        print(f"Downloaded: {rel_path} ({file_size} bytes)")
                    else:
                        print(f"Failed to download {rel_path}: {file_response.status_code}")
                else:
                    # Alternative method if download_url is not available
                    content_response = requests.get(item["url"], headers=headers)
                    if content_response.status_code == 200:
                        content_data = content_response.json()
                        if content_data.get("encoding") == "base64" and "content" in content_data:
                            # Check size of base64 content before decoding (decoded size is roughly 0.75x the encoded length)
                            estimated_size = int(len(content_data["content"]) * 0.75)
                            if estimated_size > max_file_size:
                                skipped_files.append((item_path, estimated_size))
                                print(f"Skipping {rel_path}: Encoded content exceeds size limit")
                                continue

                            file_content = base64.b64decode(content_data["content"]).decode('utf-8')
                            files[rel_path] = file_content
                            print(f"Downloaded: {rel_path} ({file_size} bytes)")
                        else:
                            print(f"Unexpected content format for {rel_path}")
                    else:
                        print(f"Failed to get content for {rel_path}: {content_response.status_code}")

            elif item["type"] == "dir":
                # Recursively process subdirectories
                fetch_contents(item_path)

    # Start crawling from the specified path
    fetch_contents(specific_path)

    return {
        "files": files,
        "stats": {
            "downloaded_count": len(files),
            "skipped_count": len(skipped_files),
            "skipped_files": skipped_files,
            "base_path": specific_path if use_relative_paths else None,
            "include_patterns": include_patterns,
            "exclude_patterns": exclude_patterns
        }
    }


# Example usage
if __name__ == "__main__":
    # Get token from environment variable (more secure than hardcoding)
    github_token = os.environ.get("GITHUB_TOKEN")

    repo_url = "https://github.com/pydantic/pydantic/tree/6c38dc93f40a47f4d1350adca9ec0d72502e223f/pydantic"

    # Example: crawl Python and Markdown files, using paths relative to the subdirectory
    result = crawl_github_files(
        repo_url,
        token=github_token,
        max_file_size=1 * 1024 * 1024,  # 1 MB in bytes
        use_relative_paths=True,  # Enable relative paths
        include_patterns={"*.py", "*.md"},  # Include Python and Markdown files
    )

    files = result["files"]
    stats = result["stats"]

    print(f"\nDownloaded {stats['downloaded_count']} files.")
    print(f"Skipped {stats['skipped_count']} files due to size limits or patterns.")
    print(f"Base path for relative paths: {stats['base_path']}")
    print(f"Include patterns: {stats['include_patterns']}")
    print(f"Exclude patterns: {stats['exclude_patterns']}")

    # Display all file paths in the dictionary
    print("\nFiles in dictionary:")
    for file_path in sorted(files.keys()):
        print(f" {file_path}")

    # Example: accessing content of a specific file
    if files:
        sample_file = next(iter(files))
        print(f"\nSample file: {sample_file}")
        print(f"Content preview: {files[sample_file][:200]}...")