diff --git "a/.\\outputs\\audio.mp3" "b/.\\outputs\\audio.mp3" new file mode 100644 index 0000000..d90f2f3 Binary files /dev/null and "b/.\\outputs\\audio.mp3" differ diff --git "a/.\\outputs\\file.mp4" "b/.\\outputs\\file.mp4" new file mode 100644 index 0000000..5f8f82c Binary files /dev/null and "b/.\\outputs\\file.mp4" differ diff --git a/backends/nova_server.py b/backends/nova_server.py index 60d8e1b..62398bb 100644 --- a/backends/nova_server.py +++ b/backends/nova_server.py @@ -3,6 +3,8 @@ import json import os import time import zipfile +from pathlib import Path + import pandas as pd import requests import PIL.Image as Image @@ -36,6 +38,18 @@ def send_request_to_nova_server(request_form, address): return response.text +def send_file_to_nova_server(filepath, address): + print("Sending file to NOVA-Server") + url = ('http://' + address + '/upload') + fp = open(filepath, 'rb') + response = requests.post(url, files={'file': fp}) + result = response.content.decode('utf-8') + print(result) + return result + + # headers = {'Content-type': 'application/x-www-form-urlencoded'} + + """ check_nova_server_status(request_form, address) Function that requests the status of the current process with the jobID (we use the Nostr event as jobID). @@ -44,7 +58,7 @@ We throw an exception on error """ -def check_nova_server_status(jobID, address): +def check_nova_server_status(jobID, address) -> str | pd.DataFrame: headers = {'Content-type': 'application/x-www-form-urlencoded'} url_status = 'http://' + address + '/job_status' url_log = 'http://' + address + '/log' @@ -68,7 +82,6 @@ def check_nova_server_status(jobID, address): if status == 2: try: - result = "" url_fetch = 'http://' + address + '/fetch_result' print("Fetching Results from NOVA-Server...") data = {"jobID": jobID, "delete_after_download": True} @@ -79,10 +92,12 @@ def check_nova_server_status(jobID, address): image = Image.open(io.BytesIO(response.content)) image.save("./outputs/image.jpg") result = upload_media_to_hoster("./outputs/image.jpg") + return result os.remove("./outputs/image.jpg") elif content_type == 'text/plain; charset=utf-8': result = response.content.decode('utf-8') - elif content_type == "zip": + return result + elif content_type == "application/x-zip-compressed": zf = zipfile.ZipFile(io.BytesIO(response.content), "r") for fileinfo in zf.infolist(): @@ -92,14 +107,15 @@ def check_nova_server_status(jobID, address): columns = ['from', 'to', 'name', 'conf'] result = pd.DataFrame([row.split(';') for row in anno_string.split('\n')], columns=columns) - print(result) - with open("response.zip", "wb") as f: - f.write(response.content) + #print(str(result)) + return result + #with open("response.zip", "wb") as f: + # f.write(response.content) except Exception as e: #zf.extractall() print(e) - return result + except Exception as e: print("Couldn't fetch result: " + str(e)) diff --git a/bot.py b/bot.py index be5161d..4a0108b 100644 --- a/bot.py +++ b/bot.py @@ -100,18 +100,21 @@ class Bot: elif user.balance >= required_amount or required_amount == 0: command = decrypted_text.replace(decrypted_text.split(' ')[0] + " ", "") - input = command.split("-")[0].rstrip() + input = command.split(" -")[0].rstrip() + input_type = "text" + if input.startswith("http"): + input_type = "url" - i_tag = Tag.parse(["i", input, "text"]) - bid = str(self.dvm_config.SUPPORTED_DVMS[index].COST * 1000) - bid_tag = Tag.parse(['bid', bid, bid]) + i_tag = Tag.parse(["i", input, input_type]) + #bid = str(self.dvm_config.SUPPORTED_DVMS[index].COST * 1000) + 
#bid_tag = Tag.parse(['bid', bid, bid]) relays_tag = Tag.parse(["relays", json.dumps(self.dvm_config.RELAY_LIST)]) alt_tag = Tag.parse(["alt", self.dvm_config.SUPPORTED_DVMS[index].TASK]) - tags = [i_tag.as_vec(), bid_tag.as_vec(), relays_tag.as_vec(), alt_tag.as_vec()] + tags = [i_tag.as_vec(), relays_tag.as_vec(), alt_tag.as_vec()] remaining_text = command.replace(input, "") - params = remaining_text.rstrip().split("-") + params = remaining_text.split(" -") for i in params: if i != " ": diff --git a/dvm.py b/dvm.py index c99a338..9500aa7 100644 --- a/dvm.py +++ b/dvm.py @@ -1,7 +1,7 @@ import json -import typing from datetime import timedelta +import pandas as pd from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \ init_logger, LogLevel, Options, nip04_encrypt @@ -12,6 +12,7 @@ from utils.dvmconfig import DVMConfig from utils.admin_utils import admin_make_database_updates, AdminConfig from utils.backend_utils import get_amount_per_task, check_task_is_supported, get_task from utils.database_utils import create_sql_table, get_or_add_user, update_user_balance, update_sql_table +from utils.mediasource_utils import input_data_file_duration from utils.nostr_utils import get_event_by_id, get_referenced_event_by_id, send_event, check_and_decrypt_tags from utils.output_utils import post_process_result, build_status_reaction from utils.zap_utils import check_bolt11_ln_bits_is_paid, create_bolt11_ln_bits, parse_zap_event_tags, redeem_cashu @@ -91,9 +92,9 @@ class DVM: elif tag.as_vec()[0] == "p": p_tag_str = tag.as_vec()[1] - task_supported, task, duration = check_task_is_supported(nip90_event, client=self.client, - get_duration=(not user.iswhitelisted), - config=self.dvm_config) + task_supported, task = check_task_is_supported(nip90_event, client=self.client, + config=self.dvm_config) + if user.isblacklisted: send_job_status_reaction(nip90_event, "error", client=self.client, dvm_config=self.dvm_config) @@ -101,6 +102,8 @@ class DVM: elif task_supported: print("[" + self.dvm_config.NIP89.name + "] Received new Request: " + task + " from " + user.name) + duration = input_data_file_duration(nip90_event, dvm_config=self.dvm_config, client=self.client) + print("File Duration: " + str(duration)) amount = get_amount_per_task(task, self.dvm_config, duration) if amount is None: return @@ -169,8 +172,8 @@ class DVM: send_job_status_reaction(nip90_event, "payment-required", False, amount, client=self.client, dvm_config=self.dvm_config) - #else: - #print("[" + self.dvm_config.NIP89.name + "] Task " + task + " not supported on this DVM, skipping..") + # else: + # print("[" + self.dvm_config.NIP89.name + "] Task " + task + " not supported on this DVM, skipping..") def handle_zap(zap_event): try: @@ -180,7 +183,6 @@ class DVM: self.client, self.dvm_config) user = get_or_add_user(db=self.dvm_config.DB, npub=sender, client=self.client, config=self.dvm_config) - if zapped_event is not None: if zapped_event.kind() == EventDefinitions.KIND_FEEDBACK: @@ -201,10 +203,8 @@ class DVM: # if a reaction by us got zapped - task_supported, task, duration = check_task_is_supported(job_event, - client=self.client, - get_duration=False, - config=self.dvm_config) + task_supported, task = check_task_is_supported(job_event, client=self.client, + config=self.dvm_config) if job_event is not None and task_supported: print("Zap received for NIP90 task: " + str(invoice_amount) + " Sats from " + str( user.name)) @@ -257,8 +257,7 @@ class DVM: print("[" + 
self.dvm_config.NIP89.name + "] Error during content decryption: " + str(e))

         def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig):
-            task_supported, task, duration = check_task_is_supported(nevent, client, False,
-                                                                     config=dvmconfig)
+            task_supported, task = check_task_is_supported(nevent, client, config=dvmconfig)
             if not task_supported:
                 return False
@@ -312,6 +311,7 @@ class DVM:
                         break
                 try:
+                    post_processed_content = post_process_result(data, original_event)
                     send_nostr_reply_event(post_processed_content, original_event_str)
                 except Exception as e:
diff --git a/interfaces/dvmtaskinterface.py b/interfaces/dvmtaskinterface.py
index 195c78c..206750c 100644
--- a/interfaces/dvmtaskinterface.py
+++ b/interfaces/dvmtaskinterface.py
@@ -75,5 +75,4 @@ class DVMTaskInterface:
         if request_form.get("options"):
             opts = json.loads(request_form["options"])
             print(opts)
-
             return dict(opts)
diff --git a/main.py b/main.py
index dcdf143..533c13a 100644
--- a/main.py
+++ b/main.py
@@ -9,7 +9,8 @@ import dotenv
 from nostr_sdk import Keys

 from bot import Bot
-from playground import build_pdf_extractor, build_translator, build_unstable_diffusion, build_sketcher, build_dalle
+from playground import build_pdf_extractor, build_translator, build_unstable_diffusion, build_sketcher, build_dalle, \
+    build_whisperx
 from utils.dvmconfig import DVMConfig
@@ -48,6 +49,13 @@ def run_nostr_dvm_with_local_config():
         bot_config.SUPPORTED_DVMS.append(sketcher)  # We also add Sketcher to the bot
         sketcher.run()

+    if os.getenv("NOVA_SERVER") is not None and os.getenv("NOVA_SERVER") != "":
+        whisperer = build_whisperx("Whisperer")
+        bot_config.SUPPORTED_DVMS.append(whisperer)  # We also add the Whisperer to the bot
+        whisperer.run()
+
+
     # Spawn DVM5, this one requires an OPENAI API Key and balance with OpenAI, you will move the task to them and pay
     # per call. Make sure you have enough balance and the DVM's cost is set higher than what you pay yourself, except, you know,
     # you're being generous.
diff --git a/outputs/image.jpg b/outputs/image.jpg
new file mode 100644
index 0000000..5cd4242
Binary files /dev/null and b/outputs/image.jpg differ
diff --git a/outputs/test.mp4 b/outputs/test.mp4
new file mode 100644
index 0000000..5f8f82c
Binary files /dev/null and b/outputs/test.mp4 differ
diff --git a/playground.py b/playground.py
index 856386e..ac87cbe 100644
--- a/playground.py
+++ b/playground.py
@@ -6,6 +6,7 @@ from nostr_sdk import PublicKey, Keys
 from interfaces.dvmtaskinterface import DVMTaskInterface
 from tasks.imagegeneration_openai_dalle import ImageGenerationDALLE
 from tasks.imagegeneration_sdxl import ImageGenerationSDXL
+from tasks.textextraction_whisperx import SpeechToTextWhisperX
 from tasks.textextractionpdf import TextExtractionPDF
 from tasks.translation import Translation
 from utils.admin_utils import AdminConfig
@@ -125,6 +126,38 @@ def build_unstable_diffusion(name):
     return ImageGenerationSDXL(name=name, dvm_config=dvm_config, nip89config=nip89config,
                                admin_config=admin_config, options=options)

+
+def build_whisperx(name):
+    dvm_config = DVMConfig()
+    dvm_config.PRIVATE_KEY = os.getenv("NOSTR_PRIVATE_KEY4")
+    dvm_config.LNBITS_INVOICE_KEY = os.getenv("LNBITS_INVOICE_KEY")
+    dvm_config.LNBITS_URL = os.getenv("LNBITS_HOST")
+
+    # A module might have options it can be initialized with; here we set a default model and the nova-server
+    # address it should use. These parameters can be freely defined in the task component.
+    options = {'default_model': "base", 'nova_server': os.getenv("NOVA_SERVER")}
+
+    nip90params = {
+        "model": {
+            "required": False,
+            "values": ["base", "tiny", "small", "medium", "large-v1", "large-v2", "tiny.en", "base.en", "small.en",
+                       "medium.en"]
+        },
+        "alignment": {
+            "required": False,
+            "values": ["raw", "segment", "word"]
+        }
+    }
+    nip89info = {
+        "name": name,
+        "image": "https://image.nostr.build/c33ca6fc4cc038ca4adb46fdfdfda34951656f87ee364ef59095bae1495ce669.jpg",
+        "about": "I am a test dvm to extract text from media files (very beta)",
+        "nip90Params": nip90params
+    }
+    nip89config = NIP89Config()
+    nip89config.DTAG = os.getenv("TASK_SPEECH_TO_TEXT_NIP89")
+    nip89config.CONTENT = json.dumps(nip89info)
+    return SpeechToTextWhisperX(name=name, dvm_config=dvm_config, nip89config=nip89config,
+                                admin_config=admin_config, options=options)
+

 def build_sketcher(name):
     dvm_config = DVMConfig()
diff --git a/requirements.txt b/requirements.txt
index 82a7a88..78f36bf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,35 +1,58 @@
+anyio==3.7.1
 beautifulsoup4==4.12.2
 bech32==1.2.0
+bitarray==2.8.3
+bitstring==4.1.3
 blessed==1.20.0
+cassidy==0.1.4
 certifi==2023.7.22
 charset-normalizer==3.3.2
+click==8.1.7
+distro==1.8.0
 emoji==2.8.0
+enumb==0.1.5
+eva-decord==0.6.1
+exceptiongroup==1.2.0
+expo==0.1.2
 ffmpegio==0.8.5
 ffmpegio-core==0.8.5
+h11==0.14.0
+httpcore==1.0.2
+httpx==0.25.1
 idna==3.4
 inquirer==3.1.3
 install==1.3.5
+instaloader==4.10.1
+lnurl==0.4.1
+mediatype==0.1.6
 nostr-sdk==0.0.5
 numpy==1.26.2
+openai==1.3.5
 packaging==23.2
 pandas==2.1.3
 Pillow==10.1.0
 pluggy==1.3.0
 pycryptodome==3.19.0
+pydantic==1.10.13
+pydantic_core==2.14.5
 pypdf==3.17.1
 python-dateutil==2.8.2
 python-dotenv==1.0.0
 python-editor==1.0.4
+pytube==15.0.0
 pytz==2023.3.post1
-PyUpload~=0.1.4
+PyUpload==0.1.4
 pyuseragents==1.0.5
 readchar==4.0.5
 requests==2.31.0
+requests-toolbelt==1.0.0
 safeIO==1.2
 six==1.16.0
+sniffio==1.3.0
 soupsieve==2.5
+tqdm==4.66.1
 translatepy==2.3
+typing_extensions==4.8.0
 tzdata==2023.3
 urllib3==2.1.0
 wcwidth==0.2.10
-
diff --git a/tasks/imagegeneration_openai_dalle.py b/tasks/imagegeneration_openai_dalle.py
index a94f97a..94e8f5d 100644
--- a/tasks/imagegeneration_openai_dalle.py
+++ b/tasks/imagegeneration_openai_dalle.py
@@ -31,9 +31,22 @@ class ImageGenerationDALLE(DVMTaskInterface):
                  admin_config: AdminConfig = None, options=None):
         super().__init__(name, dvm_config, nip89config, admin_config, options)

-    def is_input_supported(self, input_type, input_content):
-        if input_type != "text":
-            return False
+    def is_input_supported(self, tags):
+        for tag in tags:
+            if tag.as_vec()[0] == 'i':
+                input_value = tag.as_vec()[1]
+                input_type = tag.as_vec()[2]
+                if input_type != "text":
+                    return False
+
+            elif tag.as_vec()[0] == 'output':
+                output = tag.as_vec()[1]
+                if output not in ("image/png", "image/jpg",
+                                  "image/png;format=url", "image/jpg;format=url"):
+                    print("Output format not supported, skipping..")
+                    return False
+
         return True

     def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
diff --git a/tasks/imagegeneration_sdxl.py b/tasks/imagegeneration_sdxl.py
index 6e0ae14..cc62689 100644
--- a/tasks/imagegeneration_sdxl.py
+++ b/tasks/imagegeneration_sdxl.py
@@ -30,15 +30,19 @@ class ImageGenerationSDXL(DVMTaskInterface):
     def is_input_supported(self, tags):
         for tag in tags:
             if tag.as_vec()[0] == 'i':
-                if len(tag.as_vec()) < 3:
-                    print("Job Event missing/malformed i tag, skipping..")
+                input_value = tag.as_vec()[1]
+                input_type = tag.as_vec()[2]
+                if input_type != "text":
+                    return False
+
+            elif tag.as_vec()[0] == 'output':
+                output = tag.as_vec()[1]
+                if output not in ("image/png", "image/jpg",
+                                  "image/png;format=url", "image/jpg;format=url"):
+                    print("Output format not supported, skipping..")
                     return False
-                else:
-                    input_value = tag.as_vec()[1]
-                    input_type = tag.as_vec()[2]
-                    if input_type != "text":
-                        return False
         return True

     def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
@@ -134,11 +138,6 @@ class ImageGenerationSDXL(DVMTaskInterface):
         }
         request_form['options'] = json.dumps(options)

-        # old format, deprecated, will remove
-        request_form["optStr"] = ('model=' + model + ';ratio=' + str(ratio_width) + '-' + str(ratio_height) + ';size=' +
-                                  str(width) + '-' + str(height) + ';strength=' + str(strength) + ';guidance_scale=' +
-                                  str(guidance_scale) + ';lora=' + lora + ';lora_weight=' + lora_weight)
-
         return request_form

     def process(self, request_form):
@@ -152,7 +151,7 @@ class ImageGenerationSDXL(DVMTaskInterface):
             thread = pool.apply_async(check_nova_server_status, (request_form['jobID'], self.options['nova_server']))
             print("Wait for results of NOVA-Server...")
             result = thread.get()
-            return str(result)
+            return result
         except Exception as e:
             raise Exception(e)
diff --git a/tasks/textextraction_whisperx.py b/tasks/textextraction_whisperx.py
new file mode 100644
index 0000000..e3f0163
--- /dev/null
+++ b/tasks/textextraction_whisperx.py
@@ -0,0 +1,143 @@
+import json
+import os
+import time
+from multiprocessing.pool import ThreadPool
+from pathlib import Path
+
+from backends.nova_server import check_nova_server_status, send_request_to_nova_server, send_file_to_nova_server
+from interfaces.dvmtaskinterface import DVMTaskInterface
+from utils.admin_utils import AdminConfig
+from utils.dvmconfig import DVMConfig
+from utils.mediasource_utils import organize_input_data
+from utils.nip89_utils import NIP89Config
+from utils.definitions import EventDefinitions
+
+"""
+This file contains a module to transcribe speech from media files on NOVA-Server and receive results back.
+
+Accepted Inputs: A URL to a media file (url)
+Outputs: The transcribed text (text/plain)
+Params:  -model      # whisper model to use, e.g. base, large-v2
+         -alignment  # segmentation of the transcript: raw, segment or word
+         -range      # optional start and end time of the section to transcribe
+"""
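+# Illustrative only: a NIP-90 request this module accepts might carry tags like the
+# following (url and times are hypothetical; parsing happens in
+# create_request_form_from_nostr_event below):
+#   ["i", "https://example.com/episode.mp3", "url"]
+#   ["param", "model", "large-v2"]
+#   ["param", "alignment", "segment"]
+#   ["param", "range", "00:01:00", "00:02:30"]  # %H:%M:%S, %M:%S or plain seconds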
+
+
+class SpeechToTextWhisperX(DVMTaskInterface):
+    KIND: int = EventDefinitions.KIND_NIP90_EXTRACT_TEXT
+    TASK: str = "speech-to-text"
+    COST: int = 1
+
+    def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
+                 admin_config: AdminConfig = None, options=None):
+        super().__init__(name, dvm_config, nip89config, admin_config, options)
+
+    def is_input_supported(self, tags):
+        for tag in tags:
+            if tag.as_vec()[0] == 'i':
+                input_value = tag.as_vec()[1]
+                input_type = tag.as_vec()[2]
+                if input_type != "url":
+                    return False
+
+            elif tag.as_vec()[0] == 'output':
+                output = tag.as_vec()[1]
+                if output != "text/plain":
+                    print("Output format not supported, skipping..")
+                    return False
+
+        return True
+
+    def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
+        request_form = {"jobID": event.id().to_hex() + "_" + self.NAME.replace(" ", ""),
+                        "trainerFilePath": 'modules\\whisperx\\whisperx_transcript.trainer'}
+
+        if self.options.get("default_model"):
+            model = self.options['default_model']
+        else:
+            model = "base"
+        if self.options.get("alignment"):
+            alignment = self.options['alignment']
+        else:
+            alignment = "raw"
+
+        url = ""
+        input_type = "url"
+        start_time = 0
+        end_time = 0
+
+        for tag in event.tags():
+            if tag.as_vec()[0] == 'i':
+                input_type = tag.as_vec()[2]
+                if input_type == "url":
+                    url = tag.as_vec()[1]
+
+            elif tag.as_vec()[0] == 'param':
+                print("Param: " + tag.as_vec()[1] + ": " + tag.as_vec()[2])
+                if tag.as_vec()[1] == "alignment":
+                    alignment = tag.as_vec()[2]
+                elif tag.as_vec()[1] == "model":
+                    model = tag.as_vec()[2]
+                elif tag.as_vec()[1] == "range":
+                    # accept start/end times as %H:%M:%S, %M:%S or plain seconds
+                    try:
+                        t = time.strptime(tag.as_vec()[2], "%H:%M:%S")
+                        seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
+                        start_time = float(seconds)
+                    except ValueError:
+                        try:
+                            t = time.strptime(tag.as_vec()[2], "%M:%S")
+                            seconds = t.tm_min * 60 + t.tm_sec
+                            start_time = float(seconds)
+                        except ValueError:
+                            start_time = float(tag.as_vec()[2])
+                    try:
+                        t = time.strptime(tag.as_vec()[3], "%H:%M:%S")
+                        seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
+                        end_time = float(seconds)
+                    except ValueError:
+                        try:
+                            t = time.strptime(tag.as_vec()[3], "%M:%S")
+                            seconds = t.tm_min * 60 + t.tm_sec
+                            end_time = float(seconds)
+                        except ValueError:
+                            end_time = float(tag.as_vec()[3])
+
+        filepath = organize_input_data(url, input_type, start_time, end_time, dvm_config, client)
+        pathonserver = send_file_to_nova_server(filepath, self.options['nova_server'])
+
+        io_input = {
+            "id": "audio",
+            "type": "input",
+            "src": "file:stream",
+            "uri": pathonserver
+        }
+
+        io_output = {
+            "id": "transcript",
+            "type": "output",
+            "src": "request:annotation:free"
+        }
+
+        request_form['data'] = json.dumps([io_input, io_output])
+
+        options = {
+            "model": model,
+            "alignment_mode": alignment,
+        }
+        request_form['options'] = json.dumps(options)
+        return request_form
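+    # Illustrative only: with the defaults above and a (hypothetical) uploaded path
+    # "/data/audio.mp3", the form serialized for NOVA-Server would look roughly like:
+    #   data:    '[{"id": "audio", "type": "input", "src": "file:stream", "uri": "/data/audio.mp3"},
+    #              {"id": "transcript", "type": "output", "src": "request:annotation:free"}]'
+    #   options: '{"model": "base", "alignment_mode": "raw"}'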
+
+    def process(self, request_form):
+        try:
+            # Call the process route of NOVA-Server with our request form.
+            response = send_request_to_nova_server(request_form, self.options['nova_server'])
+            if bool(json.loads(response)['success']):
+                print("Job " + request_form['jobID'] + " sent to NOVA-server")
+
+            pool = ThreadPool(processes=1)
+            thread = pool.apply_async(check_nova_server_status, (request_form['jobID'], self.options['nova_server']))
+            print("Wait for results of NOVA-Server...")
+            result = thread.get()
+            return result
+
+        except Exception as e:
+            raise Exception(e)
diff --git a/tasks/textextractionpdf.py b/tasks/textextractionpdf.py
index f1ca68c..6f7bc2d 100644
--- a/tasks/textextractionpdf.py
+++ b/tasks/textextractionpdf.py
@@ -29,10 +29,13 @@ class TextExtractionPDF(DVMTaskInterface):
                  admin_config: AdminConfig = None, options=None):
         super().__init__(name, dvm_config, nip89config, admin_config, options)

-
-    def is_input_supported(self, input_type, input_content):
-        if input_type != "url" and input_type != "event":
-            return False
+    def is_input_supported(self, tags):
+        for tag in tags:
+            if tag.as_vec()[0] == 'i':
+                input_value = tag.as_vec()[1]
+                input_type = tag.as_vec()[2]
+                if input_type != "url" and input_type != "event":
+                    return False
         return True

     def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
diff --git a/tasks/translation.py b/tasks/translation.py
index d900e39..d2f7d1d 100644
--- a/tasks/translation.py
+++ b/tasks/translation.py
@@ -27,11 +27,15 @@ class Translation(DVMTaskInterface):
                  admin_config: AdminConfig = None, options=None):
         super().__init__(name, dvm_config, nip89config, admin_config, options)

-    def is_input_supported(self, input_type, input_content):
-        if input_type != "event" and input_type != "job" and input_type != "text":
-            return False
-        if input_type != "text" and len(input_content) > 4999:
-            return False
+    def is_input_supported(self, tags):
+        for tag in tags:
+            if tag.as_vec()[0] == 'i':
+                input_value = tag.as_vec()[1]
+                input_type = tag.as_vec()[2]
+                if input_type != "event" and input_type != "job" and input_type != "text":
+                    return False
+                if input_type != "text" and len(input_value) > 4999:
+                    return False
         return True

     def create_request_form_from_nostr_event(self, event, client=None, dvm_config=None):
diff --git a/utils/backend_utils.py b/utils/backend_utils.py
index aa0642d..50af4ff 100644
--- a/utils/backend_utils.py
+++ b/utils/backend_utils.py
@@ -4,114 +4,109 @@ import requests
 from nostr_sdk import Event, Tag

 from utils.definitions import EventDefinitions
+from utils.mediasource_utils import check_source_type, media_source
 from utils.nostr_utils import get_event_by_id


 def get_task(event, client, dvm_config):
-    if event.kind() == EventDefinitions.KIND_NIP90_GENERIC:  # use this for events that have no id yet, inclufr j tag
-        for tag in event.tags():
-            if tag.as_vec()[0] == 'j':
-                return tag.as_vec()[1]
-        else:
-            return "unknown job: " + event.as_json()
-    elif event.kind() == EventDefinitions.KIND_DM:  # dm
-        for tag in event.tags():
-            if tag.as_vec()[0] == 'j':
-                return tag.as_vec()[1]
-        else:
-            return "unknown job: " + event.as_json()
+    try:
+        if event.kind() == EventDefinitions.KIND_NIP90_GENERIC:  # use this for events that have no id yet, include a j tag
+            for tag in event.tags():
+                if tag.as_vec()[0] == 'j':
+                    return tag.as_vec()[1]
+            else:
+                return "unknown job: " + event.as_json()
+        elif event.kind() == EventDefinitions.KIND_DM:  # dm
+            for tag in event.tags():
+                if tag.as_vec()[0] == 'j':
+                    return tag.as_vec()[1]
+            else:
+                return "unknown job: " + event.as_json()

-    # This looks a bit more complicated, but we do
several tasks for text-extraction in the future - elif event.kind() == EventDefinitions.KIND_NIP90_EXTRACT_TEXT: - for tag in event.tags: - if tag.as_vec()[0] == "i": - if tag.as_vec()[2] == "url": - file_type = check_url_is_readable(tag.as_vec()[1]) - if file_type == "pdf": - return "pdf-to-text" + # This looks a bit more complicated, but we do several tasks for text-extraction in the future + elif event.kind() == EventDefinitions.KIND_NIP90_EXTRACT_TEXT: + for tag in event.tags(): + if tag.as_vec()[0] == "i": + if tag.as_vec()[2] == "url": + file_type = check_url_is_readable(tag.as_vec()[1]) + print(file_type) + if file_type == "pdf": + return "pdf-to-text" + elif file_type == "audio" or file_type == "video": + return "speech-to-text" + else: + return "unknown job" + elif tag.as_vec()[2] == "event": + evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvm_config) + if evt is not None: + if evt.kind() == 1063: + for tg in evt.tags(): + if tg.as_vec()[0] == 'url': + file_type = check_url_is_readable(tg.as_vec()[1]) + if file_type == "pdf": + return "pdf-to-text" + elif file_type == "audio" or file_type == "video": + return "speech-to-text" + else: + return "unknown job" + else: + return "unknown type" else: return "unknown job" - elif tag.as_vec()[2] == "event": - evt = get_event_by_id(tag.as_vec()[1], client=client, config=dvm_config) - if evt is not None: - if evt.kind() == 1063: - for tg in evt.tags(): - if tg.as_vec()[0] == 'url': - file_type = check_url_is_readable(tg.as_vec()[1]) - if file_type == "pdf": - return "pdf-to-text" - else: - return "unknown job" - else: - return "unknown type" - # TODO if a task can consist of multiple inputs add them here - # else if kind is supported, simply return task - else: - for dvm in dvm_config.SUPPORTED_DVMS: - if dvm.KIND == event.kind(): - return dvm.TASK - return "unknown type" + + # TODO if a task can consist of multiple inputs add them here + # else if kind is supported, simply return task + else: + + for dvm in dvm_config.SUPPORTED_DVMS: + if dvm.KIND == event.kind(): + return dvm.TASK + except Exception as e: + print("Get task: " + str(e)) + + return "unknown type" -def is_input_supported__generic(tags, client, dvm_config) -> bool: - for tag in tags: +def is_input_supported_generic(tags, client, dvm_config) -> bool: + try: + for tag in tags: if tag.as_vec()[0] == 'i': if len(tag.as_vec()) < 3: print("Job Event missing/malformed i tag, skipping..") return False - else: - input_value = tag.as_vec()[1] - input_type = tag.as_vec()[2] + else: + input_value = tag.as_vec()[1] + input_type = tag.as_vec()[2] + if input_type == "event": + evt = get_event_by_id(input_value, client=client, config=dvm_config) + if evt is None: + print("Event not found") + return False + # TODO check_url_is_readable might be more relevant per task in the future + # if input_type == 'url' and check_url_is_readable(input_value) is None: + # print("Url not readable / supported") + # return False - if input_type == "event": - evt = get_event_by_id(input_value, client=client, config=dvm_config) - if evt is None: - print("Event not found") - - return True + return True + except Exception as e: + print("Generic input check: " + str(e)) - -def check_task_is_supported(event: Event, client, get_duration=False, config=None): +def check_task_is_supported(event: Event, client, config=None): try: dvm_config = config - input_value = "" - input_type = "" - duration = 1 task = get_task(event, client=client, dvm_config=dvm_config) - - if not 
is_input_supported__generic(event.tags(), client, dvm_config):
-            return False, "", 0
-
-        for tag in event.tags():
-            if tag.as_vec()[0] == 'i':
-                input_value = tag.as_vec()[1]
-                input_type = tag.as_vec()[2]
-                if input_type == 'url' and check_url_is_readable(input_value) is None:
-                    print("Url not readable / supported")
-                    return False, task, duration  #
-
-            elif tag.as_vec()[0] == 'output':
-                # TODO move this to individual modules
-                output = tag.as_vec()[1]
-                if not (output == "text/plain"
-                        or output == "text/json" or output == "json"
-                        or output == "image/png" or "image/jpg"
-                        or output == "image/png;format=url" or output == "image/jpg;format=url"
-                        or output == ""):
-                    print("Output format not supported, skipping..")
-                    return False, "", 0

         if task not in (x.TASK for x in dvm_config.SUPPORTED_DVMS):
-            return False, task, duration
+            return False, task
+        if not is_input_supported_generic(event.tags(), client, dvm_config):
+            return False, ""

         for dvm in dvm_config.SUPPORTED_DVMS:
             if dvm.TASK == task:
                 if not dvm.is_input_supported(event.tags()):
-                    return False, task, duration
+                    return False, task

-        return True, task, duration
+        return True, task

     except Exception as e:
@@ -121,30 +116,40 @@ def check_url_is_readable(url):
     if not str(url).startswith("http"):
         return None
-    # If link is comaptible with one of these file formats, move on.
-    req = requests.get(url)
-    content_type = req.headers['content-type']
-    if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith(
-            ".mp3") or content_type == 'audio/ogg' or str(url).endswith(".ogg"):
-        return "audio"
-    elif (content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith(
-            ".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg") or content_type == 'image/png' or
-          str(url).endswith(".png")):
-        return "image"
-    elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith(
-            ".avi") or content_type == 'video/mov' or str(url).endswith(".mov"):
-        return "video"
-    elif (str(url)).endswith(".pdf"):
-        return "pdf"
+
+    source = check_source_type(url)
+    type = media_source(source)
+
+    if type == "url":
+        # If the link is compatible with one of these file formats, move on.
+        req = requests.get(url)
+        content_type = req.headers['content-type']
+        if content_type == 'audio/x-wav' or str(url).endswith(".wav") or content_type == 'audio/mpeg' or str(url).endswith(
+                ".mp3") or content_type == 'audio/ogg' or str(url).endswith(".ogg"):
+            return "audio"
+        elif (content_type == 'image/png' or str(url).endswith(".png") or content_type == 'image/jpg' or str(url).endswith(
+                ".jpg") or content_type == 'image/jpeg' or str(url).endswith(".jpeg")):
+            return "image"
+        elif content_type == 'video/mp4' or str(url).endswith(".mp4") or content_type == 'video/avi' or str(url).endswith(
+                ".avi") or content_type == 'video/mov' or str(url).endswith(".mov"):
+            return "video"
+        elif (str(url)).endswith(".pdf"):
+            return "pdf"
+    else:
+        return type

     # Otherwise we will not offer to do the job.
     return None


 def get_amount_per_task(task, dvm_config, duration=1):
+    # duration is either a static 1 (for images etc.) or a length in seconds
+    if duration == 0:
+        duration = 1
     for dvm in dvm_config.SUPPORTED_DVMS:  # this is currently just one
         if dvm.TASK == task:
-            amount = dvm.COST * duration
+            amount = dvm.COST * int(duration)
             return amount
         else:
             print("[" + dvm_config.SUPPORTED_DVMS[
diff --git a/utils/mediasource_utils.py b/utils/mediasource_utils.py
new file mode 100644
index 0000000..9c6112c
--- /dev/null
+++ b/utils/mediasource_utils.py
@@ -0,0 +1,330 @@
+import os
+import time  # the stdlib time module provides the strptime used in get_overcast below
+import urllib
+from urllib.parse import urlparse
+
+import ffmpegio
+import requests
+from decord import AudioReader, cpu
+
+from utils.nostr_utils import get_event_by_id
+
+
+def input_data_file_duration(event, dvm_config, client, start=0, end=0):
+    input_value = ""
+    input_type = "url"
+    for tag in event.tags():
+        if tag.as_vec()[0] == 'i':
+            input_value = tag.as_vec()[1]
+            input_type = tag.as_vec()[2]
+
+    if input_type == "event":  # NIP94 event
+        evt = get_event_by_id(input_value, client=client, config=dvm_config)
+        if evt is not None:
+            input_value, input_type = check_nip94_event_for_media(evt, input_value, input_type)
+
+    if input_type == "url":
+        source_type = check_source_type(input_value)
+        filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
+        if type != "audio" and type != "video":
+            return 1
+        if filename == "" or filename is None:
+            return 0
+        try:
+            file_reader = AudioReader(filename, ctx=cpu(0), mono=False)
+            duration = float(file_reader.duration())
+        except Exception as e:
+            print(e)
+            return 0
+        print("Original Duration of the Media file: " + str(duration))
+        start_time, end_time, new_duration = (
+            convert_media_length(start, end, duration))
+        print("New Duration of the Media file: " + str(new_duration))
+        return new_duration
+
+    return 1
+
+
+def organize_input_data(input_value, input_type, start, end, dvm_config, client, process=True) -> str:
+    if input_type == "event":  # NIP94 event
+        evt = get_event_by_id(input_value, client=client, config=dvm_config)
+        if evt is not None:
+            input_value, input_type = check_nip94_event_for_media(evt, input_value, input_type)
+
+    if input_type == "url":
+        source_type = check_source_type(input_value)
+        filename, start, end, type = get_file_start_end_type(input_value, source_type, start, end)
+        if filename == "" or filename is None:
+            return ""
+        try:
+            file_reader = AudioReader(filename, ctx=cpu(0), mono=False)
+            duration = float(file_reader.duration())
+        except Exception as e:
+            print(e)
+            return ""
+
+        print("Original Duration of the Media file: " + str(duration))
+        start_time, end_time, new_duration = (
+            convert_media_length(start, end, duration))
+        print("New Duration of the Media file: " + str(new_duration))
+
+        # TODO if the file is already in a working format and the time range is 0 0, don't convert
+        print("Converting from " + str(start_time) + " until " + str(end_time))
+        # for now, we cut and convert all files to mp3
+        final_filename = '.\\outputs\\audio.mp3'
+        print(final_filename)
+        fs, x = ffmpegio.audio.read(filename, ss=start_time, to=end_time, sample_fmt='dbl', ac=1)
+        ffmpegio.audio.write(final_filename, fs, x, overwrite=True)
+        return final_filename
+
+
+def check_nip94_event_for_media(evt, input_value, input_type):
+    # Parse the NIP94 event for an url; if found, use it.
+ if evt.kind() == 1063: + for tag in evt.tags(): + if tag.as_vec()[0] == 'url': + input_type = "url" + input_value = tag.as_vec()[1] + return input_value, input_type + + return input_value, input_type + +def convert_media_length(start: float, end: float, duration: float): + if end == 0.0: + end_time = duration + elif end > duration: + end_time = duration + else: + end_time = end + if start <= 0.0 or start > end_time: + start_time = 0.0 + else: + start_time = start + dur = end_time - start_time + return start_time, end_time, dur + + +def get_file_start_end_type(url, source_type, start, end) -> (str, str): + # Overcast + if source_type == "overcast": + name, start, end = get_overcast(url, start, end) + return name, start, end, "audio" + # Youtube + elif source_type == "youtube": + audio_only = True + + name, start, end = get_youtube(url, start, end, audio_only) + + return name, start, end, "audio" + # Xitter + elif source_type == "xitter": + name, start, end = get_Twitter(url, start, end) + return name, start, end, "video" + # Tiktok + elif source_type == "tiktok": + name, start, end = get_TikTok(url, start, end) + return name, start, end, "video" + # Instagram + elif source_type == "instagram": + name, start, end = get_Instagram(url, start, end) + if name.endswith("jpg"): + type = "image" + else: + type = "video" + return name, start, end, type + # A file link + else: + filename, filetype = get_media_link(url) + return filename, start, end, filetype + + +def media_source(source_type): + if source_type == "overcast": + return "audio" + elif source_type == "youtube": + return "audio" + elif source_type == "xitter": + return "video" + elif source_type == "tiktok": + return "video" + elif source_type == "instagram": + return "video" + else: + return "url" + + +def check_source_type(url): + if str(url).startswith("https://overcast.fm/"): + return "overcast" + elif str(url).replace("http://", "").replace("https://", "").replace( + "www.", "").replace("youtu.be/", "youtube.com?v=")[0:11] == "youtube.com": + return "youtube" + elif str(url).startswith("https://x.com") or str(url).startswith("https://twitter.com"): + return "xitter" + elif str(url).startswith("https://vm.tiktok.com") or str(url).startswith( + "https://www.tiktok.com") or str(url).startswith("https://m.tiktok.com"): + return "tiktok" + elif str(url).startswith("https://www.instagram.com") or str(url).startswith( + "https://instagram.com"): + return "instagram" + else: + return "url" + + +def get_overcast(input_value, start, end): + filename = '.\\outputs\\' + ".originalaudio.mp3" + print("Found overcast.fm Link.. 
downloading") + start_time = start + end_time = end + downloadOvercast(input_value, filename) + finaltag = str(input_value).replace("https://overcast.fm/", "").split('/') + if start == 0.0: + if len(finaltag) > 1: + t = time.strptime(finaltag[1], "%H:%M:%S") + seconds = t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec + start_time = float(seconds) + print("Setting start time automatically to " + str(start_time)) + if end > 0.0: + end_time = float(seconds + end) + print("Moving end time automatically to " + str(end_time)) + + return filename, start_time, end_time + + +def get_TikTok(input_value, start, end): + filepath = '.\\outputs\\' + try: + filename = downloadTikTok(input_value, filepath) + print(filename) + except Exception as e: + print(e) + return "", start, end + return filename, start, end + + +def get_Instagram(input_value, start, end): + filepath = '.\\outputs\\' + try: + filename = downloadInstagram(input_value, filepath) + print(filename) + except Exception as e: + print(e) + return "", start, end + return filename, start, end + + +def get_Twitter(input_value, start, end): + filepath = '.\\outputs\\' + cleanlink = str(input_value).replace("twitter.com", "x.com") + try: + filename = downloadTwitter(cleanlink, filepath) + print(filename) + except Exception as e: + print(e) + return "", start, end + return filename, start, end + + +def get_youtube(input_value, start, end, audioonly=True): + filepath = '.\\outputs\\' + filename = "" + try: + filename = downloadYouTube(input_value, filepath, audioonly) + + except Exception as e: + print("Youtube" + str(e)) + return filename, start, end + try: + o = urlparse(input_value) + q = urllib.parse.parse_qs(o.query) + if start == 0.0: + if o.query.find('?t=') != -1: + start = q['t'][0] # overwrite from link.. why not.. + print("Setting start time automatically to " + start) + if end > 0.0: + end = float(q['t'][0]) + end + print("Moving end time automatically to " + str(end)) + + except Exception as e: + print(e) + return filename, start, end + + return filename, start, end + + +def get_media_link(url) -> (str, str): + req = requests.get(url) + content_type = req.headers['content-type'] + print(content_type) + if content_type == 'audio/x-wav' or str(url).lower().endswith(".wav"): + ext = "wav" + file_type = "audio" + with open('.\\outputs\\file.' + ext, 'wb') as fd: + fd.write(req.content) + return '.\\outputs\\file.' + ext, file_type + elif content_type == 'audio/mpeg' or str(url).lower().endswith(".mp3"): + ext = "mp3" + file_type = "audio" + with open('.\\outputs\\file.' + '\\file.' + ext, 'wb') as fd: + fd.write(req.content) + return '.\\outputs\\file.' + ext, file_type + elif content_type == 'audio/ogg' or str(url).lower().endswith(".ogg"): + ext = "ogg" + file_type = "audio" + with open('.\\outputs\\file.' + ext, 'wb') as fd: + fd.write(req.content) + return '.\\outputs\\file.' + ext, file_type + elif content_type == 'video/mp4' or str(url).lower().endswith(".mp4"): + ext = "mp4" + file_type = "video" + with open('.\\outputs\\file.' + ext, 'wb') as fd: + fd.write(req.content) + return '.\\outputs\\file.' + ext, file_type + elif content_type == 'video/avi' or str(url).lower().endswith(".avi"): + ext = "avi" + file_type = "video" + with open('.\\outputs\\file.' + ext, 'wb') as fd: + fd.write(req.content) + return '.\\outputs\\file.' + ext, file_type + elif content_type == 'video/quicktime' or str(url).lower().endswith(".mov"): + ext = "mov" + file_type = "video" + with open('.\\outputs\\file.' 
+
+
+def downloadOvercast(source_url, target_location):
+    from utils.scrapper.media_scrapper import OvercastDownload
+    result = OvercastDownload(source_url, target_location)
+    return result
+
+
+def downloadTwitter(videourl, path):
+    from utils.scrapper.media_scrapper import XitterDownload
+    result = XitterDownload(videourl, path + "x.mp4")
+    return result
+
+
+def downloadTikTok(videourl, path):
+    from utils.scrapper.media_scrapper import TiktokDownloadAll
+    result = TiktokDownloadAll([videourl], path)
+    return result
+
+
+def downloadInstagram(videourl, path):
+    from utils.scrapper.media_scrapper import InstagramDownload
+    result = InstagramDownload(videourl, "insta", path)
+    return result
+
+
+def downloadYouTube(link, path, audioonly=True):
+    from utils.scrapper.media_scrapper import YouTubeDownload
+    # pass the caller's choice through instead of hard-coding audio_only=True
+    result = YouTubeDownload(link, path, audio_only=audioonly)
+    return result
diff --git a/utils/output_utils.py b/utils/output_utils.py
index 676aaf9..daf50e8 100644
--- a/utils/output_utils.py
+++ b/utils/output_utils.py
@@ -17,82 +17,87 @@ Post process results to either given output format or a Nostr readable plain text
 """

 def post_process_result(anno, original_event):
     print("Post-processing...")
     if isinstance(anno, pandas.DataFrame):  # if input is an anno we parse it to required output format
-        for tag in original_event.tags:
+        print("Pandas Dataframe...")
+        has_output_tag = False
+        output_format = "text/plain"
+
+        for tag in original_event.tags():
             if tag.as_vec()[0] == "output":
                 output_format = tag.as_vec()[1]
-                print("requested output is " + str(tag.as_vec()[1]) + "...")
-                try:
-                    if output_format == "text/plain":
-                        result = ""
-                        for each_row in anno['name']:
-                            if each_row is not None:
-                                for i in str(each_row).split('\n'):
-                                    result = result + i + "\n"
-                        result = replace_broken_words(
-                            str(result).replace("\"", "").replace('[', "").replace(']',
-                                                                                   "").lstrip(None))
-                        return result
+                has_output_tag = True
+                print("requested output is " + str(output_format) + "...")

-                    elif output_format == "text/vtt":
-                        print(str(anno))
-                        result = "WEBVTT\n\n"
-                        for element in anno:
-                            name = element["name"]  # name
-                            start = float(element["from"])
-                            convertstart = str(datetime.timedelta(seconds=start))
-                            end = float(element["to"])
-                            convertend = str(datetime.timedelta(seconds=end))
-                            print(str(convertstart) + " --> " + str(convertend))
-                            cleared_name = str(name).lstrip("\'").rstrip("\'")
-                            result = result + str(convertstart) + " --> " + str(
-                                convertend) + "\n" + cleared_name + "\n\n"
-                        result = replace_broken_words(
-                            str(result).replace("\"", "").replace('[', "").replace(']',
-                                                                                   "").lstrip(None))
-                        return result
-
-                    elif output_format == "text/json" or output_format == "json":
-                        # result = json.dumps(json.loads(anno.data.to_json(orient="records")))
-                        result = replace_broken_words(json.dumps(anno.data.tolist()))
-                        return result
-                    # TODO add more
-                    else:
-                        result = ""
-                        for element in anno.data:
-                            element["name"] = str(element["name"]).lstrip()
-                            element["from"] = (format(float(element["from"]), '.2f')).lstrip()  # name
-                            element["to"] = (format(float(element["to"]), '.2f')).lstrip()  # name
-                            result = result + "(" + str(element["from"]) + "," + str(element["to"]) + ")" + " " + str(
-                                element["name"]) + "\n"
-
-                        print(result)
-                        result = replace_broken_words(result)
-                        return result
-
-                except Exception as e:
-                    print(e)
-                    result = replace_broken_words(str(anno.data))
+        if
has_output_tag: + print("Output Tag found: " + output_format) + try: + if output_format == "text/plain": + result = pandas_to_plaintext(anno) + result = replace_broken_words( + str(result).replace("\"", "").replace('[', "").replace(']', + "").lstrip(None)) return result - else: - result = "" - for element in anno.data: - element["name"] = str(element["name"]).lstrip() - element["from"] = (format(float(element["from"]), '.2f')).lstrip() # name - element["to"] = (format(float(element["to"]), '.2f')).lstrip() # name - result = result + "(" + str(element["from"]) + "," + str(element["to"]) + ")" + " " + str( - element["name"]) + "\n" + elif output_format == "text/vtt": + print(str(anno)) + result = "WEBVTT\n\n" + for element in anno: + name = element["name"] # name + start = float(element["from"]) + convertstart = str(datetime.timedelta(seconds=start)) + end = float(element["to"]) + convertend = str(datetime.timedelta(seconds=end)) + print(str(convertstart) + " --> " + str(convertend)) + cleared_name = str(name).lstrip("\'").rstrip("\'") + result = result + str(convertstart) + " --> " + str( + convertend) + "\n" + cleared_name + "\n\n" + result = replace_broken_words( + str(result).replace("\"", "").replace('[', "").replace(']', + "").lstrip(None)) + return result + elif output_format == "text/json" or output_format == "json": + # result = json.dumps(json.loads(anno.data.to_json(orient="records"))) + result = replace_broken_words(json.dumps(anno.data.tolist())) + return result + # TODO add more + else: + print("Pandas Dataframe but output tag not supported.. falling back to default..") + result = pandas_to_plaintext(anno) + print(result) + result = str(result).replace("\"", "").replace('[', "").replace(']', + "").lstrip(None) + return result + + except Exception as e: + print(e) + result = replace_broken_words(str(anno.data)) + return result + + else: + print("Pandas Dataframe but no output tag set.. falling back to default..") + result = pandas_to_plaintext(anno) print(result) - result = replace_broken_words(result) + result = str(result).replace("\"", "").replace('[', "").replace(']', + "").lstrip(None) return result elif isinstance(anno, NoneType): return "An error occurred" else: + print("Nonetype") result = replace_broken_words(anno) # TODO return result +def pandas_to_plaintext(anno): + result = "" + for each_row in anno['name']: + if each_row is not None: + for i in str(each_row).split('\n'): + result = result + i + "\n" + return result + + + ''' Convenience function to replace words like Noster with Nostr ''' diff --git a/utils/scrapper/media_scrapper.py b/utils/scrapper/media_scrapper.py new file mode 100644 index 0000000..71b89a9 --- /dev/null +++ b/utils/scrapper/media_scrapper.py @@ -0,0 +1,599 @@ +import json +import os +import re +import sys +import urllib.parse +from typing import Any +from urllib.request import urlopen, Request + +import requests +import instaloader +from pytube import YouTube + + +def XitterDownload(source_url, target_location): + script_dir = os.path.dirname(os.path.realpath(__file__)) + request_details_file = f"{script_dir}{os.sep}request_details.json" + request_details = json.load(open(request_details_file, "r")) # test + features, variables = request_details["features"], request_details["variables"] + + def get_tokens(tweet_url): + html = requests.get(tweet_url) + + assert ( + html.status_code == 200 + ), f"Failed to get tweet page. If you are using the correct Twitter URL this suggests a bug in the script. 
Please open a GitHub issue and copy and paste this message. Status code: {html.status_code}. Tweet url: {tweet_url}" + + mainjs_url = re.findall( + r"https://abs.twimg.com/responsive-web/client-web-legacy/main.[^\.]+.js", + html.text, + ) + + assert ( + mainjs_url is not None and len(mainjs_url) > 0 + ), f"Failed to find main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}" + + mainjs_url = mainjs_url[0] + + mainjs = requests.get(mainjs_url) + + assert ( + mainjs.status_code == 200 + ), f"Failed to get main.js file. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {mainjs.status_code}. Tweet url: {tweet_url}" + + bearer_token = re.findall(r'AAAAAAAAA[^"]+', mainjs.text) + + assert ( + bearer_token is not None and len(bearer_token) > 0 + ), f"Failed to find bearer token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}" + + bearer_token = bearer_token[0] + + # get the guest token + with requests.Session() as s: + s.headers.update( + { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0", + "accept": "*/*", + "accept-language": "de,en-US;q=0.7,en;q=0.3", + "accept-encoding": "gzip, deflate, br", + "te": "trailers", + } + ) + + s.headers.update({"authorization": f"Bearer {bearer_token}"}) + + # activate bearer token and get guest token + guest_token = s.post("https://api.twitter.com/1.1/guest/activate.json").json()[ + "guest_token" + ] + + assert ( + guest_token is not None + ), f"Failed to find guest token. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Tweet url: {tweet_url}, main.js url: {mainjs_url}" + + return bearer_token, guest_token + + def get_details_url(tweet_id, features, variables): + # create a copy of variables - we don't want to modify the original + variables = {**variables} + variables["tweetId"] = tweet_id + + return f"https://twitter.com/i/api/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId?variables={urllib.parse.quote(json.dumps(variables))}&features={urllib.parse.quote(json.dumps(features))}" + + def get_tweet_details(tweet_url, guest_token, bearer_token): + tweet_id = re.findall(r"(?<=status/)\d+", tweet_url) + assert ( + tweet_id is not None and len(tweet_id) == 1 + ), f"Could not parse tweet id from your url. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}" + + tweet_id = tweet_id[0] + + # the url needs a url encoded version of variables and features as a query string + url = get_details_url(tweet_id, features, variables) + + details = requests.get( + url, + headers={ + "authorization": f"Bearer {bearer_token}", + "x-guest-token": guest_token, + }, + ) + + max_retries = 10 + cur_retry = 0 + while details.status_code == 400 and cur_retry < max_retries: + try: + error_json = json.loads(details.text) + except json.JSONDecodeError: + assert ( + False + ), f"Failed to parse json from details error. details text: {details.text} If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. 
Tweet url: {tweet_url}" + + assert ( + "errors" in error_json + ), f"Failed to find errors in details error json. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" + + needed_variable_pattern = re.compile(r"Variable '([^']+)'") + needed_features_pattern = re.compile( + r'The following features cannot be null: ([^"]+)' + ) + + for error in error_json["errors"]: + needed_vars = needed_variable_pattern.findall(error["message"]) + for needed_var in needed_vars: + variables[needed_var] = True + + needed_features = needed_features_pattern.findall(error["message"]) + for nf in needed_features: + for feature in nf.split(","): + features[feature.strip()] = True + + url = get_details_url(tweet_id, features, variables) + + details = requests.get( + url, + headers={ + "authorization": f"Bearer {bearer_token}", + "x-guest-token": guest_token, + }, + ) + + cur_retry += 1 + + if details.status_code == 200: + # save new variables + request_details["variables"] = variables + request_details["features"] = features + + with open(request_details_file, "w") as f: + json.dump(request_details, f, indent=4) + + assert ( + details.status_code == 200 + ), f"Failed to get tweet details. If you are using the correct Twitter URL this suggests a bug in the script. Please open a GitHub issue and copy and paste this message. Status code: {details.status_code}. Tweet url: {tweet_url}" + + return details + + def get_tweet_status_id(tweet_url): + sid_patern = r"https://x\.com/[^/]+/status/(\d+)" + if tweet_url[len(tweet_url) - 1] != "/": + tweet_url = tweet_url + "/" + + match = re.findall(sid_patern, tweet_url) + if len(match) == 0: + print("error, could not get status id from this tweet url :", tweet_url) + exit() + status_id = match[0] + return status_id + + def get_associated_media_id(j, tweet_url): + sid = get_tweet_status_id(tweet_url) + pattern = ( + r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + + sid + + '/[^"]+",\s*"id_str"\s*:\s*"\d+",' + ) + matches = re.findall(pattern, j) + if len(matches) > 0: + target = matches[0] + target = target[0: len(target) - 1] # remove the coma at the end + return json.loads("{" + target + "}")["id_str"] + return None + + def extract_mp4s(j, tweet_url, target_all_mp4s=False): + # pattern looks like https://video.twimg.com/amplify_video/1638969830442237953/vid/1080x1920/lXSFa54mAVp7KHim.mp4?tag=16 or https://video.twimg.com/ext_tw_video/1451958820348080133/pu/vid/720x1280/GddnMJ7KszCQQFvA.mp4?tag=12 + amplitude_pattern = re.compile( + r"(https://video.twimg.com/amplify_video/(\d+)/vid/(\d+x\d+)/[^.]+.mp4\?tag=\d+)" + ) + ext_tw_pattern = re.compile( + r"(https://video.twimg.com/ext_tw_video/(\d+)/pu/vid/(avc1/)?(\d+x\d+)/[^.]+.mp4\?tag=\d+)" + ) + # format - https://video.twimg.com/tweet_video/Fvh6brqWAAQhU9p.mp4 + tweet_video_pattern = re.compile(r'https://video.twimg.com/tweet_video/[^"]+') + + # https://video.twimg.com/ext_tw_video/1451958820348080133/pu/pl/b-CiC-gZClIwXgDz.m3u8?tag=12&container=fmp4 + container_pattern = re.compile(r'https://video.twimg.com/[^"]*container=fmp4') + media_id = get_associated_media_id(j, tweet_url) + # find all the matches + matches = amplitude_pattern.findall(j) + matches += ext_tw_pattern.findall(j) + container_matches = container_pattern.findall(j) + + tweet_video_matches = tweet_video_pattern.findall(j) + + if len(matches) == 0 and len(tweet_video_matches) > 0: + return tweet_video_matches + + 
results = {} + + for match in matches: + url, tweet_id, _, resolution = match + if tweet_id not in results: + results[tweet_id] = {"resolution": resolution, "url": url} + else: + # if we already have a higher resolution video, then don't overwrite it + my_dims = [int(x) for x in resolution.split("x")] + their_dims = [int(x) for x in results[tweet_id]["resolution"].split("x")] + + if my_dims[0] * my_dims[1] > their_dims[0] * their_dims[1]: + results[tweet_id] = {"resolution": resolution, "url": url} + + if media_id: + all_urls = [] + for twid in results: + all_urls.append(results[twid]["url"]) + all_urls += container_matches + + url_with_media_id = [] + for url in all_urls: + if url.__contains__(media_id): + url_with_media_id.append(url) + + if len(url_with_media_id) > 0: + return url_with_media_id + + if len(container_matches) > 0 and not target_all_mp4s: + return container_matches + + if target_all_mp4s: + urls = [x["url"] for x in results.values()] + urls += container_matches + return urls + + return [x["url"] for x in results.values()] + + def download_parts(url, output_filename): + resp = requests.get(url, stream=True) + + # container begins with / ends with fmp4 and has a resolution in it we want to capture + pattern = re.compile(r"(/[^\n]*/(\d+x\d+)/[^\n]*container=fmp4)") + + matches = pattern.findall(resp.text) + + max_res = 0 + max_res_url = None + + for match in matches: + url, resolution = match + width, height = resolution.split("x") + res = int(width) * int(height) + if res > max_res: + max_res = res + max_res_url = url + + assert ( + max_res_url is not None + ), f"Could not find a url to download from. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {url}" + + video_part_prefix = "https://video.twimg.com" + + resp = requests.get(video_part_prefix + max_res_url, stream=True) + + mp4_pattern = re.compile(r"(/[^\n]*\.mp4)") + mp4_parts = mp4_pattern.findall(resp.text) + + assert ( + len(mp4_parts) == 1 + ), f"There should be exactly 1 mp4 container at this point. Instead, found {len(mp4_parts)}. Please open a GitHub issue and copy and paste this message into it. 
Tweet url: {url}" + + mp4_url = video_part_prefix + mp4_parts[0] + + m4s_part_pattern = re.compile(r"(/[^\n]*\.m4s)") + m4s_parts = m4s_part_pattern.findall(resp.text) + + with open(output_filename, "wb") as f: + r = requests.get(mp4_url, stream=True) + for chunk in r.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + f.flush() + + for part in m4s_parts: + part_url = video_part_prefix + part + r = requests.get(part_url, stream=True) + for chunk in r.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + f.flush() + + return True + + def repost_check(j, exclude_replies=True): + try: + # This line extract the index of the first reply + reply_index = j.index('"conversationthread-') + except ValueError: + # If there are no replies we use the enrire response data length + reply_index = len(j) + # We truncate the response data to exclude replies + if exclude_replies: + j = j[0:reply_index] + + # We use this regular expression to extract the source status + source_status_pattern = r'"source_status_id_str"\s*:\s*"\d+"' + matches = re.findall(source_status_pattern, j) + + if len(matches) > 0 and exclude_replies: + # We extract the source status id (ssid) + ssid = json.loads("{" + matches[0] + "}")["source_status_id_str"] + # We plug it in this regular expression to find expanded_url (the original tweet url) + expanded_url_pattern = ( + r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + ssid + '[^"]+"' + ) + matches2 = re.findall(expanded_url_pattern, j) + + if len(matches2) > 0: + # We extract the url and return it + status_url = json.loads("{" + matches2[0] + "}")["expanded_url"] + return status_url + + if not exclude_replies: + # If we include replies we'll have to get all ssids and remove duplicates + ssids = [] + for match in matches: + ssids.append(json.loads("{" + match + "}")["source_status_id_str"]) + # we remove duplicates (this line is messy but it's the easiest way to do it) + ssids = list(set(ssids)) + if len(ssids) > 0: + for ssid in ssids: + expanded_url_pattern = ( + r'"expanded_url"\s*:\s*"https://x\.com/[^/]+/status/' + + ssid + + '[^"]+"' + ) + matches2 = re.findall(expanded_url_pattern, j) + if len(matches2) > 0: + status_urls = [] + for match in matches2: + status_urls.append( + json.loads("{" + match + "}")["expanded_url"] + ) + # We remove duplicates another time + status_urls = list(set(status_urls)) + return status_urls + + # If we don't find source_status_id_str, the tweet doesn't feature a reposted video + return None + + def download_video_from_x(tweet_url, output_file, target_all_videos=False): + bearer_token, guest_token = get_tokens(tweet_url) + resp = get_tweet_details(tweet_url, guest_token, bearer_token) + mp4s = extract_mp4s(resp.text, tweet_url, target_all_videos) + if target_all_videos: + video_counter = 1 + original_urls = repost_check(resp.text, exclude_replies=False) + + if len(original_urls) > 0: + for url in original_urls: + download_video_from_x( + url, output_file.replace(".mp4", f"_{video_counter}.mp4") + ) + video_counter += 1 + if len(mp4s) > 0: + for mp4 in mp4s: + output_file = output_file.replace(".mp4", f"_{video_counter}.mp4") + if "container" in mp4: + download_parts(mp4, output_file) + + else: + r = requests.get(mp4, stream=True) + with open(output_file, "wb") as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + f.flush() + video_counter += 1 + else: + original_url = repost_check(resp.text) + + if original_url: + download_video_from_x(original_url, output_file) + else: + assert ( + len(mp4s) 
> 0
+                ), f"Could not find any mp4s to download. Make sure you are using the correct url. If you are, then file a GitHub issue and copy and paste this message. Tweet url: {tweet_url}"
+
+                mp4 = mp4s[0]
+                if "container" in mp4:
+                    download_parts(mp4, output_file)
+                else:
+                    # use a stream to download the file
+                    r = requests.get(mp4, stream=True)
+                    with open(output_file, "wb") as f:
+                        for chunk in r.iter_content(chunk_size=1024):
+                            if chunk:
+                                f.write(chunk)
+                                f.flush()
+        return target_location
+
+    return download_video_from_x(source_url, target_location)
+
+
+# TIKTOK/INSTA
+def getDict() -> dict:
+    response = requests.get('https://ttdownloader.com/')
+    point = response.text.find('
+    # [The remainder of getDict(), all of createHeader() -- which builds the cookies,
+    # headers and form data for ttdownloader.com -- and the TikTokDownload() signature
+    # were lost in extraction; the signature below is reconstructed from the call in
+    # TiktokDownloadAll().]
+def TikTokDownload(cookies, headers, data, name, path) -> str:
+    response = requests.post('https://ttdownloader.com/search/',
+                             cookies=cookies, headers=headers, data=data)
+    parsed_link = [i for i in str(response.text).split()
+                   if i.startswith("href=")][0]
+
+    response = requests.get(parsed_link[6:-10])
+    with open(path + "tiktok" + name + ".mp4", "wb") as f:
+        f.write(response.content)
+    return path + "tiktok" + name + ".mp4"
+
+
+def TiktokDownloadAll(linkList, path) -> str:
+    parseDict = getDict()
+    cookies, headers, data = createHeader(parseDict)
+    # linkList = getLinkDict()['tiktok']
+    for i in linkList:
+        try:
+            data['url'] = i
+            result = TikTokDownload(cookies, headers, data, str(linkList.index(i)), path)
+            return result
+        except IndexError:
+            parseDict = getDict()
+            cookies, headers, data = createHeader(parseDict)
+        except Exception as err:
+            print(err)
+            exit(1)
+
+
+def InstagramDownload(url, name, path) -> str:
+    obj = instaloader.Instaloader()
+    post = instaloader.Post.from_shortcode(obj.context, url.split("/")[-2])
+    photo_url = post.url
+    video_url = post.video_url
+    print(video_url)
+    if video_url:
+        response = requests.get(video_url)
+        with open(path + "insta" + name + ".mp4", "wb") as f:
+            f.write(response.content)
+        return path + "insta" + name + ".mp4"
+    elif photo_url:
+        response = requests.get(photo_url)
+        with open(path + "insta" + name + ".jpg", "wb") as f:
+            f.write(response.content)
+        return path + "insta" + name + ".jpg"
+
+
+def InstagramDownloadAll(linklist, path) -> str:
+    for i in linklist:
+        try:
+            print(str(linklist.index(i)))
+            print(str(i))
+            result = InstagramDownload(i, str(linklist.index(i)), path)
+            return result
+        except Exception as err:
+            print(err)
+            exit(1)
+
+
+# YOUTUBE
+def YouTubeDownload(link, path, audio_only=True):
+    youtubeObject = YouTube(link)
+    if audio_only:
+        youtubeObject = youtubeObject.streams.get_audio_only()
+        youtubeObject.download(path, "yt.mp3")
+        print("Download is completed successfully")
+        return path + "yt.mp3"
+    else:
+        youtubeObject = youtubeObject.streams.get_highest_resolution()
+        youtubeObject.download(path, "yt.mp4")
+        print("Download is completed successfully")
+        return path + "yt.mp4"
+
+
+def checkYoutubeLinkValid(link):
+    try:
+        # TODO find a way to test without fully downloading the file
+        youtubeObject = YouTube(link)
+        youtubeObject = youtubeObject.streams.get_audio_only()
+        youtubeObject.download(".", "yt.mp3")
+        os.remove("yt.mp3")
+        return True
+
+    except Exception as e:
+        print(str(e))
+        return False
+
+
+# OVERCAST
+def OvercastDownload(source_url, target_location):
+    def get_title(html_str):
+        """Get the title from the meta tags"""
+        title = re.findall(r"
+    # [The regex of get_title(), the helper that reads the audio url from the page's
+    # meta tag, and the actual download logic of OvercastDownload() were lost in
+    # extraction, along with the header of the utils/zap_utils.py diff below.]
diff --git a/utils/zap_utils.py b/utils/zap_utils.py
--- a/utils/zap_utils.py
+++ b/utils/zap_utils.py
@@ ... @@ def create_bolt11_ln_bits(...) -> (str, str):
     try:
         res = requests.post(url, json=data, headers=headers)
         obj = json.loads(res.text)
-        return obj["payment_request"], obj["payment_hash"]
+        if obj.get("payment_request") and obj.get("payment_hash"):
+            return obj["payment_request"], obj["payment_hash"]
+        else:
+            print(res.text)
+            return None, None
     except Exception as e:
         print("LNBITS: " + str(e))
         return None, None

@@ -121,7 +125,10 @@ def check_bolt11_ln_bits_is_paid(payment_hash: str, config: DVMConfig):
     try:
         res = requests.get(url, headers=headers)
         obj = json.loads(res.text)
-        return obj["paid"]
+        if obj.get("paid"):
+            return obj["paid"]
+        else:
+            return False
     except Exception as e:
         return None

@@ -133,7 +140,10 @@ def pay_bolt11_ln_bits(bolt11: str, config: DVMConfig):
     try:
         res = requests.post(url, json=data, headers=headers)
         obj = json.loads(res.text)
-        return obj["payment_hash"]
+        if obj.get("payment_hash"):
+            return obj["payment_hash"]
+        else:
+            return "Error"
     except Exception as e:
         print("LNBITS: " + str(e))
         return None, None