Believethehype
2024-06-21 16:10:43 +02:00
parent 0bc8e37b1c
commit 4b0d59723e
7 changed files with 112 additions and 535 deletions

View File

@@ -414,7 +414,7 @@ class Bot:
bolt11 = zaprequest(user.lud16, amount, "Zap", nostr_event, self.keys,
self.dvm_config,
"private")
if bolt11 == None:
if bolt11 is None:
print("Receiver has no Lightning address")
return
try:
@@ -428,7 +428,7 @@ class Bot:
except Exception as e:
print(e)
print(str(e))
async def handle_nip90_response_event(nostr_event: Event):
try:

View File

@@ -7,12 +7,15 @@ import ffmpegio
import requests
from nostr_dvm.utils.nostr_utils import get_event_by_id
from nostr_dvm.utils.scrapper.media_scrapper import OvercastDownload, XitterDownload, TiktokDownloadAll, \
InstagramDownload, YouTubeDownload, XDownload
from nostr_dvm.utils.scrapper.media_scrapper import YTDownload
async def input_data_file_duration(event, dvm_config, client, start=0, end=0):
# print("[" + dvm_config.NIP89.NAME + "] Getting Duration of the Media file..")
if end != 0:
return end-start
input_value = ""
input_type = ""
count = 0
@@ -37,7 +40,6 @@ async def input_data_file_duration(event, dvm_config, client, start=0, end=0):
if input_type == "url":
source_type = check_source_type(input_value)
filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end, True)
if type != "audio" and type != "video":
return 1
@@ -217,7 +219,7 @@ def get_overcast(input_value, start, end):
print("Found overcast.fm Link.. downloading")
start_time = start
end_time = end
download_overcast(input_value, filename)
download(input_value, filename)
finaltag = str(input_value).replace("https://overcast.fm/", "").split('/')
if start == 0.0:
if len(finaltag) > 1:
@@ -235,7 +237,7 @@ def get_overcast(input_value, start, end):
def get_TikTok(input_value, start, end):
filepath = os.path.abspath(os.curdir + r'/outputs/')
try:
filename = download_tik_tok(input_value, filepath)
filename = download(input_value, filepath)
print(filename)
except Exception as e:
print(e)
@@ -246,7 +248,7 @@ def get_TikTok(input_value, start, end):
def get_Instagram(input_value, start, end):
filepath = os.path.abspath(os.curdir + r'/outputs/')
try:
filename = download_instagram(input_value, filepath)
filename = download(input_value, filepath)
print(filename)
except Exception as e:
print(e)
@@ -258,7 +260,7 @@ def get_Twitter(input_value, start, end):
filepath = os.path.abspath(os.curdir) + r'/outputs/'
cleanlink = str(input_value).replace("twitter.com", "x.com")
try:
filename = download_twitter(cleanlink, filepath)
filename = download(cleanlink, filepath)
except Exception as e:
print(e)
return "", start, end
@@ -270,7 +272,7 @@ def get_youtube(input_value, start, end, audioonly=True):
print(filepath)
filename = ""
try:
filename = download_youtube(input_value, filepath, audioonly)
filename = download(input_value, filepath, audioonly)
except Exception as e:
print("Youtube " + str(e))
@@ -340,26 +342,5 @@ def get_media_link(url) -> (str, str):
return None, None
def download_overcast(source_url, target_location):
result = OvercastDownload(source_url, target_location)
return result
def download_twitter(videourl, path):
result = XDownload(videourl, path + "x.mp4")
#result = XitterDownload(videourl, path + "x.mp4")
return result
def download_tik_tok(videourl, path):
result = TiktokDownloadAll([videourl], path)
return result
def download_instagram(videourl, path):
result = InstagramDownload(videourl, "insta", path)
return result
def download_youtube(link, path, audioonly=True):
return YouTubeDownload(link, path, audio_only=audioonly)
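# generic wrapper that replaces the per-platform downloaders above; delegates to yt-dlp via YTDownload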
def download(videourl, path, audioonly=False):
return YTDownload(videourl, path, audio_only=audioonly)

View File

@@ -18,4 +18,4 @@ async def nip65_announce_relays(dvm_config, client):
event = EventBuilder(EventDefinitions.KIND_RELAY_ANNOUNCEMENT, content, tags).to_event(keys)
eventid = await send_event(event, client=client, dvm_config=dvm_config, blastr=True)
print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME +" (EventID: " + eventid.to_hex() +")" + bcolors.ENDC)
print(bcolors.BLUE + "[" + dvm_config.NIP89.NAME + "] Announced NIP 65 for " + dvm_config.NIP89.NAME +" (EventID: " + str(eventid.to_hex()) +")" + bcolors.ENDC)

View File

@@ -305,8 +305,8 @@ def check_and_decrypt_own_tags(event, dvm_config):
async def update_profile(dvm_config, client, lud16=""):
keys = Keys.parse(dvm_config.PRIVATE_KEY)
nip89content = json.loads(dvm_config.NIP89.CONTENT)
if nip89content.get("name"):
try:
nip89content = json.loads(dvm_config.NIP89.CONTENT)
name = nip89content.get("name")
about = nip89content.get("about")
image = nip89content.get("image")
@@ -320,9 +320,16 @@ async def update_profile(dvm_config, client, lud16=""):
.set_lud16(lud16) \
.set_nip05(lud16)
# .set_banner("https://example.com/banner.png") \
print("[" + dvm_config.NIP89.NAME + "] Setting profile metadata for " + keys.public_key().to_bech32() + "...")
print(metadata.as_json())
await client.set_metadata(metadata)
except:
metadata = Metadata() \
.set_lud16(lud16) \
.set_nip05(lud16)
print("[" + dvm_config.NIP89.NAME + "] Setting profile metadata for " + keys.public_key().to_bech32() + "...")
print(metadata.as_json())
await client.set_metadata(metadata)
def check_and_set_private_key(identifier):

View File

@@ -1,14 +1,11 @@
import json
import os
import re
import sys
import urllib.parse
from typing import Any
from urllib.request import urlopen, Request
import requests
import instaloader
from pytube import YouTube
import json
import yt_dlp
import sys
import os
import re
@@ -19,461 +16,13 @@ import bs4
from tqdm import tqdm
from pathlib import Path
# Deprecated, currently not functional
def XitterDownload(source_url, target_location):
script_dir = os.path.dirname(os.path.realpath(__file__))
request_details_file = f"{script_dir}{os.sep}request_details.json"
request_details = json.load(open(request_details_file, "r")) # test
features, variables = request_details["features"], request_details["variables"]
def get_tokens(tweet_url):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0",
"Accept": "*/*",
"Accept-Language": "de,en-US;q=0.7,en;q=0.3",
"Accept-Encoding": "gzip, deflate, br",
"TE": "trailers",
}
html = requests.get(tweet_url, headers=headers)
assert (
html.status_code == 200
), f"Failed to get tweet page. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {html.status_code}. Tweet url: {tweet_url}"
mainjs_url = re.findall(
r"https://abs.twimg.com/responsive-web/client-web-legacy/main.[^\.]+.js",
html.text,
)
assert (
mainjs_url is not None and len(mainjs_url) > 0
), f"Failed to find main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
mainjs_url = mainjs_url[0]
mainjs = requests.get(mainjs_url)
assert (
mainjs.status_code == 200
), f"Failed to get main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {mainjs.status_code}. Tweet url: {tweet_url}"
bearer_token = re.findall(r'AAAAAAAAA[^"]+', mainjs.text)
assert (
bearer_token is not None and len(bearer_token) > 0
), f"Failed to find bearer token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}"
bearer_token = bearer_token[0]
# get the guest token
with requests.Session() as s:
s.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0",
"accept": "*/*",
"accept-language": "de,en-US;q=0.7,en;q=0.3",
"accept-encoding": "gzip, deflate, br",
"te": "trailers",
}
)
s.headers.update({"authorization": f"Bearer {bearer_token}"})
# activate bearer token and get guest token
guest_token = s.post("https://api.twitter.com/1.1/guest/activate.json").json()[
"guest_token"
]
assert (
guest_token is not None
), f"Failed to find guest token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}"
return bearer_token, guest_token
def get_details_url(tweet_id, features, variables):
# create a copy of variables - we don't want to modify the original
variables = {**variables}
variables["tweetId"] = tweet_id
return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
# return f"https://api.twitter.com/graphql/ncDeACNGIApPMaqGVuF_rw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}"
def get_tweet_details(tweet_url, guest_token, bearer_token):
tweet_id = re.findall(r"(?<=status/)\d+", tweet_url)
assert (
tweet_id is not None and len(tweet_id) == 1
), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
tweet_id = tweet_id[0]
# the url needs a url encoded version of variables and features as a query string
url = get_details_url(tweet_id, features, variables)
details = requests.get(
url,
headers={
"authorization": f"Bearer {bearer_token}",
"x-guest-token": guest_token,
},
)
max_retries = 10
cur_retry = 0
while details.status_code == 400 and cur_retry < max_retries:
try:
error_json = json.loads(details.text)
except json.JSONDecodeError:
assert (
False
), f"Failed to parse json from details error. details text: {details.text} If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
assert (
"errors" in error_json
), f"Failed to find errors in details error json. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
needed_variable_pattern = re.compile(r"Variable '([^']+)'")
needed_features_pattern = re.compile(
r'The following features cannot be null: ([^"]+)'
)
for error in error_json["errors"]:
needed_vars = needed_variable_pattern.findall(error["message"])
for needed_var in needed_vars:
variables[needed_var] = True
needed_features = needed_features_pattern.findall(error["message"])
for nf in needed_features:
for feature in nf.split(","):
features[feature.strip()] = True
url = get_details_url(tweet_id, features, variables)
details = requests.get(
url,
headers={
"authorization": f"Bearer {bearer_token}",
"x-guest-token": guest_token,
},
)
cur_retry += 1
if details.status_code == 200:
# save new variables
request_details["variables"] = variables
request_details["features"] = features
with open(request_details_file, "w") as f:
json.dump(request_details, f, indent=4)
assert (
details.status_code == 200
), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}"
return details
def get_tweet_status_id(tweet_url):
sid_pattern = r'https://(?:x\.com|twitter\.com)/[^/]+/status/(\d+)'
if tweet_url[len(tweet_url) - 1] != "/":
tweet_url = tweet_url + "/"
match = re.findall(sid_pattern, tweet_url)
if len(match) == 0:
print("error, could not get status id from this tweet url :", tweet_url)
exit()
status_id = match[0]
return status_id
def get_associated_media_id(j, tweet_url):
sid = get_tweet_status_id(tweet_url)
pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
+ sid
+ r'/[^"]+",\s*"id_str"\s*:\s*"\d+",'
)
matches = re.findall(pattern, j)
if len(matches) > 0:
target = matches[0]
target = target[0: len(target) - 1] # remove the comma at the end
return json.loads("{" + target + "}")["id_str"]
return None
def extract_mp4s(j, tweet_url, target_all_mp4s=False):
# pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12
amplitude_pattern = re.compile(
r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
ext_tw_pattern = re.compile(
r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
# format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4
tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+')
# https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4
container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4')
media_id = get_associated_media_id(j, tweet_url)
# find all the matches
matches = amplitude_pattern.findall(j)
matches += ext_tw_pattern.findall(j)
container_matches = container_pattern.findall(j)
tweet_video_matches = tweet_video_pattern.findall(j)
if len(matches) == 0 and len(tweet_video_matches) > 0:
return tweet_video_matches
results = {}
for match in matches:
url, tweet_id, _, resolution = match
if tweet_id not in results:
results[tweet_id] = {"resolution": resolution, "url": url}
else:
# if we already have a higher resolution video, then don't overwrite it
my_dims = [int(x) for x in resolution.split("x")]
their_dims = [int(x) for x in results[tweet_id]["resolution"].split("x")]
if my_dims[0] * my_dims[1] > their_dims[0] * their_dims[1]:
results[tweet_id] = {"resolution": resolution, "url": url}
if media_id:
all_urls = []
for twid in results:
all_urls.append(results[twid]["url"])
all_urls += container_matches
url_with_media_id = []
for url in all_urls:
if url.__contains__(media_id):
url_with_media_id.append(url)
if len(url_with_media_id) > 0:
return url_with_media_id
if len(container_matches) > 0 and not target_all_mp4s:
return container_matches
if target_all_mp4s:
urls = [x["url"] for x in results.values()]
urls += container_matches
return urls
return [x["url"] for x in results.values()]
def extract_mp4_fmp4(j):
"""
Extract the URL of the MP4 video from the detailed information of the tweet.
Returns a list of URLs, tweet IDs, and resolution information (dictionary type)
and a list of tweet IDs as return values.
"""
# Empty list to store tweet IDs
tweet_id_list = []
mp4_info_dict_list = []
amplitude_pattern = re.compile(
r"(https://video.twimg.com/amplify_video/(\d+)/vid/(avc1/)(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
ext_tw_pattern = re.compile(
r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)"
)
tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+')
container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4')
matches = amplitude_pattern.findall(j)
matches += ext_tw_pattern.findall(j)
container_matches = container_pattern.findall(j)
tweet_video_url_list = tweet_video_pattern.findall(j)
for match in matches:
url, tweet_id, _, resolution = match
tweet_id_list.append(int(tweet_id))
mp4_info_dict_list.append({"resolution": resolution, "url": url})
tweet_id_list = list(dict.fromkeys(tweet_id_list))
if len(container_matches) > 0:
for url in container_matches:
mp4_info_dict_list.append({"url": url})
return tweet_id_list, mp4_info_dict_list, tweet_video_url_list
def download_parts(url, output_filename):
resp = requests.get(url, stream=True)
pattern = re.compile(r"(/[^\n]*/(\d+x\d+)/[^\n]*container=fmp4)")
matches = pattern.findall(resp.text)
max_res = 0
max_res_url = None
for match in matches:
url, resolution = match
width, height = resolution.split("x")
res = int(width) * int(height)
if res > max_res:
max_res = res
max_res_url = url
assert (
max_res_url is not None
), f"Could not find a url to download from. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {url}"
video_part_prefix = "https://video.twimg.com"
resp = requests.get(video_part_prefix + max_res_url, stream=True)
mp4_pattern = re.compile(r"(/[^\n]*\.mp4)")
mp4_parts = mp4_pattern.findall(resp.text)
assert (
len(mp4_parts) == 1
), f"There should be exactly 1 mp4 container at this point. Instead, found {len(mp4_parts)}. Please open a GitHub issue and copy and paste this message into it. Tweet url: {url}"
mp4_url = video_part_prefix + mp4_parts[0]
m4s_part_pattern = re.compile(r"(/[^\n]*\.m4s)")
m4s_parts = m4s_part_pattern.findall(resp.text)
with open(output_filename, "wb") as f:
r = requests.get(mp4_url, stream=True)
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
for part in m4s_parts:
part_url = video_part_prefix + part
r = requests.get(part_url, stream=True)
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
return True
def repost_check(j, exclude_replies=True):
try:
reply_index = j.index('"conversationthread-')
except ValueError:
reply_index = len(j)
if exclude_replies:
j = j[0:reply_index]
# We use this regular expression to extract the source status
source_status_pattern = r'"source_status_id_str"\s*:\s*"\d+"'
matches = re.findall(source_status_pattern, j)
if len(matches) > 0 and exclude_replies:
# We extract the source status id (ssid)
ssid = json.loads("{" + matches[0] + "}")["source_status_id_str"]
# We plug it in this regular expression to find expanded_url (the original tweet url)
expanded_url_pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"'
)
matches2 = re.findall(expanded_url_pattern, j)
if len(matches2) > 0:
# We extract the url and return it
status_url = json.loads("{" + matches2[0] + "}")["expanded_url"]
return status_url
if not exclude_replies:
# If we include replies we'll have to get all ssids and remove duplicates
ssids = []
for match in matches:
ssids.append(json.loads("{" + match + "}")["source_status_id_str"])
# we remove duplicates (this line is messy but it's the easiest way to do it)
ssids = list(set(ssids))
if len(ssids) > 0:
for ssid in ssids:
expanded_url_pattern = (
r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/'
+ ssid
+ '[^"]+"'
)
matches2 = re.findall(expanded_url_pattern, j)
if len(matches2) > 0:
status_urls = []
for match in matches2:
status_urls.append(
json.loads("{" + match + "}")["expanded_url"]
)
# We remove duplicates another time
status_urls = list(set(status_urls))
return status_urls
# If we don't find source_status_id_str, the tweet doesn't feature a reposted video
return None
def download_video_from_x(tweet_url, output_file, target_all_videos=False):
bearer_token, guest_token = get_tokens(tweet_url)
resp = get_tweet_details(tweet_url, guest_token, bearer_token)
mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos)
if target_all_videos:
video_counter = 1
original_urls = repost_check(resp.text, exclude_replies=False)
if len(original_urls) > 0:
for url in original_urls:
download_video_from_x(
url, output_file.replace(".mp4", f"_{video_counter}.mp4")
)
video_counter += 1
if len(mp4s) > 0:
for mp4 in mp4s:
output_file = output_file.replace(".mp4", f"_{video_counter}.mp4")
if "container" in mp4:
download_parts(mp4, output_file)
else:
# use a stream to download the file
r = requests.get(mp4, stream=True)
with open(output_file, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
video_counter += 1
else:
original_url = repost_check(resp.text)
if original_url:
download_video_from_x(original_url, output_file)
else:
assert (
len(mp4s) > 0
), f"Could not find any mp4s to download. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
mp4 = mp4s[0]
if "container" in mp4:
download_parts(mp4, output_file)
else:
# use a stream to download the file
r = requests.get(mp4, stream=True)
with open(output_file, "wb") as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
f.flush()
return target_location
return download_video_from_x(source_url, target_location)
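# browser whose cookie store yt-dlp reads through the 'cookiesfrombrowser' option below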
browser = "chrome" #"firefox"
def download_xvideo(url, target_location) -> None:
"""Download a video from a URL into a filename.
Args:
url (str): The video URL to download
file_name (str): The file name or path to save the video to.
"""
response = requests.get(url, stream=True)
total_size = int(response.headers.get("content-length", 0))
block_size = 1024
progress_bar = tqdm(total=total_size, unit="B", unit_scale=True)
#download_path = os.path.join(Path.home(), "Downloads", file_name)
download_path = target_location
with open(download_path, "wb") as file:
for data in response.iter_content(block_size):
@@ -485,12 +34,6 @@ def download_xvideo(url, target_location) -> None:
def XDownload(url, filepath=""):
"""Extract the highest quality video url to download into a file
Args:
url (str): The twitter post URL to download from
"""
api_url = f"https://twitsave.com/info?url={url}"
response = requests.get(api_url)
@@ -498,20 +41,10 @@ def XDownload(url, filepath=""):
download_button = data.find_all("div", class_="origin-top-right")[0]
quality_buttons = download_button.find_all("a")
highest_quality_url = quality_buttons[0].get("href") # Highest quality video url
#file_name = data.find_all("div", class_="leading-tight")[0].find_all("p", class_="m-2")[0].text # Video file name
#file_name = re.sub(r"[^a-zA-Z0-9]+", ' ', file_name).strip() + ".mp4" # Remove special characters from file name
download_xvideo(highest_quality_url, filepath)
return filepath
# TIKTOK/INSTA
def getDict() -> dict:
response = requests.get('https://ttdownloader.com/')
point = response.text.find('<input type="hidden" id="token" name="token" value="') + \
@@ -603,7 +136,6 @@ def InstagramDownload(url, name, path) -> str:
f.write(response.content)
return path + "\\" + name + ".jpg"
def InstagramDownloadAll(linklist, path) -> str:
for i in linklist:
try:
@@ -615,34 +147,81 @@ def InstagramDownloadAll(linklist, path) -> str:
print(err)
exit(1)
def YTDownload(link, path, audio_only=True):
# YOUTUBE
def YouTubeDownload(link, path, audio_only=True):
youtubeObject = YouTube(link)
if audio_only:
youtubeObject = youtubeObject.streams.get_audio_only()
youtubeObject.download(path, "yt.mp3")
print("Download is completed successfully")
return path + "yt.mp3"
return get_audio([link])
else:
youtubeObject = youtubeObject.streams.get_highest_resolution()
youtubeObject.download(path, "yt.mp4")
print("Download is completed successfully")
return path + "yt.mp4"
return get_video([link])
def checkYoutubeLinkValid(link):
def get_media_duration(url):
try:
# TODO find a way to test without fully downloading the file
youtubeObject = YouTube(link)
youtubeObject = youtubeObject.streams.get_audio_only()
youtubeObject.download(".", "yt.mp3")
os.remove("yt.mp3")
return True
# See help(yt_dlp.YoutubeDL) for a list of available options and public functions
ydl_opts = {
'cookiesfrombrowser': (browser, None, None, None),
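# yt-dlp expects (browser, profile, keyring, container); only the browser is set here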
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
except Exception as e:
print(str(e))
return False
# ydl.sanitize_info makes the info json-serializable
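# "duration" is numeric; the dumps/float round-trip just coerces it to a float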
return float(json.dumps(ydl.sanitize_info(info)["duration"]))
except:
return None
def get_media_info(url):
try:
# See help(yt_dlp.YoutubeDL) for a list of available options and public functions
ydl_opts = {
'cookiesfrombrowser': (browser, None, None, None),
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
# ydl.sanitize_info makes the info json-serializable
return json.dumps(ydl.sanitize_info(info))
except:
return None
def get_audio(URLS):
try:
ydl_opts = {
'cookiesfrombrowser': (browser, None, None, None),
'format': 'm4a/bestaudio/best',
"outtmpl": 'outputs/audio',
'overwrites': 'True',
# See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
'postprocessors': [{ # Extract audio using ffmpeg
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
}]
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
error_code = ydl.download(URLS)
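# the FFmpegExtractAudio postprocessor converts the download to outputs/audio.mp3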
return "outputs/audio.mp3"
except:
return None
def get_video(URLS):
try:
ydl_opts = {
'cookiesfrombrowser': (browser, None, None, None),
'format': 'mp4',
'overwrites': 'True',
# "outtmpl": '/%(uploader)s_%(title)s.%(ext)s',
"outtmpl": 'outputs/video.mp4',
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download(URLS)
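# 'outtmpl' pins the output to outputs/video.mp4 regardless of the source title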
return "outputs/video.mp4"
except:
return None
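# A minimal usage sketch for the new yt-dlp based helpers above; the URL is
# illustrative and an existing outputs/ directory is assumed:
if __name__ == "__main__":
    url = "https://www.youtube.com/watch?v=aA-jiiepOrE"  # hypothetical link
    print(YTDownload(url, "outputs/", audio_only=True))  # -> outputs/audio.mp3
    print(get_media_duration(url))  # duration in seconds, or None on failure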
# OVERCAST

View File

@@ -17,6 +17,7 @@ setup(
install_requires=["nostr-sdk==0.32.2",
"bech32==1.2.0",
"pycryptodome==3.20.0",
"yt-dlp==2024.5.27",
"python-dotenv==1.0.0",
"emoji==2.12.1",
"ffmpegio==0.9.1",
@@ -24,9 +25,8 @@ setup(
"Pillow==10.1.0",
"PyUpload==0.1.4",
"requests==2.32.3",
"instaloader==4.10.1",
"pytube==15.0.0",
"moviepy==2.0.0.dev2",
"instaloader==4.11",
"zipp==3.17.0",
"urllib3==2.2.1",
"networkx==3.3",

View File

@@ -8,7 +8,7 @@ import dotenv
from nostr_sdk import Keys
from nostr_dvm.bot import Bot
from nostr_dvm.tasks import textextraction_pdf
from nostr_dvm.tasks import textextraction_pdf, convert_media
from nostr_dvm.utils.admin_utils import AdminConfig
from nostr_dvm.utils.backend_utils import keep_alive
from nostr_dvm.utils.definitions import EventDefinitions
@@ -26,6 +26,7 @@ def playground():
bot_config.PRIVATE_KEY = check_and_set_private_key(identifier)
npub = Keys.parse(bot_config.PRIVATE_KEY).public_key().to_bech32()
invoice_key, admin_key, wallet_id, user_id, lnaddress = check_and_set_ln_bits_keys(identifier, npub)
bot_config.LN_ADDRESS = lnaddress
bot_config.LNBITS_INVOICE_KEY = invoice_key
bot_config.LNBITS_ADMIN_KEY = admin_key # The dvm might pay failed jobs back
bot_config.LNBITS_URL = os.getenv("LNBITS_HOST")
@@ -46,8 +47,17 @@ def playground():
bot_config.SUPPORTED_DVMS.append(ymhm_external)
admin_config_media = AdminConfig()
admin_config_media.UPDATE_PROFILE = True
admin_config_media.REBROADCAST_NIP65_RELAY_LIST = True
media_bringer = convert_media.build_example("Nostr AI DVM Media Converter",
"media_converter", admin_config_media)
bot_config.SUPPORTED_DVMS.append(media_bringer)
media_bringer.run()
admin_config = AdminConfig()
admin_config.REBROADCAST_NIP65_RELAY_LIST = True
admin_config.UPDATE_PROFILE = True
x = threading.Thread(target=Bot, args=([bot_config, admin_config]))
x.start()