import { MqttmessagingService } from './mqttmessaging.service';
import { HttpTransferService } from './httpTransfer.service';
import { Injectable } from '@angular/core';
import { IMqttMessage, MqttConnectionState, MqttService } from "ngx-mqtt";
import { Observable } from 'rxjs';
import { CustomStorageService } from './custom-storage.service';
import { MessageService } from '../message.service';
import { ResourceLockerService } from 'app/resourcelocker.service';
import * as pako from 'pako';

@Injectable({
  providedIn: 'root'
})
export class MqttConnectionService {
  mqttSub: any;
  receivedPacketNumbers = new Set<number>(); // Maintain a Set to keep track of received packet numbers
  windowSize = 5; // Set the size of the sliding window
  lastLiveUpdatedTime: any = Date.now();
  minProcessDeletedPacket: number = 0;
  isSlidingWindowRunning = false;
  slidingWindowFunctions :any=[];
  
  constructor(private _mqttService: MqttService,
    private MqttmessagingService: MqttmessagingService,
    private customStorageService: CustomStorageService,
    private httpTransfer: HttpTransferService,
    private msgService: MessageService,
    public resourceLockerService: ResourceLockerService,


  ) { }

  async createMqttConnection() {
    this.handleMqttKey()
    this.listenToLiveUpdates();
    this._mqttService.state.subscribe(async (s: MqttConnectionState) => {
      const status = s === MqttConnectionState.CONNECTED ? 'CONNECTED' : 'DISCONNECTED';
      console.log(status, this.lastLiveUpdatedTime);
      if (status == 'CONNECTED') {
        try {
          await this.resourceLockerService.acquireLock("liveUpdateQuery")
          await this.getMissingLiveUpdates()
        } catch (err) {
        } finally {
          this.resourceLockerService.releaseLock("liveUpdateQuery")
        }
      }
    });
  }

  handleMqttKey() {
    if (this.customStorageService && this.customStorageService.getItem('id')) {
      this.httpTransfer.generateMqttKey({}).subscribe(res => {
        this.customStorageService.setItem('dashboard_live_update_key_expiry_time', res.result.expiry_time);
        this.mqttSub = this.connectToMqttTopic(res.result.key, res.result.channel).subscribe();
      })
    }
  }

  connectToMqttTopic(key, channel, param?): Observable<IMqttMessage> {
    let topicName;
    if (param) {
      topicName = key + '/' + channel + param;
    } else {
      topicName = key + '/' + channel;
    }
    return this._mqttService.observe(topicName);
  }

  publishPresence(key, channel): Observable<void> {
    const topicName = 'emitter/presence/';
    const data = {
      key: key,
      channel: channel,
      status: true,
      changes: true
    }
    return this._mqttService.publish(topicName, JSON.stringify(data))
  }


