import crypto from "crypto";
import Chrono from "./utils/Chrono";
import builderFactory from "./builder";
import ReadRepository from "./ReadRepository";
import TransactionalRepository from "./TransactionalRepository";
import EventStoreWithConversionWrapper from "./EventStoreWithConversionWrapper";
import {buildModelDefs, getModelsFor} from "./readModels";
import eventConverterFactory from "./eventConverter";

function addHashes(list) {
  return list.map(item => ({
    ...item,
    hash: calculateReadModelHash(item)
  }));
}

/**
 * Initialize CQRS read side
 * @param {Object} services Services registry
 * @param {Function} storageBootstrap
 * @param {Array} readModels
 * @returns {Promise}
 */
export default async function initRead(services, storageBootstrap, readModels) {
  const { logger, eventStore, config } = services;
  services.readModels = addHashes(readModels);
  services.modelDefs = buildModelDefs(readModels);
  await storageBootstrap(services);
  const { mapper, dbPool } = services;
  const readRepository = new ReadRepository(mapper, dbPool, logger);
  if (config.writeOnly) {
    Object.assign(services, {readRepository});
    return;
  }
  const eventConverter = eventConverterFactory(services);
  services.eventConverter = eventConverter;
  const eventStoreWithConversion = new EventStoreWithConversionWrapper(eventStore, eventConverter);
  services.eventStoreWithConversion = eventStoreWithConversion;
  const lastCheckPointStore = services.checkPointStoreFactory('lastCheckPoint');
  const transactionalRepositoryFactory = (modelName, trx) => new TransactionalRepository(mapper, modelName, new ReadRepository(mapper, trx.connection, logger), trx, logger);
  Object.assign(services, { readRepository, lastCheckPointStore, transactionalRepositoryFactory });
  services.builder = builderFactory(services, eventStoreWithConversion);
  await updateReadModels(services, eventStoreWithConversion);
  await subscribeFromLastCheckPoint(services, eventStoreWithConversion);
}

// This functions handles 3 scenarios:
// - Creating the read models "tables" at first run
// - Updating the read models "tables" if version changed
// - Continue updating read models "tables" if it did not finish (crash)
async function updateReadModels(services, eventStore) {
  const {logger} = services;
  try {
    const { builder, checkPointStoreFactory, dbPool, esStreamReaderFactory, lastCheckPointStore, mapper, readModels } = services;

    const readModelsToUpdate = await getReadModelsToUpdate(dbPool, readModels, mapper);
    if (!readModelsToUpdate.length) return;
    logger.info(`Rebuilding ReadModels: ${readModelsToUpdate.map(x => x.name).join(', ')}...`);

    const lastCheckPoint = eventStore.createPosition(await lastCheckPointStore.get());
    const checkPointStore = checkPointStoreFactory('readModelUpdater');
    const startFrom = eventStore.createPosition(await checkPointStore.get());
    const START = eventStore.createPosition();
    const createTable = startFrom.compareTo(START) === 0;
    const timer = new Chrono();

    if (createTable) {
      logger.info(`Creating tables...`);
      timer.start();
      let count = 0;
      for (const readModelToUpdate of readModelsToUpdate) {
        const models = getModelsFor(readModelToUpdate);
        for (const model of models) {
          await mapper.tryDropModel(dbPool, model);
          await mapper.tryCreateModel(dbPool, model);
          count++;
        }
      }
      logger.info(`${count} tables schemas updated in ${timer.elapsedTime}ms.`);
    }

    if (lastCheckPoint.compareTo(START) > 0) {
      logger.info(`Processing events up to ${lastCheckPoint}...`);
      const allStreamReader = esStreamReaderFactory(eventStore, "$all", startFrom);
      timer.start();
      let count = 0;
      let ev;
      while ((ev = await allStreamReader.readNext())) {
        if (ev.position.compareTo(lastCheckPoint) > 0) break;
        await builder.processEvent(readModelsToUpdate, ev);
        await checkPointStore.put(ev.position);
      }
      timer.stop();
      logger.info(`${count} events processed in ${timer.elapsedTime}ms`,
        (count > 0 && `(avg ${(timer.elapsedTime/count).toFixed(3)} of ms/event).`) || '');
    }

    logger.info(`Updating read models versions...`);
    await checkPointStore.put(START);
    for (const readModelToUpdate of readModelsToUpdate) {
      const models = getModelsFor(readModelToUpdate);
      for (const model of models) {
        await mapper.setModelHash(dbPool, model, readModelToUpdate.hash);
      }
    }

    logger.info(`Done rebuilding read models.`);
  } catch (e) {
    logger.debug(e.stack);
    const error = new Error(`Failed to update read models: ${e.message}`);
    error.inner = e;
    throw error;
  }
}

async function getReadModelsToUpdate(conn, readModels, mapper) {
  const readModelsToUpdate = [];
  for (const k in readModels) {
    const readModel = readModels[k];
    const currentHash = readModel.hash;
    const hash = await mapper.getModelHash(conn, readModel.name);
    if (currentHash !== hash || process.env.LES_FORCE_RM_REBUILD) {
      readModelsToUpdate.push(readModel);
    }
  }
  return readModelsToUpdate;
}

async function subscribeFromLastCheckPoint(services, eventStore) {
  const { builder, subscriberFactory, lastCheckPointStore, readModels } = services;
  const lastCheckPoint = eventStore.createPosition(await lastCheckPointStore.get());
  const updateLastCheckPoint = lastCheckPoint => lastCheckPointStore.put(lastCheckPoint);
  const subscriber = subscriberFactory(eventStore, updateLastCheckPoint);
  subscriber.addHandler(esData => builder.processEvent(readModels, esData));
  Object.assign(services, {subscriber});
  return subscriber.startFrom(lastCheckPoint || null);
}

function calculateReadModelHash(rm) {
  const h = crypto.createHash('sha1');
  h.update(JSON.stringify(rm));
  h.update(rm.handler.toString().replace(/\s/g,''));
  return h.digest('base64');
}
