From 4b0d59723ea6ef082348ad0bd84eb2c778ecf1e3 Mon Sep 17 00:00:00 2001
From: Believethehype <1097224+believethehype@users.noreply.github.com>
Date: Fri, 21 Jun 2024 16:10:43 +0200
Subject: [PATCH] bugfixes: replace pytube/custom scrapers with yt-dlp, fix bot
 and profile-metadata edge cases

---
 nostr_dvm/bot.py                           |   4 +-
 nostr_dvm/utils/mediasource_utils.py       |  43 +-
 nostr_dvm/utils/nip65_utils.py             |   2 +-
 nostr_dvm/utils/nostr_utils.py             |  17 +-
 nostr_dvm/utils/scrapper/media_scrapper.py | 565 +++------------------
 setup.py                                   |   4 +-
 tests/bot.py                               |  12 +-
 7 files changed, 112 insertions(+), 535 deletions(-)

diff --git a/nostr_dvm/bot.py b/nostr_dvm/bot.py
index 9cf2f1d..229b125 100644
--- a/nostr_dvm/bot.py
+++ b/nostr_dvm/bot.py
@@ -414,7 +414,7 @@ class Bot:
                     bolt11 = zaprequest(user.lud16, amount, "Zap", nostr_event, self.keys,
                                         self.dvm_config, "private")
-                    if bolt11 == None:
+                    if bolt11 is None:
                         print("Receiver has no Lightning address")
                         return
                     try:
@@ -428,7 +428,7 @@ class Bot:


             except Exception as e:
-                print(e)
+                print(str(e))

         async def handle_nip90_response_event(nostr_event: Event):
             try:
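Note on the bot.py hunk: wrapping the exception in str() before printing is behaviorally the same as print(e) for most exceptions. If the goal is a more useful log line, a standard-library sketch like the following (the placement inside the same except block is hypothetical) would also record the stack trace:

    import traceback

    try:
        raise ValueError("example failure")
    except Exception as e:
        print(str(e))                  # message only, as in the patch
        print(traceback.format_exc())  # full traceback for debugging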
downloading") start_time = start end_time = end - download_overcast(input_value, filename) + download(input_value, filename) finaltag = str(input_value).replace("https://overcast.fm/", "").split('/') if start == 0.0: if len(finaltag) > 1: @@ -235,7 +237,7 @@ def get_overcast(input_value, start, end): def get_TikTok(input_value, start, end): filepath = os.path.abspath(os.curdir + r'/outputs/') try: - filename = download_tik_tok(input_value, filepath) + filename = download(input_value, filepath) print(filename) except Exception as e: print(e) @@ -246,7 +248,7 @@ def get_TikTok(input_value, start, end): def get_Instagram(input_value, start, end): filepath = os.path.abspath(os.curdir + r'/outputs/') try: - filename = download_instagram(input_value, filepath) + filename = download(input_value, filepath) print(filename) except Exception as e: print(e) @@ -258,7 +260,7 @@ def get_Twitter(input_value, start, end): filepath = os.path.abspath(os.curdir) + r'/outputs/' cleanlink = str(input_value).replace("twitter.com", "x.com") try: - filename = download_twitter(cleanlink, filepath) + filename = download(cleanlink, filepath) except Exception as e: print(e) return "", start, end @@ -270,7 +272,7 @@ def get_youtube(input_value, start, end, audioonly=True): print(filepath) filename = "" try: - filename = download_youtube(input_value, filepath, audioonly) + filename = download(input_value, filepath, audioonly) except Exception as e: print("Youtube " + str(e)) @@ -340,26 +342,5 @@ def get_media_link(url) -> (str, str): return None, None -def download_overcast(source_url, target_location): - result = OvercastDownload(source_url, target_location) - return result - - -def download_twitter(videourl, path): - result = XDownload(videourl, path + "x.mp4") - #result = XitterDownload(videourl, path + "x.mp4") - return result - - -def download_tik_tok(videourl, path): - result = TiktokDownloadAll([videourl], path) - return result - - -def download_instagram(videourl, path): - result = InstagramDownload(videourl, "insta", path) - return result - - -def download_youtube(link, path, audioonly=True): - return YouTubeDownload(link, path, audio_only=audioonly) +def download(videourl, path, audioonly=False): + return YTDownload(videourl, path, audio_only=False) \ No newline at end of file diff --git a/nostr_dvm/utils/nip65_utils.py b/nostr_dvm/utils/nip65_utils.py index 53d3af1..a956c0a 100644 --- a/nostr_dvm/utils/nip65_utils.py +++ b/nostr_dvm/utils/nip65_utils.py @@ -18,4 +18,4 @@ async def nip65_announce_relays(dvm_config, client): event = EventBuilder(EventDefinitions.KIND_RELAY_ANNOUNCEMENT, content, tags).to_event(keys) eventid = await send_event(event, client=client, dvm_config=dvm_config, blastr=True) - print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME +" (EventID: " + eventid.to_hex() +")" + bcolors.ENDC) + print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME +" (EventID: " + str(eventid.to_hex()) +")" + bcolors.ENDC) diff --git a/nostr_dvm/utils/nostr_utils.py b/nostr_dvm/utils/nostr_utils.py index 51425a5..4fd0751 100644 --- a/nostr_dvm/utils/nostr_utils.py +++ b/nostr_dvm/utils/nostr_utils.py @@ -305,8 +305,8 @@ def check_and_decrypt_own_tags(event, dvm_config): async def update_profile(dvm_config, client, lud16=""): keys = Keys.parse(dvm_config.PRIVATE_KEY) - nip89content = json.loads(dvm_config.NIP89.CONTENT) - if nip89content.get("name"): + try: + nip89content = json.loads(dvm_config.NIP89.CONTENT) name = 
diff --git a/nostr_dvm/utils/nip65_utils.py b/nostr_dvm/utils/nip65_utils.py
index 53d3af1..a956c0a 100644
--- a/nostr_dvm/utils/nip65_utils.py
+++ b/nostr_dvm/utils/nip65_utils.py
@@ -18,4 +18,4 @@ async def nip65_announce_relays(dvm_config, client):
     event = EventBuilder(EventDefinitions.KIND_RELAY_ANNOUNCEMENT, content, tags).to_event(keys)
     eventid = await send_event(event, client=client, dvm_config=dvm_config, blastr=True)
 
-    print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME + " (EventID: " + eventid.to_hex() + ")" + bcolors.ENDC)
+    print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME + " (EventID: " + str(eventid.to_hex()) + ")" + bcolors.ENDC)
diff --git a/nostr_dvm/utils/nostr_utils.py b/nostr_dvm/utils/nostr_utils.py
index 51425a5..4fd0751 100644
--- a/nostr_dvm/utils/nostr_utils.py
+++ b/nostr_dvm/utils/nostr_utils.py
@@ -305,8 +305,8 @@ def check_and_decrypt_own_tags(event, dvm_config):
 
 async def update_profile(dvm_config, client, lud16=""):
     keys = Keys.parse(dvm_config.PRIVATE_KEY)
-    nip89content = json.loads(dvm_config.NIP89.CONTENT)
-    if nip89content.get("name"):
+    try:
+        nip89content = json.loads(dvm_config.NIP89.CONTENT)
         name = nip89content.get("name")
         about = nip89content.get("about")
         image = nip89content.get("image")
@@ -320,9 +320,16 @@ async def update_profile(dvm_config, client, lud16=""):
             .set_lud16(lud16) \
             .set_nip05(lud16)
         # .set_banner("https://example.com/banner.png") \
-    print("[" + dvm_config.NIP89.NAME + "] Setting profile metadata for " + keys.public_key().to_bech32() + "...")
-    print(metadata.as_json())
-    await client.set_metadata(metadata)
+
+
+    except Exception:
+        metadata = Metadata() \
+            .set_lud16(lud16) \
+            .set_nip05(lud16)
+
+    print("[" + dvm_config.NIP89.NAME + "] Setting profile metadata for " + keys.public_key().to_bech32() + "...")
+    print(metadata.as_json())
+    await client.set_metadata(metadata)
 
 
 def check_and_set_private_key(identifier):
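The reworked update_profile assumes dvm_config.NIP89.CONTENT is a JSON string carrying the fields the Metadata builder reads; when parsing fails, the profile now falls back to lud16/nip05 only instead of raising. A sketch of the happy-path shape (field values are illustrative):

    import json

    # Hypothetical NIP89.CONTENT payload; only name/about/image are read here.
    content = '{"name": "Media DVM", "about": "Downloads media via yt-dlp", "image": "https://example.com/avatar.png"}'

    nip89content = json.loads(content)
    name = nip89content.get("name")    # "Media DVM"
    about = nip89content.get("about")  # "Downloads media via yt-dlp"
    image = nip89content.get("image")  # "https://example.com/avatar.png"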
Tweet url: {tweet_url}" - - needed_variable_pattern = re.compile(r"Variable '([^']+)'") - needed_features_pattern = re.compile( - r'The following features cannot be null: ([^"]+)' - ) - - for error in error_json["errors"]: - needed_vars = needed_variable_pattern.findall(error["message"]) - for needed_var in needed_vars: - variables[needed_var] = True - - needed_features = needed_features_pattern.findall(error["message"]) - for nf in needed_features: - for feature in nf.split(","): - features[feature.strip()] = True - - url = get_details_url(tweet_id, features, variables) - - details = requests.get( - url, - headers={ - "authorization": f"Bearer {bearer_token}", - "x-guest-token": guest_token, - }, - ) - - cur_retry += 1 - - if details.status_code == 200: - # save new variables - request_details["variables"] = variables - request_details["features"] = features - - with open(request_details_file, "w") as f: - json.dump(request_details, f, indent=4) - - assert ( - details.status_code == 200 - ), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" - - return details - - def get_tweet_status_id(tweet_url): - sid_patern = r'https://(?:x\.com|twitter\.com)/[^/]+/status/(\d+)' - if tweet_url[len(tweet_url) - 1] != "/": - tweet_url = tweet_url + "/" - - match = re.findall(sid_patern, tweet_url) - if len(match) == 0: - print("error, could not get status id from this tweet url :", tweet_url) - exit() - status_id = match[0] - return status_id - - def get_associated_media_id(j, tweet_url): - sid = get_tweet_status_id(tweet_url) - pattern = ( - r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' - + sid - + r'/[^"]+",\s*"id_str"\s*:\s*"\d+",' - ) - matches = re.findall(pattern, j) - if len(matches) > 0: - target = matches[0] - target = target[0: len(target) - 1] # remove the coma at the end - return json.loads("{" + target + "}")["id_str"] - return None - - def extract_mp4s(j, tweet_url, target_all_mp4s=False): - # pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12 - amplitude_pattern = re.compile( - r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)" - ) - ext_tw_pattern = re.compile( - r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)" - ) - # format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4 - tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+') - - # https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4 - container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4') - media_id = get_associated_media_id(j, tweet_url) - # find all the matches - matches = amplitude_pattern.findall(j) - matches += ext_tw_pattern.findall(j) - container_matches = container_pattern.findall(j) - - tweet_video_matches = tweet_video_pattern.findall(j) - - if len(matches) == 0 and len(tweet_video_matches) > 0: - return tweet_video_matches - - results = {} - - for match in matches: - url, tweet_id, _, resolution = match - if tweet_id not in results: - results[tweet_id] = {"resolution": resolution, "url": url} - else: - # if we already have a higher resolution video, then don't overwrite it - my_dims = 
-
-    def get_details_url(tweet_id, features, variables):
-        # create a copy of variables - we don't want to modify the original
-        variables = {**variables}
-        variables["tweetId"] = tweet_id
-
-        return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
-        # return f"https://api.twitter.com/graphql/ncDeACNGIApPMaqGVuF_rw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
-
-    def get_tweet_details(tweet_url, guest_token, bearer_token):
-        tweet_id = re.findall(r"(?<=status/)\d+", tweet_url)
-
-        assert (
-            tweet_id is not None and len(tweet_id) == 1
-        ), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
-
-        tweet_id = tweet_id[0]
-
-        # the url needs a url encoded version of variables and features as a query string
-        url = get_details_url(tweet_id, features, variables)
-
-        details = requests.get(
-            url,
-            headers={
-                "authorization": f"Bearer {bearer_token}",
-                "x-guest-token": guest_token,
-            },
-        )
-
-        max_retries = 10
-        cur_retry = 0
-        while details.status_code == 400 and cur_retry < max_retries:
-            try:
-                error_json = json.loads(details.text)
-            except json.JSONDecodeError:
-                assert (
-                    False
-                ), f"Failed to parse json from details error. details text: {details.text} If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
-
-            assert (
-                "errors" in error_json
-            ), f"Failed to find errors in details error json. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
-
-            needed_variable_pattern = re.compile(r"Variable '([^']+)'")
-            needed_features_pattern = re.compile(
-                r'The following features cannot be null: ([^"]+)'
-            )
-
-            for error in error_json["errors"]:
-                needed_vars = needed_variable_pattern.findall(error["message"])
-                for needed_var in needed_vars:
-                    variables[needed_var] = True
-
-                needed_features = needed_features_pattern.findall(error["message"])
-                for nf in needed_features:
-                    for feature in nf.split(","):
-                        features[feature.strip()] = True
-
-            url = get_details_url(tweet_id, features, variables)
-
-            details = requests.get(
-                url,
-                headers={
-                    "authorization": f"Bearer {bearer_token}",
-                    "x-guest-token": guest_token,
-                },
-            )
-
-            cur_retry += 1
-
-            if details.status_code == 200:
-                # save new variables
-                request_details["variables"] = variables
-                request_details["features"] = features
-
-                with open(request_details_file, "w") as f:
-                    json.dump(request_details, f, indent=4)
-
-        assert (
-            details.status_code == 200
-        ), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
-
-        return details
-
-    def get_tweet_status_id(tweet_url):
-        sid_patern = r'https://(?:x\.com|twitter\.com)/[^/]+/status/(\d+)'
-        if tweet_url[len(tweet_url) - 1] != "/":
-            tweet_url = tweet_url + "/"
-
-        match = re.findall(sid_patern, tweet_url)
-        if len(match) == 0:
-            print("error, could not get status id from this tweet url :", tweet_url)
-            exit()
-        status_id = match[0]
-        return status_id
-
-    def get_associated_media_id(j, tweet_url):
-        sid = get_tweet_status_id(tweet_url)
-        pattern = (
-            r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
-            + sid
-            + r'/[^"]+",\s*"id_str"\s*:\s*"\d+",'
-        )
-        matches = re.findall(pattern, j)
-        if len(matches) > 0:
-            target = matches[0]
-            target = target[0: len(target) - 1]  # remove the coma at the end
-            return json.loads("{" + target + "}")["id_str"]
-        return None
-
-    def extract_mp4s(j, tweet_url, target_all_mp4s=False):
-        # pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12
-        amplitude_pattern = re.compile(
-            r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
-        )
-        ext_tw_pattern = re.compile(
-            r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
-        )
-        # format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4
-        tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+')
-
-        # https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4
-        container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4')
-        media_id = get_associated_media_id(j, tweet_url)
-        # find all the matches
-        matches = amplitude_pattern.findall(j)
-        matches += ext_tw_pattern.findall(j)
-        container_matches = container_pattern.findall(j)
-
-        tweet_video_matches = tweet_video_pattern.findall(j)
-
-        if len(matches) == 0 and len(tweet_video_matches) > 0:
-            return tweet_video_matches
-
-        results = {}
-
-        for match in matches:
-            url, tweet_id, _, resolution = match
-            if tweet_id not in results:
-                results[tweet_id] = {"resolution": resolution, "url": url}
-            else:
-                # if we already have a higher resolution video, then don't overwrite it
-                my_dims = [int(x) for x in resolution.split("x")]
-                their_dims = [int(x) for x in results[tweet_id]["resolution"].split("x")]
-
-                if my_dims[0] * my_dims[1] > their_dims[0] * their_dims[1]:
-                    results[tweet_id] = {"resolution": resolution, "url": url}
-
-        if media_id:
-            all_urls = []
-            for twid in results:
-                all_urls.append(results[twid]["url"])
-            all_urls += container_matches
-
-            url_with_media_id = []
-            for url in all_urls:
-                if url.__contains__(media_id):
-                    url_with_media_id.append(url)
-
-            if len(url_with_media_id) > 0:
-                return url_with_media_id
-
-        if len(container_matches) > 0 and not target_all_mp4s:
-            return container_matches
-
-        if target_all_mp4s:
-            urls = [x["url"] for x in results.values()]
-            urls += container_matches
-            return urls
-        return [x["url"] for x in results.values()]
Tweet url: {url}" - - mp4_url = video_part_prefix + mp4_parts[0] - - m4s_part_pattern = re.compile(r"(/[^\n]*\.m4s)") - m4s_parts = m4s_part_pattern.findall(resp.text) - - with open(output_filename, "wb") as f: - r = requests.get(mp4_url, stream=True) - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - f.flush() - - for part in m4s_parts: - part_url = video_part_prefix + part - r = requests.get(part_url, stream=True) - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - f.flush() - - return True - - def repost_check(j, exclude_replies=True): - try: - reply_index = j.index('"conversationthread-') - except ValueError: - reply_index = len(j) - if exclude_replies: - j = j[0:reply_index] - - # We use this regular expression to extract the source status - source_status_pattern = r'"source_status_id_str"\s*:\s*"\d+"' - matches = re.findall(source_status_pattern, j) - - if len(matches) > 0 and exclude_replies: - # We extract the source status id (ssid) - ssid = json.loads("{" + matches[0] + "}")["source_status_id_str"] - # We plug it in this regular expression to find expanded_url (the original tweet url) - expanded_url_pattern = ( - r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"' - ) - matches2 = re.findall(expanded_url_pattern, j) - - if len(matches2) > 0: - # We extract the url and return it - status_url = json.loads("{" + matches2[0] + "}")["expanded_url"] - return status_url - - if not exclude_replies: - # If we include replies we'll have to get all ssids and remove duplicates - ssids = [] - for match in matches: - ssids.append(json.loads("{" + match + "}")["source_status_id_str"]) - # we remove duplicates (this line is messy but it's the easiest way to do it) - ssids = list(set(ssids)) - if len(ssids) > 0: - for ssid in ssids: - expanded_url_pattern = ( - r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' - + ssid - + '[^"]+"' - ) - matches2 = re.findall(expanded_url_pattern, j) - if len(matches2) > 0: - status_urls = [] - for match in matches2: - status_urls.append( - json.loads("{" + match + "}")["expanded_url"] - ) - # We remove duplicates another time - status_urls = list(set(status_urls)) - return status_urls - - # If we don't find source_status_id_str, the tweet doesn't feature a reposted video - return None - - def download_video_from_x(tweet_url, output_file, target_all_videos=False): - bearer_token, guest_token = get_tokens(tweet_url) - resp = get_tweet_details(tweet_url, guest_token, bearer_token) - mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos) - - if target_all_videos: - video_counter = 1 - original_urls = repost_check(resp.text, exclude_replies=False) - - if len(original_urls) > 0: - for url in original_urls: - download_video_from_x( - url, output_file.replace(".mp4", f"_{video_counter}.mp4") - ) - video_counter += 1 - if len(mp4s) > 0: - for mp4 in mp4s: - output_file = output_file.replace(".mp4", f"_{video_counter}.mp4") - if "container" in mp4: - download_parts(mp4, output_file) - - else: - # use a stream to download the file - r = requests.get(mp4, stream=True) - with open(output_file, "wb") as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - f.flush() - video_counter += 1 - else: - original_url = repost_check(resp.text) - - if original_url: - download_video_from_x(original_url, output_file) - else: - assert ( - len(mp4s) > 0 - ), f"Could not find any mp4s to download. Make sure you are using the correct url. 
-
-    def download_video_from_x(tweet_url, output_file, target_all_videos=False):
-        bearer_token, guest_token = get_tokens(tweet_url)
-        resp = get_tweet_details(tweet_url, guest_token, bearer_token)
-        mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos)
-
-        if target_all_videos:
-            video_counter = 1
-            original_urls = repost_check(resp.text, exclude_replies=False)
-
-            if len(original_urls) > 0:
-                for url in original_urls:
-                    download_video_from_x(
-                        url, output_file.replace(".mp4", f"_{video_counter}.mp4")
-                    )
-                    video_counter += 1
-            if len(mp4s) > 0:
-                for mp4 in mp4s:
-                    output_file = output_file.replace(".mp4", f"_{video_counter}.mp4")
-                    if "container" in mp4:
-                        download_parts(mp4, output_file)
-
-                    else:
-                        # use a stream to download the file
-                        r = requests.get(mp4, stream=True)
-                        with open(output_file, "wb") as f:
-                            for chunk in r.iter_content(chunk_size=1024):
-                                if chunk:
-                                    f.write(chunk)
-                                    f.flush()
-                    video_counter += 1
-        else:
-            original_url = repost_check(resp.text)
-
-            if original_url:
-                download_video_from_x(original_url, output_file)
-            else:
-                assert (
-                    len(mp4s) > 0
-                ), f"Could not find any mp4s to download. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
-
-                mp4 = mp4s[0]
-                if "container" in mp4:
-                    download_parts(mp4, output_file)
-                else:
-                    # use a stream to download the file
-                    r = requests.get(mp4, stream=True)
-                    with open(output_file, "wb") as f:
-                        for chunk in r.iter_content(chunk_size=1024):
-                            if chunk:
-                                f.write(chunk)
-                                f.flush()
-        return target_location
-
-    return download_video_from_x(source_url, target_location)
-
+browser = "chrome"  # or "firefox"
 
 
 def download_xvideo(url, target_location) -> None:
-    """Download a video from a URL into a filename.
-
-    Args:
-        url (str): The video URL to download
-        file_name (str): The file name or path to save the video to.
-    """
-
     response = requests.get(url, stream=True)
     total_size = int(response.headers.get("content-length", 0))
     block_size = 1024
     progress_bar = tqdm(total=total_size, unit="B", unit_scale=True)
-
-    #download_path = os.path.join(Path.home(), "Downloads", file_name)
     download_path = target_location
     with open(download_path, "wb") as file:
         for data in response.iter_content(block_size):
@@ -485,12 +34,6 @@ def download_xvideo(url, target_location) -> None:
 
 
 def XDownload(url, filepath=""):
-    """Extract the highest quality video url to download into a file
-
-    Args:
-        url (str): The twitter post URL to download from
-    """
-
     api_url = f"https://twitsave.com/info?url={url}"
     response = requests.get(api_url)
 
@@ -498,20 +41,10 @@ def XDownload(url, filepath=""):
     download_button = data.find_all("div", class_="origin-top-right")[0]
     quality_buttons = download_button.find_all("a")
     highest_quality_url = quality_buttons[0].get("href")  # Highest quality video url
-
-    #file_name = data.find_all("div", class_="leading-tight")[0].find_all("p", class_="m-2")[0].text # Video file name
-    #file_name = re.sub(r"[^a-zA-Z0-9]+", ' ', file_name).strip() + ".mp4" # Remove special characters from file name
-
     download_xvideo(highest_quality_url, filepath)
     return filepath
-
-
-
-
-
-# TIKTOK/INSTA
 def getDict() -> dict:
     response = requests.get('https://ttdownloader.com/')
     point = response.text.find(' str: for i in linklist: try:
 
@@ -615,34 +147,81 @@ def InstagramDownloadAll(linklist, path) -> str:
         print(err)
         exit(1)
 
+def YTDownload(link, path, audio_only=True):
 
-# YOUTUBE
-def YouTubeDownload(link, path, audio_only=True):
-    youtubeObject = YouTube(link)
     if audio_only:
-        youtubeObject = youtubeObject.streams.get_audio_only()
-        youtubeObject.download(path, "yt.mp3")
-        print("Download is completed successfully")
-        return path + "yt.mp3"
+        return get_audio([link])
     else:
-        youtubeObject = youtubeObject.streams.get_highest_resolution()
-        youtubeObject.download(path, "yt.mp4")
-        print("Download is completed successfully")
-        return path + "yt.mp4"
+        return get_video([link])
 
 
-def checkYoutubeLinkValid(link):
+def get_media_duration(url):
     try:
-        # TODO find a way to test without fully downloading the file
-        youtubeObject = YouTube(link)
-        youtubeObject = youtubeObject.streams.get_audio_only()
-        youtubeObject.download(".", "yt.mp3")
-        os.remove("yt.mp3")
-        return True
+        # ℹ️ See help(yt_dlp.YoutubeDL) for a list of available options and public functions
+        ydl_opts = {
+            'cookiesfrombrowser': (browser, None, None, None),
+        }
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            info = ydl.extract_info(url, download=False)
 
-    except Exception as e:
-        print(str(e))
-        return False
+        # ℹ️ ydl.sanitize_info makes the info json-serializable
+        return float(ydl.sanitize_info(info)["duration"])
+    except Exception:
+        return None
+
+
+def get_media_info(url):
+    try:
+        # ℹ️ See help(yt_dlp.YoutubeDL) for a list of available options and public functions
+        ydl_opts = {
+            'cookiesfrombrowser': (browser, None, None, None),
+        }
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            info = ydl.extract_info(url, download=False)
+
+        # ℹ️ ydl.sanitize_info makes the info json-serializable
+        return json.dumps(ydl.sanitize_info(info))
+    except Exception:
+        return None
+
+
+def get_audio(URLS):
+    try:
+        ydl_opts = {
+            'cookiesfrombrowser': (browser, None, None, None),
+            'format': 'm4a/bestaudio/best',
+            "outtmpl": 'outputs/audio',
+            'overwrites': True,
+            # ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
+            'postprocessors': [{  # Extract audio using ffmpeg
+                'key': 'FFmpegExtractAudio',
+                'preferredcodec': 'mp3',
+            }]
+        }
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            ydl.download(URLS)
+
+        return "outputs/audio.mp3"
+    except Exception:
+        return None
+
+
+def get_video(URLS):
+    try:
+        ydl_opts = {
+            'cookiesfrombrowser': (browser, None, None, None),
+            'format': 'mp4',
+            'overwrites': True,
+            # "outtmpl": '/%(uploader)s_%(title)s.%(ext)s',
+            "outtmpl": 'outputs/video.mp4',
+        }
+
+        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+            ydl.download(URLS)
+        return "outputs/video.mp4"
+
+    except Exception:
+        return None
 
 
 # OVERCAST
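All the new media_scrapper.py helpers share one notable assumption: the tuple ('chrome', None, None, None) tells yt-dlp to read cookies from a locally installed Chrome profile, so age-gated or login-walled media can resolve. The same pattern in isolation (placeholder URL; requires the yt-dlp package and a local Chrome install):

    import yt_dlp

    ydl_opts = {
        # (browser, profile, keyring, container); None falls back to defaults
        'cookiesfrombrowser': ('chrome', None, None, None),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # download=False only probes metadata, mirroring get_media_duration/get_media_info
        info = ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False)
        print(ydl.sanitize_info(info).get("duration"))  # duration in seconds, when reported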
diff --git a/setup.py b/setup.py
index 4398500..fc9e734 100644
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,7 @@ setup(
       install_requires=["nostr-sdk==0.32.2",
                         "bech32==1.2.0",
                         "pycryptodome==3.20.0",
+                        "yt-dlp==2024.5.27",
                         "python-dotenv==1.0.0",
                         "emoji==2.12.1",
                         "ffmpegio==0.9.1",
@@ -24,9 +25,8 @@ setup(
                         "Pillow==10.1.0",
                         "PyUpload==0.1.4",
                         "requests==2.32.3",
-                        "instaloader==4.10.1",
-                        "pytube==15.0.0",
                         "moviepy==2.0.0.dev2",
+                        "instaloader==4.11",
                         "zipp==3.17.0",
                         "urllib3==2.2.1",
                         "networkx==3.3",
diff --git a/tests/bot.py b/tests/bot.py
index 42ca4df..4b95ec0 100644
--- a/tests/bot.py
+++ b/tests/bot.py
@@ -8,7 +8,7 @@ import dotenv
 from nostr_sdk import Keys
 
 from nostr_dvm.bot import Bot
-from nostr_dvm.tasks import textextraction_pdf
+from nostr_dvm.tasks import textextraction_pdf, convert_media
 from nostr_dvm.utils.admin_utils import AdminConfig
 from nostr_dvm.utils.backend_utils import keep_alive
 from nostr_dvm.utils.definitions import EventDefinitions
@@ -26,6 +26,7 @@ def playground():
     bot_config.PRIVATE_KEY = check_and_set_private_key(identifier)
     npub = Keys.parse(bot_config.PRIVATE_KEY).public_key().to_bech32()
     invoice_key, admin_key, wallet_id, user_id, lnaddress = check_and_set_ln_bits_keys(identifier, npub)
+    bot_config.LN_ADDRESS = lnaddress
     bot_config.LNBITS_INVOICE_KEY = invoice_key
     bot_config.LNBITS_ADMIN_KEY = admin_key  # The dvm might pay failed jobs back
     bot_config.LNBITS_URL = os.getenv("LNBITS_HOST")
@@ -46,8 +47,17 @@ def playground():
 
     bot_config.SUPPORTED_DVMS.append(ymhm_external)
 
+    admin_config_media = AdminConfig()
+    admin_config_media.UPDATE_PROFILE = True
+    admin_config_media.REBROADCAST_NIP65_RELAY_LIST = True
+    media_bringer = convert_media.build_example("Nostr AI DVM Media Converter",
+                                                "media_converter", admin_config_media)
+    bot_config.SUPPORTED_DVMS.append(media_bringer)
+    media_bringer.run()
+
     admin_config = AdminConfig()
     admin_config.REBROADCAST_NIP65_RELAY_LIST = True
+    admin_config.UPDATE_PROFILE = True
     x = threading.Thread(target=Bot, args=([bot_config, admin_config]))
     x.start()
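The updated test playground still reads its LNbits endpoint from the environment through dotenv. A minimal sketch of the assumed setup (the host value is a placeholder for a local LNbits instance):

    import os
    import dotenv

    # assumes a .env file next to tests/bot.py containing, e.g.: LNBITS_HOST=http://localhost:5000
    dotenv.load_dotenv()
    print(os.getenv("LNBITS_HOST"))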