"""Helpers for fetching web pages, extracting their text and links, and
summarizing text with a chat-completion model."""

from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

from config import Config
from llm_utils import create_chat_completion

cfg = Config()

# Seconds to wait on an HTTP request before giving up.
_DEFAULT_TIMEOUT = 10


def is_valid_url(url):
    """Return True when *url* parses with both a scheme and a network location."""
    try:
        result = urlparse(url)
    except ValueError:
        return False
    return bool(result.scheme and result.netloc)


def sanitize_url(url):
    """Return *url* re-joined with only its path component.

    For a URL with a non-empty path this drops the query string and
    fragment. When the path is empty, ``urljoin`` returns *url* unchanged.
    """
    return urljoin(url, urlparse(url).path)


def make_request(url, timeout=_DEFAULT_TIMEOUT):
    """GET *url* and return the response, or an ``"Error: ..."`` string.

    Args:
        url: Address to fetch.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The ``requests`` response on success; otherwise a string starting
        with ``"Error: "`` describing the request failure (this includes
        HTTP error statuses, via ``raise_for_status``).
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return "Error: " + str(e)
    return response


def _parse_html(html):
    """Parse *html* and remove every ``<script>`` and ``<style>`` element."""
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style"]):
        tag.extract()
    return soup


def scrape_text(url):
    """Fetch *url* and return its visible text, one fragment per line.

    Returns:
        The extracted text, or an ``"Error: ..."`` string when the URL is
        invalid or the request fails.
    """
    if not is_valid_url(url):
        return "Error: Invalid URL"

    response = make_request(sanitize_url(url))
    # make_request signals failure by returning a message instead of a response.
    if isinstance(response, str):
        return response

    text = _parse_html(response.text).get_text()
    lines = (line.strip() for line in text.splitlines())
    # NOTE(review): splitting on a single space puts every word on its own
    # line -- confirm this separator is what is intended.
    chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
    return "\n".join(chunk for chunk in chunks if chunk)


def extract_hyperlinks(soup):
    """Return ``(text, href)`` tuples for every ``<a>`` in *soup* that has an href."""
    return [(link.text, link["href"]) for link in soup.find_all("a", href=True)]


def format_hyperlinks(hyperlinks):
    """Render ``(text, url)`` pairs as ``"text (url)"`` strings."""
    return [f"{link_text} ({link_url})" for link_text, link_url in hyperlinks]


def scrape_links(url):
    """Fetch *url* and return its hyperlinks formatted as ``"text (href)"``.

    Returns:
        A list of formatted links; the string ``"error"`` when the server
        answers with an HTTP status of 400 or above; or an ``"Error: ..."``
        string when the request itself fails (connection problem, timeout,
        malformed URL).
    """
    try:
        # A timeout keeps an unresponsive server from blocking forever.
        response = requests.get(url, timeout=_DEFAULT_TIMEOUT)
    except requests.exceptions.RequestException as e:
        return "Error: " + str(e)

    # Check if the response contains an HTTP error
    if response.status_code >= 400:
        return "error"

    return format_hyperlinks(extract_hyperlinks(_parse_html(response.text)))


def split_text(text, max_length=8192):
    """Yield newline-joined groups of lines from *text*.

    Lines are accumulated until adding the next one (plus one character for
    its separator) would exceed *max_length*; the accumulated group is then
    yielded and a new one is started. A single line longer than
    *max_length* is yielded on its own, unsplit, so such a chunk can exceed
    the limit.
    """
    current_length = 0
    current_chunk = []

    for paragraph in text.split("\n"):
        needed = len(paragraph) + 1
        if current_length + needed <= max_length:
            current_chunk.append(paragraph)
            current_length += needed
            continue

        # Nothing has been collected yet when the very first line is already
        # too long; skip the yield rather than emit an empty chunk.
        if current_chunk:
            yield "\n".join(current_chunk)
        current_chunk = [paragraph]
        current_length = needed

    if current_chunk:
        yield "\n".join(current_chunk)


def create_message(chunk, question):
    """Build the user message asking the model to answer *question* from *chunk*."""
    return {
        "role": "user",
        "content": f"\"\"\"{chunk}\"\"\" Using the above text, please answer the following question: \"{question}\" -- if the question cannot be answered using the text, please summarize the text."
    }


def summarize_text(text, question):
    """Summarize *text* with respect to *question* using the fast LLM model.

    The text is split into chunks, each chunk is summarized separately, and
    the joined chunk summaries are summarized once more to produce the
    final result.

    Returns:
        The final summary, or ``"Error: No text to summarize"`` when *text*
        is empty.
    """
    if not text:
        return "Error: No text to summarize"

    text_length = len(text)
    print(f"Text length: {text_length} characters")

    summaries = []
    chunks = list(split_text(text))

    for i, chunk in enumerate(chunks):
        print(f"Summarizing chunk {i + 1} / {len(chunks)}")
        messages = [create_message(chunk, question)]

        summary = create_chat_completion(
            model=cfg.fast_llm_model,
            messages=messages,
            max_tokens=300,
        )
        summaries.append(summary)

    print(f"Summarized {len(chunks)} chunks.")

    # Second pass: condense the per-chunk summaries into a single answer.
    combined_summary = "\n".join(summaries)
    messages = [create_message(combined_summary, question)]

    final_summary = create_chat_completion(
        model=cfg.fast_llm_model,
        messages=messages,
        max_tokens=300,
    )

    return final_summary