import { Injectable } from '@angular/core'; import { Subject, BehaviorSubject, Observable, pipe } from 'rxjs'; import { Thread } from '../models/thread.model'; import { Message } from '../models/message.model'; import { MessagesService } from '../services/messages.service'; import * as _ from 'lodash'; import { map, combineLatest } from 'rxjs/operators' @Injectable() export class ThreadsService { // `threads` is a observable that contains the most up to date list of threads threads: Observable<{ [key: string]: Thread }>; // `orderedThreads` contains a newest-first chronological list of threads orderedThreads: Observable; // `currentThread` contains the currently selected thread currentThread: Subject = new BehaviorSubject(new Thread()); // `currentThreadMessages` contains the set of messages for the currently // selected thread currentThreadMessages: Observable; constructor(public messagesService: MessagesService) { this.threads = messagesService.messages .pipe(map( (messages: Message[]) => { const threads: {[key: string]: Thread} = {}; // Store the message's thread in our accumulator `threads` messages.map((message: Message) => { threads[message.thread.id] = threads[message.thread.id] || message.thread; // Cache the most recent message for each thread const messagesThread: Thread = threads[message.thread.id]; if (!messagesThread.lastMessage || messagesThread.lastMessage.sentAt < message.sentAt) { messagesThread.lastMessage = message; } }); return threads; }) // map ) // pipe ; this.orderedThreads = this.threads .pipe(map((threadGroups: { [key: string]: Thread }) => { const threads: Thread[] = _.values(threadGroups); return _.sortBy(threads, (t: Thread) => t.lastMessage.sentAt).reverse(); }) //map ) //pipe ; this.currentThreadMessages = this.currentThread .pipe(combineLatest(messagesService.messages, (currentThread: Thread, messages: Message[]) => { if (currentThread && messages.length > 0) { return _.chain(messages) .filter((message: Message) => (message.thread.id === currentThread.id)) .map((message: Message) => { message.isRead = true; return message; }) .value(); } else { return []; } }) // combineLatest ) // pipe ; this.currentThread.subscribe(this.messagesService.markThreadAsRead); } setCurrentThread(newThread: Thread): void { this.currentThread.next(newThread); } } export const threadsServiceInjectables: Array = [ ThreadsService ];