Update chat service

This commit is contained in:
Milad Raeisi
2024-09-21 00:13:49 +04:00
parent afc55e9874
commit 7054b6e7d8
9 changed files with 226 additions and 200 deletions

View File

@@ -26,18 +26,11 @@ const conversationResolver = (
const router = inject(Router);
return chatService.getChatById(route.paramMap.get('id')).pipe(
// Error here means the requested chat is not available
catchError((error) => {
// Log the error
console.error(error);
// Get the parent url
const parentUrl = state.url.split('/').slice(0, -1).join('/');
// Navigate to there
router.navigateByUrl(parentUrl);
// Throw an error
return throwError(error);
})
);

View File

@@ -1,7 +1,6 @@
import { HttpClient } from '@angular/common/http';
import { Injectable, OnDestroy } from '@angular/core';
import { BehaviorSubject, Observable, Subject, throwError, of, Subscriber } from 'rxjs';
import { filter, map, switchMap, take, takeUntil, tap } from 'rxjs/operators';
import { BehaviorSubject, Observable, Subject, throwError, of, Subscriber, from } from 'rxjs';
import { catchError, filter, map, switchMap, take, takeUntil, tap } from 'rxjs/operators';
import { DomSanitizer } from '@angular/platform-browser';
import { Chat, Contact, Profile } from 'app/components/chat/chat.types';
import { IndexedDBService } from 'app/services/indexed-db.service';
@@ -10,6 +9,7 @@ import { SignerService } from 'app/services/signer.service';
import { Filter, NostrEvent } from 'nostr-tools';
import { RelayService } from 'app/services/relay.service';
import { EncryptedDirectMessage } from 'nostr-tools/kinds';
import { getEventHash } from 'nostr-tools';
@Injectable({ providedIn: 'root' })
export class ChatService implements OnDestroy {
@@ -26,14 +26,14 @@ export class ChatService implements OnDestroy {
private _unsubscribeAll: Subject<void> = new Subject<void>();
constructor(
private _httpClient: HttpClient,
private _metadataService: MetadataService,
private _signerService: SignerService,
private _indexedDBService: IndexedDBService,
private _relayService: RelayService,
private _sanitizer: DomSanitizer
) { }
) {}
// Getters for observables
get chat$(): Observable<Chat | null> {
return this._chat.asObservable();
}
@@ -54,13 +54,11 @@ export class ChatService implements OnDestroy {
return this._profile.asObservable();
}
// Fetch a contact by public key
async getContact(pubkey: string): Promise<void> {
try {
const metadata = await this._metadataService.fetchMetadataWithCache(pubkey);
if (metadata) {
const contact: Contact = {
pubKey: pubkey,
displayName: metadata.name,
@@ -69,11 +67,10 @@ export class ChatService implements OnDestroy {
};
this._contact.next(contact);
// Subscribe to metadata stream for updates
this._indexedDBService.getMetadataStream()
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((updatedMetadata) => {
if (updatedMetadata && updatedMetadata.pubkey === pubkey) {
const updatedContact: Contact = {
pubKey: pubkey,
@@ -81,7 +78,6 @@ export class ChatService implements OnDestroy {
picture: updatedMetadata.metadata.picture,
about: updatedMetadata.metadata.about
};
this._contact.next(updatedContact);
}
});
@@ -91,16 +87,15 @@ export class ChatService implements OnDestroy {
}
}
// Fetch contacts from IndexedDB and subscribe to real-time updates
getContacts(): Observable<Contact[]> {
return new Observable<Contact[]>((observer) => {
this._indexedDBService.getAllUsers()
.then((cachedContacts: Contact[]) => {
if (cachedContacts.length > 0) {
this._contacts.next(cachedContacts);
observer.next(cachedContacts);
}
const pubkeys = cachedContacts.map(contact => contact.pubKey);
if (pubkeys.length > 0) {
this.subscribeToRealTimeContacts(pubkeys, observer);
@@ -111,13 +106,13 @@ export class ChatService implements OnDestroy {
observer.error(error);
});
return () => {
console.log('Unsubscribing from contacts updates.');
};
});
}
// Subscribe to real-time updates for contacts
private subscribeToRealTimeContacts(pubkeys: string[], observer: Subscriber<Contact[]>): void {
this._metadataService.fetchMetadataForMultipleKeys(pubkeys)
.then((metadataList: any[]) => {
@@ -133,15 +128,12 @@ export class ChatService implements OnDestroy {
};
if (contactIndex !== -1) {
updatedContacts[contactIndex] = { ...updatedContacts[contactIndex], ...newContact };
} else {
updatedContacts.push(newContact);
}
});
this._contacts.next(updatedContacts);
observer.next(updatedContacts);
})
@@ -151,15 +143,15 @@ export class ChatService implements OnDestroy {
});
}
// Fetch profile of the user
async getProfile(): Promise<void> {
try {
const publicKey = this._signerService.getPublicKey();
const metadata = await this._metadataService.fetchMetadataWithCache(publicKey);
if (metadata) {
this._profile.next(metadata);
// Subscribe to updates in the metadata stream
this._indexedDBService.getMetadataStream()
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((updatedMetadata) => {
@@ -173,181 +165,233 @@ export class ChatService implements OnDestroy {
}
}
getChatById(id: string): Observable<Chat> {
return this._httpClient.get<Chat>('api/apps/chat/chat', { params: { id } }).pipe(
map((chat: Chat) => {
this._chat.next(chat);
return chat;
}),
switchMap((chat: Chat | null) => {
if (!chat) {
return throwError('Could not find chat with id ' + id);
}
return of(chat);
})
);
}
// Fetch chats and subscribe to updates
async getChats(): Promise<Observable<Chat[]>> {
const pubkey = this._signerService.getPublicKey();
const useExtension = await this._signerService.isUsingExtension(); const decryptedPrivateKey = await this._signerService.getSecretKey("123");
this.subscribeToChatList(pubkey, useExtension, decryptedPrivateKey);
return this.getChatListStream();
const pubkey = this._signerService.getPublicKey();
const useExtension = await this._signerService.isUsingExtension();
const decryptedPrivateKey = await this._signerService.getSecretKey("123");
this.subscribeToChatList(pubkey, useExtension, decryptedPrivateKey);
return this.getChatListStream();
}
// Subscribe to chat list updates based on filters
subscribeToChatList(pubkey: string, useExtension: boolean, decryptedSenderPrivateKey: string): Observable<Chat[]> {
this._relayService.ensureConnectedRelays().then(() => {
const filters: Filter[] = [
{
kinds: [EncryptedDirectMessage],
authors: [pubkey], },
{
kinds: [EncryptedDirectMessage],
'#p': [pubkey] }
];
this._relayService.ensureConnectedRelays().then(() => {
const filters: Filter[] = [
{ kinds: [EncryptedDirectMessage], authors: [pubkey] },
{ kinds: [EncryptedDirectMessage], '#p': [pubkey] }
];
this._relayService.getPool().subscribeMany(this._relayService.getConnectedRelays(), filters, {
onevent: async (event: NostrEvent) => {
const otherPartyPubKey = event.pubkey === pubkey
? event.tags.find(tag => tag[0] === 'p')?.[1] || ''
: event.pubkey;
this._relayService.getPool().subscribeMany(this._relayService.getConnectedRelays(), filters, {
onevent: async (event: NostrEvent) => {
const otherPartyPubKey = event.pubkey === pubkey
? event.tags.find(tag => tag[0] === 'p')?.[1] || ''
: event.pubkey;
if (!otherPartyPubKey) return;
if (!otherPartyPubKey) return;
const lastTimestamp = this.latestMessageTimestamps[otherPartyPubKey] || 0;
if (event.created_at > lastTimestamp) {
this.latestMessageTimestamps[otherPartyPubKey] = event.created_at;
this.messageQueue.push(event);
this.processNextMessage(pubkey, useExtension, decryptedSenderPrivateKey);
}
},
oneose: () => {
console.log('Subscription closed');
this._chats.next(this.chatList);
}
});
const lastTimestamp = this.latestMessageTimestamps[otherPartyPubKey] || 0;
if (event.created_at > lastTimestamp) {
this.latestMessageTimestamps[otherPartyPubKey] = event.created_at;
this.messageQueue.push(event);
this.processNextMessage(pubkey, useExtension, decryptedSenderPrivateKey);
}
},
oneose: () => {
console.log('Subscription closed');
this._chats.next(this.chatList);
}
});
});
return this.getChatListStream();
}
}
private async processNextMessage(pubkey: string, useExtension: boolean, decryptedSenderPrivateKey: string): Promise<void> {
// Process each message in the queue
private async processNextMessage(pubkey: string, useExtension: boolean, decryptedSenderPrivateKey: string): Promise<void> {
if (this.isDecrypting || this.messageQueue.length === 0) return;
this.isDecrypting = true;
const event = this.messageQueue.shift();
if (!event) {
this.isDecrypting = false;
return;
this.isDecrypting = false;
return;
}
const isSentByUser = event.pubkey === pubkey;
const otherPartyPubKey = isSentByUser
? event.tags.find(tag => tag[0] === 'p')?.[1] || ''
: event.pubkey;
? event.tags.find(tag => tag[0] === 'p')?.[1] || ''
: event.pubkey;
if (!otherPartyPubKey) {
this.isDecrypting = false;
return;
this.isDecrypting = false;
return;
}
try {
const decryptedMessage = await this.decryptReceivedMessage(
event,
useExtension,
decryptedSenderPrivateKey,
otherPartyPubKey
);
const decryptedMessage = await this.decryptReceivedMessage(
event,
useExtension,
decryptedSenderPrivateKey,
otherPartyPubKey
);
if (decryptedMessage) {
const messageTimestamp = event.created_at * 1000; this.addOrUpdateChatList(otherPartyPubKey, decryptedMessage, messageTimestamp);
}
if (decryptedMessage) {
const messageTimestamp = event.created_at * 1000;
this.addOrUpdateChatList(otherPartyPubKey, decryptedMessage, messageTimestamp);
}
} catch (error) {
console.error('Failed to decrypt message:', error);
console.error('Failed to decrypt message:', error);
} finally {
this.isDecrypting = false;
this.processNextMessage(pubkey, useExtension, decryptedSenderPrivateKey);
this.isDecrypting = false;
this.processNextMessage(pubkey, useExtension, decryptedSenderPrivateKey);
}
}
}
private addOrUpdateChatList(pubKey: string, message: string, createdAt: number): void {
// Add or update chat in the chat list
private addOrUpdateChatList(pubKey: string, message: string, createdAt: number): void {
const existingChat = this.chatList.find(chat => chat.contact?.pubKey === pubKey);
if (existingChat) {
if (existingChat.lastMessageAt && new Date(existingChat.lastMessageAt).getTime() < createdAt) {
existingChat.lastMessage = message;
existingChat.lastMessageAt = new Date(createdAt).toISOString();
}
if (existingChat.lastMessageAt && new Date(existingChat.lastMessageAt).getTime() < createdAt) {
existingChat.lastMessage = message;
existingChat.lastMessageAt = new Date(createdAt).toISOString();
}
} else {
const newChat: Chat = {
contact: { pubKey },
lastMessage: message,
lastMessageAt: new Date(createdAt).toISOString()
};
this.chatList.push(newChat);
this.fetchMetadataForPubKey(pubKey);
const newChat: Chat = {
id: pubKey,
contact: { pubKey },
lastMessage: message,
lastMessageAt: new Date(createdAt).toISOString(),
messages: [
{
id: pubKey,
chatId: pubKey,
contactId: pubKey,
isMine: true,
value: message,
createdAt: new Date(createdAt).toISOString(),
}
]
};
this.chatList.push(newChat);
this.fetchMetadataForPubKey(pubKey);
}
this.chatList.sort((a, b) => new Date(b.lastMessageAt!).getTime() - new Date(a.lastMessageAt!).getTime());
this.chatList.sort((a, b) => new Date(b.lastMessageAt!).getTime() - new Date(a.lastMessageAt!).getTime());
this._chats.next(this.chatList);
}
this._chats.next(this.chatList);
}
private fetchMetadataForPubKey(pubKey: string): void {
// Fetch metadata for a public key
private fetchMetadataForPubKey(pubKey: string): void {
this._metadataService.fetchMetadataWithCache(pubKey)
.then(metadata => {
const chat = this.chatList.find(chat => chat.contact?.pubKey === pubKey);
if (chat && metadata) {
chat.contact = { ...chat.contact, ...metadata };
this._chats.next(this.chatList);
}
})
.catch(error => {
console.error(`Failed to fetch metadata for pubKey: ${pubKey}`, error);
});
}
.then(metadata => {
const chat = this.chatList.find(chat => chat.contact?.pubKey === pubKey);
if (chat && metadata) {
chat.contact = { ...chat.contact, ...metadata };
this._chats.next(this.chatList);
}
})
.catch(error => {
console.error(`Failed to fetch metadata for pubKey: ${pubKey}`, error);
});
}
getChatListStream(): Observable<Chat[]> {
// Get chat list stream
getChatListStream(): Observable<Chat[]> {
return this._chats.asObservable();
}
private async decryptReceivedMessage(event: NostrEvent, useExtension: boolean, decryptedSenderPrivateKey: string, otherPartyPubKey: string): Promise<string> {
return 'Decrypted message'; }
}
// Decrypt received message
private async decryptReceivedMessage(event: NostrEvent, useExtension: boolean, decryptedSenderPrivateKey: string, otherPartyPubKey: string): Promise<string> {
return 'Decrypted message'; // Implement decryption logic here
}
// Update chat in the chat list
updateChat(id: string, chat: Chat): Observable<Chat> {
return this.chats$.pipe(
take(1),
switchMap((chats: Chat[] | null) =>
this._httpClient.patch<Chat>('api/apps/chat/chat', { id, chat }).pipe(
map((updatedChat: Chat) => {
switchMap((chats: Chat[] | null) => {
const pubkey = chat.contact?.pubKey;
if (!pubkey) {
return throwError('No public key found for this chat');
}
const event: any = {
kind: 4,
pubkey: pubkey,
content: JSON.stringify(chat),
created_at: Math.floor(Date.now() / 1000),
tags: [['p', pubkey]],
};
event.id = getEventHash(event);
return from(this._relayService.publishEventToRelays(event)).pipe(
map(() => {
if (chats) {
const index = chats.findIndex((item) => item.id === id);
if (index !== -1) {
chats[index] = updatedChat;
chats[index] = chat;
this._chats.next(chats);
}
}
return updatedChat;
return chat;
}),
catchError((error) => {
console.error('Failed to update chat via Nostr:', error);
return throwError(error);
})
)
)
);
})
);
}
// Get chat by ID
getChatById(id: string): Observable<Chat> {
const recipientPublicKey = id;
const pubkey = this._signerService.getPublicKey();
const useExtension = this._signerService.isUsingExtension();
const decryptedSenderPrivateKey = this._signerService.getSecretKey('123');
return this.chats$.pipe(
take(1),
switchMap((chats: Chat[] | null) => {
const cachedChat = chats?.find(chat => chat.id === id);
if (cachedChat) {
this._chat.next(cachedChat);
return of(cachedChat);
}
const newChat: Chat = {
id: recipientPublicKey,
contact: { pubKey: recipientPublicKey, picture: "/images/avatars/avatar-placeholder.png" },
lastMessage: '',
lastMessageAt: new Date().toISOString(),
messages: []
};
const updatedChats = chats ? [...chats, newChat] : [newChat];
this._chats.next(updatedChats);
this._chat.next(newChat);
return of(newChat);
}),
catchError((error) => {
console.error('Error fetching chat by id from Nostr:', error);
return throwError(error);
})
);
}
// Reset chat state
resetChat(): void {
this._chat.next(null);
}
// Clean up on destroy
ngOnDestroy(): void {
this._unsubscribeAll.next();
this._unsubscribeAll.complete();

View File

@@ -185,7 +185,7 @@
@if (chat.contact.picture) {
<img
class="h-full w-full rounded-full object-cover"
[src]="chat.contact || '/images/avatars/avatar-placeholder.png'"
[src]="chat.contact.picture || '/images/avatars/avatar-placeholder.png'"
onerror="this.onerror=null; this.src='/images/avatars/avatar-placeholder.png';"
alt="Contact picture"
/>

View File

@@ -57,41 +57,38 @@ export class ChatsComponent implements OnInit, OnDestroy {
private _changeDetectorRef: ChangeDetectorRef
) {}
// -----------------------------------------------------------------------------------------------------
// @ Lifecycle hooks
// -----------------------------------------------------------------------------------------------------
/**
* On init
*/
ngOnInit(): void {
// Chats
this._chatService.chats$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((chats: Chat[]) => {
this.chats = this.filteredChats = chats;
// Mark for check
this._changeDetectorRef.markForCheck();
});
// Profile
this._chatService.profile$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((profile: Profile) => {
this.profile = profile;
// Mark for check
this._changeDetectorRef.markForCheck();
});
// Selected chat
this._chatService.chat$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((chat: Chat) => {
this.selectedChat = chat;
// Mark for check
this._changeDetectorRef.markForCheck();
});
}
@@ -100,25 +97,21 @@ export class ChatsComponent implements OnInit, OnDestroy {
* On destroy
*/
ngOnDestroy(): void {
// Unsubscribe from all subscriptions
this._unsubscribeAll.next(null);
this._unsubscribeAll.complete();
// Reset the chat
this._chatService.resetChat();
}
// -----------------------------------------------------------------------------------------------------
// @ Public methods
// -----------------------------------------------------------------------------------------------------
/**
* Filter the chats
*
* @param query
*/
filterChats(query: string): void {
// Reset the filter
if (!query) {
this.filteredChats = this.chats;
return;
@@ -136,7 +129,7 @@ export class ChatsComponent implements OnInit, OnDestroy {
this.drawerComponent = 'new-chat';
this.drawerOpened = true;
// Mark for check
this._changeDetectorRef.markForCheck();
}
@@ -147,7 +140,7 @@ export class ChatsComponent implements OnInit, OnDestroy {
this.drawerComponent = 'profile';
this.drawerOpened = true;
// Mark for check
this._changeDetectorRef.markForCheck();
}

View File

@@ -128,7 +128,7 @@
</div>
<!-- Conversation -->
<div class="flex flex-col-reverse overflow-y-auto">
<div class="flex flex-col-reverse overflow-y-auto h-full">
<div
class="bg-card flex flex-auto shrink flex-col p-6 dark:bg-transparent"
>

View File

@@ -63,9 +63,9 @@ export class ConversationComponent implements OnInit, OnDestroy {
private _ngZone: NgZone
) {}
// -----------------------------------------------------------------------------------------------------
// @ Decorated methods
// -----------------------------------------------------------------------------------------------------
/**
* Resize on 'input' and 'ngModelChange' events
@@ -75,54 +75,54 @@ export class ConversationComponent implements OnInit, OnDestroy {
@HostListener('input')
@HostListener('ngModelChange')
private _resizeMessageInput(): void {
// This doesn't need to trigger Angular's change detection by itself
this._ngZone.runOutsideAngular(() => {
setTimeout(() => {
// Set the height to 'auto' so we can correctly read the scrollHeight
this.messageInput.nativeElement.style.height = 'auto';
// Detect the changes so the height is applied
this._changeDetectorRef.detectChanges();
// Get the scrollHeight and subtract the vertical padding
this.messageInput.nativeElement.style.height = `${this.messageInput.nativeElement.scrollHeight}px`;
// Detect the changes one more time to apply the final height
this._changeDetectorRef.detectChanges();
});
});
}
// -----------------------------------------------------------------------------------------------------
// @ Lifecycle hooks
// -----------------------------------------------------------------------------------------------------
/**
* On init
*/
ngOnInit(): void {
// Chat
this._chatService.chat$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((chat: Chat) => {
this.chat = chat;
// Mark for check
this._changeDetectorRef.markForCheck();
});
// Subscribe to media changes
this._angorMediaWatcherService.onMediaChange$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe(({ matchingAliases }) => {
// Set the drawerMode if the given breakpoint is active
if (matchingAliases.includes('lg')) {
this.drawerMode = 'side';
} else {
this.drawerMode = 'over';
}
// Mark for check
this._changeDetectorRef.markForCheck();
});
}
@@ -131,23 +131,19 @@ export class ConversationComponent implements OnInit, OnDestroy {
* On destroy
*/
ngOnDestroy(): void {
// Unsubscribe from all subscriptions
this._unsubscribeAll.next(null);
this._unsubscribeAll.complete();
}
// -----------------------------------------------------------------------------------------------------
// @ Public methods
// -----------------------------------------------------------------------------------------------------
/**
* Open the contact info
*/
openContactInfo(): void {
// Open the drawer
this.drawerOpened = true;
// Mark for check
this._changeDetectorRef.markForCheck();
}
@@ -157,10 +153,10 @@ export class ConversationComponent implements OnInit, OnDestroy {
resetChat(): void {
this._chatService.resetChat();
// Close the contact info in case it's opened
this.drawerOpened = false;
// Mark for check
this._changeDetectorRef.markForCheck();
}
@@ -168,10 +164,10 @@ export class ConversationComponent implements OnInit, OnDestroy {
* Toggle mute notifications
*/
toggleMuteNotifications(): void {
// Toggle the muted
this.chat.muted = !this.chat.muted;
// Update the chat on the server
this._chatService.updateChat(this.chat.id, this.chat).subscribe();
}

View File

@@ -51,7 +51,7 @@
<div
class="flex h-full w-full items-center justify-center rounded-full bg-gray-200 text-lg uppercase text-gray-600 dark:bg-gray-700 dark:text-gray-200"
>
{{ contact.name.charAt(0) }}
{{ contact?.name?.charAt(0) ?? '' }}
</div>
}
</div>

View File

@@ -31,15 +31,15 @@ export class NewChatComponent implements OnInit, OnDestroy {
*/
constructor(private _chatService: ChatService) {}
// -----------------------------------------------------------------------------------------------------
// @ Lifecycle hooks
// -----------------------------------------------------------------------------------------------------
/**
* On init
*/
ngOnInit(): void {
// Contacts
this._chatService.contacts$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((contacts: Contact[]) => {
@@ -51,14 +51,14 @@ export class NewChatComponent implements OnInit, OnDestroy {
* On destroy
*/
ngOnDestroy(): void {
// Unsubscribe from all subscriptions
this._unsubscribeAll.next(null);
this._unsubscribeAll.complete();
}
// -----------------------------------------------------------------------------------------------------
// @ Public methods
// -----------------------------------------------------------------------------------------------------
/**
* Track by function for ngFor loops

View File

@@ -44,7 +44,7 @@ export class ProfileComponent implements OnInit, OnDestroy {
* On init
*/
ngOnInit(): void {
// Profile
this._chatService.profile$
.pipe(takeUntil(this._unsubscribeAll))
.subscribe((profile: Profile) => {
@@ -56,7 +56,7 @@ export class ProfileComponent implements OnInit, OnDestroy {
* On destroy
*/
ngOnDestroy(): void {
// Unsubscribe from all subscriptions
this._unsubscribeAll.next(null);
this._unsubscribeAll.complete();
}