import { KafkaJS } from "@confluentinc/kafka-javascript";
import {
  CAMPAIGN_EVALUATIONS_TOPIC,
  TARGET_BATCH_CREATIONS_TOPIC,
  OVERVIEW_EVALUATIONS_TOPIC,
  KAFKA_PRODUCER_CLIENT_ID,
} from "./constants";

import { generateAuthTokenFromCredentialsProvider } from "aws-msk-iam-sasl-signer-js";

import { Constants, createOrUpdateTeamEvals, KafkaConstants, sleep } from ".";
import { setTargetContactWaiting } from "./setTargetContactWaiting";
import { PrismaTXN } from "@openqlabs/drm-db";
import {
  CommonConstructorConfig,
  type ProducerConstructorConfig,
} from "@confluentinc/kafka-javascript/types/kafkajs";
import { fromNodeProviderChain } from "@aws-sdk/credential-providers";

/**
 * Wraps a single Kafka client and its producer, and keeps track of consumers
 * so they can be disconnected together with the producer.
 *
 * When `nodeEnv` is `"development"` the client is configured without
 * authentication; for any other value it uses SSL plus SASL/OAUTHBEARER with
 * tokens obtained from {@link KafkaSingleton.oauthBearerTokenProvider}.
 */
export class KafkaSingleton {
  private readonly _producer: KafkaJS.Producer;
  private _isConnected = false;
  /** In-flight producer connect, shared so concurrent callers connect only once. */
  private _connecting: Promise<void> | null = null;
  private readonly _consumerReferences: KafkaJS.Consumer[] = [];
  kafka: KafkaJS.Kafka;

  /**
   * @param Kafka - Constructor of the Kafka client to instantiate (injected).
   * @param logLevel - Currently not read by this constructor.
   * @param kafkaBrokerUrls - Comma-separated broker list; defaults to
   *   `localhost:9092` when omitted or empty.
   * @param nodeEnv - `"development"` selects the local (unauthenticated)
   *   configuration; anything else selects the remote SSL/SASL configuration.
   */
  constructor(
    Kafka: new (config: CommonConstructorConfig) => KafkaJS.Kafka,
    logLevel: 1,
    kafkaBrokerUrls?: string,
    nodeEnv?: string
  ) {
    const isLocal = nodeEnv === "development";
    const brokers = kafkaBrokerUrls
      ? kafkaBrokerUrls.split(",")
      : ["localhost:9092"];

    // The shared client configuration is spread last in both variants so it
    // keeps the ability to override the keys set before it.
    const kafkaConfig: CommonConstructorConfig = isLocal
      ? {
          kafkaJS: {
            clientId: KAFKA_PRODUCER_CLIENT_ID,
            brokers,
            ...KafkaConstants.OPENQ_KAFKA_CLIENT_CONFIGURATION,
          },
        }
      : {
          kafkaJS: {
            clientId: KAFKA_PRODUCER_CLIENT_ID,
            brokers,
            ssl: true,
            sasl: {
              mechanism: "oauthbearer",
              // Arrow wrapper keeps `this` bound if the provider ever needs it.
              oauthBearerProvider: () => this.oauthBearerTokenProvider(),
            },
            ...KafkaConstants.OPENQ_KAFKA_CLIENT_CONFIGURATION,
          },
        };

    const kafkaEnv = isLocal ? "LOCAL" : "REMOTE";

    console.log(`Booting ${kafkaEnv} Kafka client...`);

    const kafka = new Kafka(kafkaConfig);
    this._producer = kafka.producer({
      kafkaJS: {
        acks: 1,
      },
    } as ProducerConstructorConfig);

    this.kafka = kafka;
  }

  /**
   * Connects the producer if it is not connected yet.
   *
   * Never rejects: connection errors are logged and swallowed. Check
   * {@link KafkaSingleton.isConnected} to find out whether it succeeded.
   */
  public async connect(): Promise<void> {
    try {
      await this.ensureConnected();
    } catch (err) {
      console.error(err);
    }
  }

