import { Kafka, KafkaConfig, Producer, logLevel } from "kafkajs";
import {
  CAMPAIGN_EVALUATIONS_TOPIC,
  TARGET_BATCH_CREATIONS_TOPIC,
  OVERVIEW_EVALUATIONS_TOPIC,
  KAFKA_PRODUCER_CLIENT_ID,
} from "./constants";

import { generateAuthToken } from "aws-msk-iam-sasl-signer-js";
import { Constants, createOrUpdateTeamEvals } from ".";
import { setTargetContactWaiting } from "./setTargetContactWaiting";
import { PrismaTXN } from "@openqlabs/drm-db";

// Type of the `logLevel` value exported by kafkajs; lets the log-level object
// be passed into the KafkaSingleton constructor as an argument.
type LogLevelType = typeof logLevel;

/**
 * Wrapper around a single kafkajs producer.
 *
 * The Kafka constructor and log-level object are passed in as arguments
 * rather than used from the module imports directly. When `nodeEnv` is
 * `"development"` a plain client is created; otherwise the client is
 * configured with SSL and an OAUTHBEARER token provider backed by
 * `generateAuthToken`.
 */
export class KafkaSingleton {
  private _producer: Producer;
  private _isConnected = false;
  kafka: Kafka;

  /**
   * @param Kafka - Constructor used to create the Kafka client (shadows the
   *   imported class inside this constructor).
   * @param logLevel - Log-level object; `logLevel.INFO` is used for the
   *   non-development client.
   * @param kafkaBrokerUrls - Comma-separated broker list. Falls back to
   *   `localhost:9092` when missing or empty.
   * @param nodeEnv - `"development"` selects the local configuration; any
   *   other value (including `undefined`) selects the remote one.
   */
  constructor(
    Kafka: new (config: KafkaConfig) => Kafka,
    logLevel: LogLevelType,
    kafkaBrokerUrls?: string,
    nodeEnv?: string
  ) {
    const isLocal = nodeEnv === "development";
    const brokers = kafkaBrokerUrls
      ? kafkaBrokerUrls.split(",")
      : ["localhost:9092"];

    const kafkaConfig: KafkaConfig = isLocal
      ? {
          clientId: KAFKA_PRODUCER_CLIENT_ID,
          brokers,
        }
      : {
          clientId: KAFKA_PRODUCER_CLIENT_ID,
          brokers,
          ssl: true,
          sasl: {
            mechanism: "oauthbearer",
            oauthBearerProvider: () =>
              this.oauthBearerTokenProvider(
                process.env.AWS_MSK_REGION || "us-east-2"
              ),
          },
          // Spread after the keys above so shared client configuration may
          // override them; logLevel is set last and always wins.
          ...Constants.OPENQ_KAFKA_CLIENT_CONFIGURATION,
          logLevel: logLevel.INFO,
        };

    console.log(`Booting ${isLocal ? "LOCAL" : "REMOTE"} Kafka client...`);

    const kafka = new Kafka(kafkaConfig);
    this._producer = kafka.producer();
    this.kafka = kafka;
  }

  /**
   * Connects the producer. Best-effort: a connection error is logged and
   * swallowed, leaving `isConnected` unchanged.
   */
  async connect(): Promise<void> {
    try {
      await this._producer.connect();
      this._isConnected = true;
    } catch (err) {
      console.error(err);
    }
  }

  /**
   * Disconnects the producer. Best-effort: a disconnect error is logged and
   * swallowed, leaving `isConnected` unchanged.
   */
  public async disconnect(): Promise<void> {
    try {
      await this._producer.disconnect();
      this._isConnected = false;
    } catch (err) {
      console.error(err);
    }
  }

  /** Whether this wrapper currently considers the producer connected. */
  public get isConnected() {
    return this._isConnected;
  }

  /** Builds the OAUTHBEARER token object from `generateAuthToken` for `region`. */
  private async oauthBearerTokenProvider(region: string) {
    const authTokenResponse = await generateAuthToken({
      region,
    });

    return {
      value: authTokenResponse.token,
    };
  }

  /** Publishes a target-batch-creation message to TARGET_BATCH_CREATIONS_TOPIC. */
  public async publishTargetBatchCreation(
    targetBatchCreationId: string,
    teamAccountId: string,
    campaignId: string
  ): Promise<void> {
    await this.publishMessage(TARGET_BATCH_CREATIONS_TOPIC, {
      targetBatchCreationId,
      teamAccountId,
      campaignId,
    });
  }

