import { Directive } from '@angular/core';
import { IDataModel, LoggerService } from '@klickdata/core/application';
import { ConfigService } from '@klickdata/core/config';
import { Customer, CustomerData } from '@klickdata/core/customer';
import { ResponseData } from '@klickdata/core/http';
import { User, UserData } from '@klickdata/core/user';
import Echo, { Channel } from 'laravel-echo';
import Pusher from 'pusher-js';
import {
    BehaviorSubject,
    EMPTY,
    Observable,
    Subject,
    combineLatest,
    distinctUntilChanged,
    distinctUntilKeyChanged,
    filter,
    interval,
    map,
    switchMap,
    takeUntil,
    tap,
} from 'rxjs';

@Directive()
export abstract class EchoService {
    protected token: BehaviorSubject<string> = new BehaviorSubject(null);
    protected sharedUser = new BehaviorSubject<User>(null);
    protected sharedCustomer = new BehaviorSubject<Customer>(null);

    protected destroy = new Subject<boolean>();
    private echo: Echo;
    private userChannel: Observable<Channel>;
    private customerChannel: Observable<Channel>;

    constructor(protected configService: ConfigService, protected logger: LoggerService) {
        /**
         * Load Echo
         */
        Window.bind('Pusher', Pusher);

        this.userChannel = this.sharedUser.pipe(
            filter((user) => !!user?.id),
            distinctUntilKeyChanged('id'),
            map((user) => this.echo.private(`App.User.${user.id}`))
        );

        this.customerChannel = this.sharedCustomer.pipe(
            filter((customer) => !!customer?.id),
            distinctUntilKeyChanged('id'),
            map((customer) => this.echo.private(`customer.${customer.id}`))
        );

        combineLatest([this.token, this.sharedUser])
            .pipe(takeUntil(this.destroy))
            .subscribe(([token, user]) => {
                if (user && token) {
                    this.echo = this.getEchoInstance();
                    this.setEchoAuthToken(token);
                } else if (!user && !token) {
                    this.disconnect();
                }
            });
    }

    private disconnect() {
        if (this.echo) {
            this.echo.leaveAllChannels();
            this.echo.disconnect();
            this.echo = null;
        }
    }

    private getEchoInstance() {
        return new Echo({
            broadcaster: 'pusher',
            disableStats: true,

            wsHost: this.configService.config.WS_ECHO_SERVER,
            wsPort: this.configService.config.WS_ECHO_PORT,
            wssPort: this.configService.config.WSS_ECHO_PORT,

            key: this.configService.config.WS_ECHO_APP_KEY,
            cluster: this.configService.config.WS_ECHO_CLUSTER,
            authEndpoint: this.configService.config.WS_ECHO_AUTH,

            forceTLS: this.configService.config.WS_ECHO_TLS,
            encrypted: this.configService.config.WS_ECHO_TLS,
            enabledTransports: ['ws', 'wss'],
        });
    }

    /**
     * Set token to keep echo auth token updated.
     * @param token
     */
    private setEchoAuthToken(token: string) {
        // this.echo.connector.pusher.config.auth.headers['Authorization'] = `Bearer ${token}`;
        this.echo.connector.options.auth.headers['Authorization'] = `Bearer ${token}`;
    }

    public listen<T>(channel: string, event: string): Observable<T> {
        return new Observable((subs) => {
            this.echo.listen(channel, event, (data: T) => subs.next(data));
        });
    }

    public listenPrivate<T extends IDataModel | IDataModel[]>(
        channel: string,
        event: string,
        looper: Observable<ResponseData<T>> = null,
        looperInterval = 5000
    ): Observable<ResponseData<T>> {
        if (this.socketId) {
            return new Observable((subs) => {
                this.echo.private(channel).listen(event, (data: ResponseData<T>) => subs.next(data));
            });
        } else if (looper) {
            return interval(looperInterval).pipe(
                switchMap(() => looper),
                filter((res) => !!res.data),
                distinctUntilChanged((t1, t2) => this.detectChange(t1.data, t2.data))
            );
        } else {
            return EMPTY;
        }
    }

    private detectChange(t1: IDataModel | IDataModel[], t2: IDataModel | IDataModel[]): boolean {
        if (!Array.isArray(t1) && !Array.isArray(t2)) {
            return t1?.prompt_status === t2?.prompt_status;
        }

        if (Array.isArray(t1) && Array.isArray(t2)) {
            return t1.some((item1) => {
                const item2 = t2.find((item) => item.id === item1.id);
                return item1?.prompt_status === item2?.prompt_status;
            });
        }

        return true;
    }

    public get socketId(): string {
        return this.echo?.socketId() ?? '';
    }

    public listenUserChannel<T>(event: string): Observable<T> {
        return this.userChannel.pipe(
            switchMap((ch) => {
                return new Observable<T>((subs) => {
                    ch.listen(event, (data: T) => subs.next(data));
                });
            })
        );
    }

    public listenCustomerChannel<T>(event: string): Observable<T> {
        return this.customerChannel.pipe(
            switchMap((ch) => {
                return new Observable<T>((subs) => {
                    ch.listen(event, (data: T) => subs.next(data));
                });
            })
        );
    }

    /**
     * Auth socket
     */
    protected getAuthUserSocket(): Observable<ResponseData<UserData>> {
        return this.listenUserChannel('UserUpdateEvent');
    }

    protected getAuthCustomerSocket(): Observable<ResponseData<CustomerData>> {
        return this.listenCustomerChannel('CustomerUpdateEvent').pipe(tap((v) => this.logger.log(v)));
    }

    public tokenExpiringSocket(): Observable<ResponseData<{ token_expiry: string }>> {
        return this.listenUserChannel('TokenExpiringEvent');
    }

    protected tokenExpiredSocket(): Observable<ResponseData<any>> {
        return this.listenUserChannel('TokenExpiredEvent');
    }

    protected logOutOtherSocket(): Observable<ResponseData<any>> {
        return this.listenUserChannel('LogOutOtherEvent');
    }
}