  /**
   * Connects `consumer`, retrying on failure.
   *
   * @param consumer - Consumer to connect.
   * @param retries - Total number of connection attempts. Values of zero or
   *   less are treated as a single attempt so the call never silently returns
   *   without having tried to connect.
   * @param delay - Milliseconds to wait between attempts.
   * @throws The error of the last attempt when every attempt failed.
   */
  public async connectConsumerWithRetries(
    consumer: KafkaJS.Consumer,
    retries: number,
    delay: number
  ): Promise<void> {
    const maxAttempts = retries > 0 ? Math.ceil(retries) : 1;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await consumer.connect();
        return;
      } catch (e) {
        console.error(`Error connecting to evaluations consumer: ${e}`);
        if (attempt >= maxAttempts) {
          console.error(
            `Failed to connect to evaluations consumer after ${maxAttempts} retries.`
          );
          throw e;
        }
        console.log(`Retrying connection, waiting ${delay}ms...`);
        await sleep(delay);
      }
    }
  }

  /**
   * Disconnects the producer and every stored consumer.
   *
   * Never rejects. Each disconnect is isolated, so a failure of the producer
   * or of one consumer is logged and does not prevent the remaining consumers
   * from being disconnected.
   */
  public async disconnect(): Promise<void> {
    try {
      await this._producer.disconnect();
      this._isConnected = false;
    } catch (err) {
      console.error(err);
    }

    for (const consumer of this._consumerReferences) {
      try {
        await consumer.disconnect();
      } catch (err) {
        console.error(err);
      }
    }
  }

  /** Whether the producer has been connected successfully. */
  public get isConnected() {
    return this._isConnected;
  }

  /**
   * Remembers `consumer` so that {@link KafkaSingleton.disconnect} also
   * disconnects it. Falsy values are ignored.
   */
  public storeConsumerReference(
    consumer: KafkaJS.Consumer | null | void
  ): void {
    if (consumer) {
      this._consumerReferences.push(consumer);
    }
  }

  /**
   * Generates an auth token for SASL/OAUTHBEARER using the default Node
   * credential provider chain.
   *
   * The region is read from `AWS_MSK_REGION` and falls back to `us-east-2`
   * when the variable is unset or empty.
   */
  public async oauthBearerTokenProvider(): Promise<KafkaJS.OauthbearerProviderResponse> {
    // `||` (not `??`) on purpose: an empty env var must also fall back.
    const myRegion = process.env.AWS_MSK_REGION || "us-east-2";
    const authTokenResponse = await generateAuthTokenFromCredentialsProvider({
      region: myRegion,

      awsCredentialsProvider: fromNodeProviderChain({
        durationSeconds: 600,
        maxRetries: 3,
        clientConfig: {
          region: myRegion,
        },
      }),
    });
    console.log("authTokenResponse", authTokenResponse.expiryTime);
    return {
      principal: "admin",
      lifetime: authTokenResponse.expiryTime,
      value: authTokenResponse.token,
    };
  }

  /** Publishes a target-batch-creation message. Never rejects (see {@link KafkaSingleton.publishMessage}). */
  public async publishTargetBatchCreation(
    targetBatchCreationId: string,
    teamAccountId: string,
    campaignId: string
  ): Promise<void> {
    await this.publishMessage(TARGET_BATCH_CREATIONS_TOPIC, {
      targetBatchCreationId,
      teamAccountId,
      campaignId,
    });
  }

  /**
   * Publishes an evaluation message to the topic resolved from `type`.
   *
   * @throws Error when no topic is registered for `type`. Publish failures
   *   themselves are logged, not thrown.
   */
  public async publishEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string,
    targetContactId: string,
    accountId: string
  ): Promise<void> {
    const topic = Constants.getEvaluationTopic(type);

    if (topic === null) {
      throw new Error(`No topic found for evaluation type: ${type}`);
    }

    await this.publishMessage(topic, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
      targetContactId,
      accountId,
    });
  }

  /** Publishes a campaign evaluation message. Never rejects (see {@link KafkaSingleton.publishMessage}). */
  public async publishCampaignEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(CAMPAIGN_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
    });
  }

  /** Publishes an overview evaluation message. Never rejects (see {@link KafkaSingleton.publishMessage}). */
  public async publishOverviewEvaluation(
    evaluationId: string,
    teamAccountId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(OVERVIEW_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      type,
    });
  }

  /**
   * Best-effort publish of `message` (JSON-serialised) to `topic`.
   *
   * Never rejects: connection and send errors are logged and swallowed, so
   * callers cannot tell from the returned promise whether the message was sent.
   */
  public async publishMessage(topic: string, message: object): Promise<void> {
    try {
      await this.sendOrThrow(topic, message);
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Runs the database side effects for an evaluation and, unless it is already
   * marked as queued, publishes it and then flags it as queued.
   *
   * The `queued` flag is only written after the publish actually succeeded, so
   * a failed publish leaves the evaluation eligible for a later attempt.
   * Publish and status-update failures are logged and not rethrown; errors
   * from the side-effect helpers propagate to the caller.
   *
   * @param prisma - Client or transaction used for all database writes.
   * @param topic - Topic to publish to.
   * @param message - Evaluation payload; it is published as-is.
   */
  public async publishMessageWithSideEffect(
    prisma: PrismaTXN,
    topic: string,
    message: {
      evaluationId: string;
      teamAccountId: string;
      campaignId: string;
      type: string;
      targetContactId: string;
      queued: boolean | null;
    }
  ): Promise<void> {
    if (message.evaluationId) {
      // run sideEffects
      await setTargetContactWaiting(prisma, {
        targetContactId: message.targetContactId,
        evaluationId: message.evaluationId,
        type: message.type,
      });

      await createOrUpdateTeamEvals(prisma, {
        teamAccountId: message.teamAccountId,
        campaignId: message.campaignId,
      });
    }

    if (message.queued) {
      return;
    }

    try {
      // Must use the throwing variant: publishMessage swallows errors, which
      // would make a failed publish indistinguishable from a successful one.
      await this.sendOrThrow(topic, message);
    } catch (err) {
      console.error(
        `Error publishing evaluation ${message.evaluationId} to topic: ${err}`
      );
      return;
    }

    console.info(
      `Successfully published evaluation ${message.evaluationId}. Updating status...`
    );

    try {
      await prisma.evaluation.update({
        where: { id: message.evaluationId },
        data: { queued: true },
      });
      console.info(
        `Evaluation ${message.evaluationId} status updated to queued.`
      );
    } catch (err) {
      console.error(
        `Error updating queued status for evaluation ${message.evaluationId}: ${err}`
      );
    }
  }

  /**
   * Connects the producer once, rejecting on failure.
   *
   * Concurrent callers share the same in-flight connect promise instead of
   * each calling `connect()` on the producer.
   */
  private async ensureConnected(): Promise<void> {
    if (this._isConnected) {
      return;
    }
    if (this._connecting === null) {
      this._connecting = this._producer
        .connect()
        .then(() => {
          this._isConnected = true;
        })
        .finally(() => {
          // Reset so a later call can retry after a failed attempt.
          this._connecting = null;
        });
    }
    await this._connecting;
  }

  /** Connects if necessary and sends `message` to `topic`, rejecting on any failure. */
  private async sendOrThrow(topic: string, message: object): Promise<void> {
    await this.ensureConnected();
    await this._producer.send({
      topic,
      messages: [{ value: JSON.stringify(message) }],
    });
  }
}