  /**
   * Publishes an evaluation message to the topic that
   * `Constants.getEvaluationTopic` returns for `type`.
   *
   * @throws Error when no topic is found for `type` (lookup returns `null`).
   */
  public async publishEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string,
    targetContactId: string,
    accountId: string
  ): Promise<void> {
    const topic = Constants.getEvaluationTopic(type);

    if (topic === null) {
      throw new Error(`No topic found for evaluation type: ${type}`);
    }

    await this.publishMessage(topic, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
      targetContactId,
      accountId,
    });
  }

  /** Publishes a campaign evaluation message to CAMPAIGN_EVALUATIONS_TOPIC. */
  public async publishCampaignEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(CAMPAIGN_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
    });
  }

  /** Publishes an overview evaluation message to OVERVIEW_EVALUATIONS_TOPIC. */
  public async publishOverviewEvaluation(
    evaluationId: string,
    teamAccountId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(OVERVIEW_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      type,
    });
  }

  /**
   * Connects the producer if this wrapper does not consider it connected,
   * and records the successful connection so later sends skip the connect.
   * Connection errors propagate to the caller.
   */
  private async ensureConnected(): Promise<void> {
    if (this._isConnected) {
      return;
    }
    await this._producer.connect();
    this._isConnected = true;
  }

  /**
   * Sends `message` (JSON-serialized) to `topic`, connecting first if needed.
   * Unlike `publishMessage`, failures are thrown so callers can react to them.
   */
  private async sendMessage(topic: string, message: object): Promise<void> {
    try {
      await this.ensureConnected();
      await this._producer.send({
        topic,
        messages: [{ value: JSON.stringify(message) }],
      });
    } catch (error) {
      // Drop the connected flag so the next publish attempts a reconnect
      // instead of trusting a connection that may no longer be usable.
      this._isConnected = false;
      throw error;
    }
  }

  /**
   * Best-effort publish of `message` to `topic`: any connect or send error
   * is logged and swallowed, so this method never rejects.
   */
  public async publishMessage(topic: string, message: object): Promise<void> {
    try {
      await this.sendMessage(topic, message);
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Runs the database side effects for an evaluation and, unless it is
   * already queued, publishes it and marks it as queued.
   *
   * When `message.evaluationId` is truthy, `setTargetContactWaiting` and
   * `createOrUpdateTeamEvals` are awaited first; errors from those calls
   * propagate. When `message.queued` is falsy the message is published, and
   * the evaluation row is updated to `queued: true` only after the publish
   * succeeded. Publish and update failures are logged and swallowed.
   *
   * @param prisma - Prisma client/transaction used for the side effects and
   *   the `queued` update.
   * @param topic - Kafka topic to publish to.
   * @param message - Evaluation payload; sent as-is (JSON-serialized).
   */
  public async publishMessageWithSideEffect(
    prisma: PrismaTXN,
    topic: string,
    message: {
      evaluationId: string;
      teamAccountId: string;
      campaignId: string;
      type: string;
      targetContactId: string;
      queued: boolean | null;
    }
  ): Promise<void> {
    if (message.evaluationId) {
      await setTargetContactWaiting(prisma, {
        targetContactId: message.targetContactId,
        evaluationId: message.evaluationId,
        type: message.type,
      });

      await createOrUpdateTeamEvals(prisma, {
        teamAccountId: message.teamAccountId,
        campaignId: message.campaignId,
      });
    }

    if (message.queued) {
      return;
    }

    try {
      // Use the throwing send so a failed publish is not mistaken for success.
      await this.sendMessage(topic, message);
    } catch (err) {
      console.error(
        `Error publishing evaluation ${message.evaluationId} to topic: ${err}`
      );
      return;
    }

    console.info(
      `Successfully published evaluation ${message.evaluationId}. Updating status...`
    );

    try {
      await prisma.evaluation.update({
        where: { id: message.evaluationId },
        data: { queued: true },
      });
      console.info(
        `Evaluation ${message.evaluationId} status updated to queued.`
      );
    } catch (err) {
      console.error(
        `Error updating queued status for evaluation ${message.evaluationId}: ${err}`
      );
    }
  }
}