  listenToLiveUpdates() {
    this._mqttService.onMessage.subscribe((res: any) => {
      this.decompressData(res['payload']).then(async (data: Uint8Array) => {
        const jsonString = Buffer.from(data).toString('utf8');
        let liveUpdateJson = JSON.parse(jsonString);
        this.lastLiveUpdatedTime = Date.now()//tcalculate the current time
        // Add the received packet number to the set
        if(this.minProcessDeletedPacket<liveUpdateJson?.packet_number){
          this.receivedPacketNumbers.add(liveUpdateJson?.packet_number);
        }
        let dashId = liveUpdateJson?.additional_attributes?.dashboard_id;
        if (dashId) {
          try {
            const resourceIdentifier = "liveUpdateQuery_" + dashId;
            // Acquire the lock
            await this.resourceLockerService.acquireLock(resourceIdentifier, 15);
            // Perform operations with the lock
            console.log("currentTime", dashId, new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));
            console.log("Live Update", liveUpdateJson?.packet_number, liveUpdateJson);
            this.MqttmessagingService.sendMqttMessage(jsonString);
            // Delay execution
            await this.delayExecution(10);
              // Log the end time
            console.log("End time:", new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));
          } catch (err) {
            // Handle errors
            console.log("Error",err)
          } finally {
            // Release the lock after the specified delay
            this.resourceLockerService.releaseLock("liveUpdateQuery_" + dashId);
          }
        }
        else {
          this.MqttmessagingService.sendMqttMessage(jsonString);
          await this.delayExecution(10)
        }
        console.log("Existing Packet", this.receivedPacketNumbers)
        // Check for missing packets in the sliding window
       // try {
        //  await this.resourceLockerService.acquireLock("liveUpdateQuery")
          //await this.removeContinuousNumbers()
          //await this.processSlidingWindow(jsonString);
        //} catch (err) {
          //  console.log("Error in processing sliding window", err);
        //} finally {
          //this.resourceLockerService.releaseLock("liveUpdateQuery")
        //}
            this.slidingWindowFunctions.push(jsonString)
            await this.triggerhandleProcessSlidingWindow()
      });
    });

    this._mqttService.onConnect.subscribe((res: any) => {
      let exipiry_timme = this.customStorageService.getItem('dashboard_live_update_key_expiry_time');
      if (exipiry_timme && new Date().getTime() > +exipiry_timme) {
        this.msgService.setDashboardRefresh(true);
        this.handleMqttKey();
      }
    });
  }
  
  async triggerhandleProcessSlidingWindow(){
      if(this.isSlidingWindowRunning) return
      this.isSlidingWindowRunning = true;
      await this.handleProcessSlidingWindow();
      this.isSlidingWindowRunning = false;
  }
  
  async handleProcessSlidingWindow(){
      if(this.slidingWindowFunctions.length>0){
          let jsonString = this.slidingWindowFunctions.shift();
          await this.processSlidingWindow(jsonString)
          await this.handleProcessSlidingWindow()
      }
      return
  }

  async processSlidingWindow(jsonString): Promise<void> {
    let liveUpdateJson = JSON.parse(jsonString);
    console.log("callslidingWindow: Current Packet Number"+liveUpdateJson?.packet_number,  new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));
    const minExpectedPacketNumber = Math.min(...Array.from(this.receivedPacketNumbers));
    const missingPackets = [];
    for (let i = minExpectedPacketNumber; i <= liveUpdateJson?.packet_number - this.windowSize; i++) {
      if (!this.receivedPacketNumbers.has(i)) {
        missingPackets.push(i);
      }
    }
    console.log("Missing packets:Current Packet Number"+liveUpdateJson?.packet_number, missingPackets);
    // Process missing packets or update your logic accordingly
    if (missingPackets?.length) {
      await this.getMissingLiveUpdates(missingPackets, liveUpdateJson?.packet_number)
    }
    this.removeContinuousNumbers()
    console.log("removingprocessDone: Current Packet Number"+liveUpdateJson?.packet_number,  new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));

  }


  async getMissingLiveUpdates(missingPackets?, currentPacketNumber?) {
    let params = {}
    if (missingPackets) {
      params['packet_number'] = missingPackets
    }
    else {
      params["create_date"] = {
        "from": this.lastLiveUpdatedTime,
        "to": Date.now()
      }
    }
    console.log("apiCallStart: Current Packet Number"+currentPacketNumber,  new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));
    let res = await this.httpTransfer.getLiveUpdateQuery(params).toPromise()
    if (res.status == 200) {
      for (const m of res.result.data) {
        this.receivedPacketNumbers.add(m?.packet_number);
        console.log("Current Packet Number"+currentPacketNumber+" Adding Packet:"+ m?.packet_number)
        // Ensure that MqttmessagingService is properly implemented
        this.MqttmessagingService.sendMqttMessage(JSON.stringify(m?.live_update_message));
        await this.delayExecution(10)
      }
      console.log("apiCallDone",  new Date().toLocaleTimeString('en-US', { minute: '2-digit', second: '2-digit' }));
    }
  }

  removeContinuousNumbers(): void {
    const sortedNumbers = Array.from(this.receivedPacketNumbers).sort((a, b) => a - b); // Replace this with your array
    let missingNumber = false;
    for (let i = 0; i < sortedNumbers.length - 1; i++) {
      let currentNum = sortedNumbers[i];
      let nextNum = sortedNumbers[i + 1];
      if (currentNum + 1 === nextNum) {
        console.log("Deleted Packet", currentNum)
        this.receivedPacketNumbers.delete(currentNum)
        this.minProcessDeletedPacket = currentNum;
      }
      else {
        missingNumber = true
      }
      if (missingNumber)
        break;
    }
  }

  async decompressData(compressedFileContent){
    return new Promise((resolve,reject)=>{
      const decompressedData = pako.inflate(compressedFileContent, { to: 'string' });
      resolve( decompressedData);
    })
  }

  async delayExecution(timeInMs) {
    const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, timeInMs));
    await delay(timeInMs);
  }
}





