import Long from "long";
import Transaction from "./Transaction";

/**
 * Decode a buffer as JSON without ever throwing.
 *
 * The buffer is converted with its default `toString()` and handed to
 * `JSON.parse`. Any exception raised along the way (including `buf` being
 * null/undefined or the text not being valid JSON) yields `null`.
 *
 * @param {!Buffer} buf raw bytes expected to contain a JSON document
 * @returns {object|null} the parsed value (any valid JSON value, not only
 *   objects), or `null` when decoding/parsing failed
 */
function safeParseData(buf) {
  let parsed = null;
  try {
    parsed = JSON.parse(buf.toString());
  } catch (e) {
    // Deliberately best-effort: unparsable input is reported as null.
  }
  return parsed;
}

/**
 * Strip a namespace prefix from an event type name.
 *
 * When `prefix` is falsy, or `esEventType` does not begin with `prefix`,
 * the event type is returned unchanged.
 *
 * @param {!string} esEventType event type name, possibly prefixed
 * @param {string} prefix prefix to remove (may be empty/undefined)
 * @returns {string} the event type without the leading prefix
 */
function getLocalEventType(esEventType, prefix) {
  // startsWith only inspects the head of the string (indexOf scanned all of
  // it), and slice replaces the deprecated String.prototype.substr.
  if (prefix && esEventType.startsWith(prefix)) {
    return esEventType.slice(prefix.length);
  }
  return esEventType;
}

/**
 * Plain value object describing one event as it is handed to read-model
 * handlers. It only stores the constructor arguments under the same names.
 *
 * @class BuilderEventData
 * @property {string} streamId
 * @property {string} eventId
 * @property {number} eventNumber
 * @property {string} typeId
 * @property {object} event
 * @property {object} metadata
 * @property {number} creationTime
 * @property {EventStorePosition} position
 */
class BuilderEventData {
  /**
   * Store every argument on the instance, unchanged and unvalidated.
   *
   * @param {string} streamId
   * @param {string} eventId
   * @param {number} eventNumber
   * @param {string} typeId
   * @param {object} event
   * @param {object} metadata
   * @param {number} creationTime
   * @param {EventStorePosition} position
   */
  constructor(streamId, eventId, eventNumber, typeId, event, metadata, creationTime, position) {
    // Shorthand keys keep the property names and their insertion order
    // aligned with the parameter list.
    Object.assign(this, {
      streamId,
      eventId,
      eventNumber,
      typeId,
      event,
      metadata,
      creationTime,
      position
    });
  }
}

/**
 * Convert event-store data into the BuilderEventData shape used by handlers.
 *
 * Fields are copied as-is; three of them are renamed on the way:
 * `eventType` -> `typeId`, `data` -> `event`, `createdEpoch` -> `creationTime`.
 *
 * @param {!EventStoreData} esData event as delivered by the event store
 * @returns {BuilderEventData}
 */
export function toEventData(esData) {
  // The aliases express the renaming directly in the destructuring pattern.
  const {
    position,
    eventId,
    createdEpoch: creationTime,
    eventType: typeId,
    data: event,
    metadata,
    streamId,
    eventNumber
  } = esData;
  return new BuilderEventData(streamId, eventId, eventNumber, typeId, event, metadata, creationTime, position);
}

/**
 * Apply one event to one read model inside its own transaction.
 *
 * A connection is taken from `dbPool` and wrapped in a Transaction. That
 * transaction is passed to `transactionalRepositoryFactory` to build the
 * model's repository plus one repository per key of `readModel.lookups`
 * (named `${readModel.name}_${key}_lookup`). `readModel.handler` is then
 * awaited and the transaction committed.
 *
 * Errors raised in the try block are logged through `logger.error` and NOT
 * rethrown. After logging, the transaction (if one was created) is rolled
 * back so the connection is not released with uncommitted work. The
 * connection is released in all cases; an error thrown by
 * `dbPool.release` itself does propagate.
 *
 * @param {EventStoreData} esData event to apply
 * @param {string} prefix event-type prefix stripped before the handler runs
 * @param {object} dbPool pool exposing `getConnection()` and `release(conn)`
 * @param {TransactionalRepositoryFactory} transactionalRepositoryFactory
 * @param {ConsoleLogger} logger
 * @param {ReadModel} readModel object with `name`, `handler` and optional `lookups`
 * @returns {Promise<void>}
 */
async function processEventForReadModel(esData, prefix, dbPool, transactionalRepositoryFactory, logger, readModel) {
  let conn;
  // Declared outside the try block so the catch block can roll it back.
  let trx;
  // Tracks how far processing got, for the error message only.
  let phase = 'setup';
  try {
    const eventData = toEventData(esData);
    eventData.typeId = getLocalEventType(eventData.typeId, prefix);
    conn = await dbPool.getConnection();
    trx = new Transaction(conn);
    const repository = transactionalRepositoryFactory(readModel.name, trx);
    const lookups = {};
    // for...in tolerates readModel.lookups being undefined (no iterations).
    for (const k in readModel.lookups) {
      lookups[k] = transactionalRepositoryFactory(`${readModel.name}_${k}_lookup`, trx);
    }
    phase = 'handler';
    await readModel.handler(repository, eventData, lookups);
    phase = 'commit';
    await trx.commit();
  } catch (err) {
    logger.error(`readModel handler failed in ${phase} (readModel=${readModel.name}`,
      `eventType=${esData.eventType}`,
      `logPos=${esData.eventNumber}@${esData.streamId})\n`,
      // Non-Error throwables (including null) have no usable stack.
      (err && err.stack) || err);
    // NOTE(review): assumes Transaction exposes rollback(), as the previously
    // commented-out call suggested; guarded so a missing method is a no-op.
    if (trx && typeof trx.rollback === 'function') {
      try {
        await trx.rollback();
      } catch (rollbackErr) {
        logger.error(`readModel rollback failed (readModel=${readModel.name})\n`,
          (rollbackErr && rollbackErr.stack) || rollbackErr);
      }
    }
  } finally {
    if (conn) await dbPool.release(conn);
  }
}

/**
 * Apply one event to every read model in `readModels`.
 *
 * Events whose stream id starts with `$` are skipped entirely. For any other
 * event a debug line is logged, then `processEventForReadModel` is started
 * for each read model and all of the resulting promises are awaited together.
 *
 * @param {object} dbPool
 * @param {ReadRepository} readRepository not used by this function
 * @param {TransactionalRepositoryFactory} transactionalRepositoryFactory
 * @param {string} prefix
 * @param {ConsoleLogger} logger
 * @param {ReadModel[]} readModels
 * @param {EventStoreData} esData
 * @returns {Promise<void>}
 */
async function processEvent(dbPool, readRepository, transactionalRepositoryFactory, prefix, logger, readModels, esData) {
  const isDollarStream = esData.streamId[0] === '$';
  if (isDollarStream) {
    return;
  }
  logger.debug("Processing event", esData.eventType, '@', esData.position.toString(), "...");
  // Every read model is started before any of them is awaited.
  const pending = [...readModels].map((readModel) =>
    processEventForReadModel(esData, prefix, dbPool, transactionalRepositoryFactory, logger, readModel));
  await Promise.all(pending);
}

/**
 * Rebuild a set of readModels from $all.
 *
 * Reads batches of 250 events with `eventStore.readAllBatch`, starting at
 * `startPosition` (or `null` when it is falsy), and passes each event to
 * `processEvent` one at a time, in order. Reading stops after the first
 * batch that reports `isEndOfStream`.
 *
 * @async
 * @param {object} dbPool
 * @param {EventStore} eventStore
 * @param {ReadRepository} readRepository
 * @param {TransactionalRepositoryFactory} repositoryFactory
 * @param {!string} prefix
 * @param {ConsoleLogger} logger
 * @param {ReadModel[]} readModels
 * @param {EventStorePosition} startPosition
 * @returns {Promise<EventStorePosition>} `nextPosition` of the last batch read
 */
async function rebuildFromAllStream(dbPool, eventStore, readRepository, repositoryFactory, prefix, logger, readModels, startPosition) {
  const batchSize = 250;
  let position = startPosition || null;
  for (;;) {
    const batch = await eventStore.readAllBatch(position, batchSize);
    position = batch.nextPosition;
    // Events are awaited one by one so they are applied in order.
    for (const esData of batch.events) {
      await processEvent(dbPool, readRepository, repositoryFactory, prefix, logger, readModels, esData);
    }
    if (batch.isEndOfStream) {
      return position;
    }
  }
}

/**
 * @callback TransactionalRepositoryFactory
 * @param {string} modelName
 * @param {Transaction} transaction
 * @return {TransactionalRepository}
 */

/**
 * Rebuild a set of readModels from a single named stream.
 *
 * Reads batches of 250 events with `eventStore.readBatch`, starting at
 * `fromEventNumber` (0 when it is falsy), and passes each event to
 * `processEvent` one at a time, in order. Reading stops after the first
 * batch that reports `isEndOfStream`.
 *
 * @async
 * @param {object} dbPool
 * @param {EventStore} eventStore
 * @param {ReadRepository} readRepository
 * @param {TransactionalRepositoryFactory} repositoryFactory
 * @param {!string} prefix
 * @param {ConsoleLogger} logger
 * @param {!string} streamName
 * @param {ReadModel[]} readModels
 * @param {Long|number} fromEventNumber converted with `Long.fromValue`
 * @returns {Promise<Long>} `nextEventNumber` of the last batch read
 */
async function rebuildFromStream(dbPool, eventStore, readRepository, repositoryFactory, prefix, logger, streamName, readModels, fromEventNumber) {
  const batchSize = 250;
  let eventNumber = Long.fromValue(fromEventNumber || 0), readResult;
  do {
    readResult = await eventStore.readBatch(streamName, eventNumber, batchSize);
    eventNumber = readResult.nextEventNumber;
    // The batch's events live under `events`, as in the $all rebuild loop;
    // the previous `readResult.sync` was not a field used anywhere else.
    for (const esData of readResult.events) {
      await processEvent(dbPool, readRepository, repositoryFactory, prefix, logger, readModels, esData);
    }
  } while (!readResult.isEndOfStream);
  return eventNumber;
}

/**
 * Bundles the collaborators needed to process events so callers only have to
 * pass the per-call arguments. Every method delegates to the module-level
 * function of the same name.
 *
 * @class
 */
class Builder {
  /**
   * @param {object} dbPool
   * @param {EventStore} eventStore
   * @param {ReadRepository} readRepository
   * @param {TransactionalRepositoryFactory} transactionalRepositoryFactory
   * @param {string} prefix event-type prefix passed to the delegated functions
   * @param {ConsoleLogger} logger
   */
  constructor(dbPool, eventStore, readRepository, transactionalRepositoryFactory, prefix, logger) {
    Object.assign(this, {
      _dbPool: dbPool,
      _eventStore: eventStore,
      _readRepository: readRepository,
      _transactionalRepositoryFactory: transactionalRepositoryFactory,
      _prefix: prefix,
      _logger: logger
    });
  }

  /**
   * @param {ReadModel[]} readModels
   * @param {EventStoreData} esData
   * @return {Promise<void>}
   */
  processEvent(readModels, esData) {
    const {_dbPool, _readRepository, _transactionalRepositoryFactory, _prefix, _logger} = this;
    return processEvent(_dbPool, _readRepository, _transactionalRepositoryFactory, _prefix, _logger, readModels, esData);
  }

  /**
   * @param {string} streamName
   * @param {ReadModel[]} readModels
   * @param {number} fromEventNumber
   * @return {Promise<Long>} nextEventNumber
   */
  rebuildFromStream(streamName, readModels, fromEventNumber) {
    const {_dbPool, _eventStore, _readRepository, _transactionalRepositoryFactory, _prefix, _logger} = this;
    return rebuildFromStream(_dbPool, _eventStore, _readRepository, _transactionalRepositoryFactory, _prefix, _logger, streamName, readModels, fromEventNumber);
  }

  /**
   * @param {ReadModel[]} readModels
   * @param {EventStorePosition} fromPosition
   * @return {Promise<EventStorePosition>} nextPosition
   */
  rebuildFromAllStream(readModels, fromPosition) {
    const {_dbPool, _eventStore, _readRepository, _transactionalRepositoryFactory, _prefix, _logger} = this;
    return rebuildFromAllStream(_dbPool, _eventStore, _readRepository, _transactionalRepositoryFactory, _prefix, _logger, readModels, fromPosition);
  }

  /**
   * @param {EventStoreData} esData
   * @return {BuilderEventData}
   */
  toEventData(esData) {
    return toEventData(esData);
  }

  /**
   * Strip this builder's prefix from an event type.
   *
   * @param {string} esEventType
   * @return {string}
   */
  getLocalEventType(esEventType) {
    const {_prefix} = this;
    return getLocalEventType(esEventType, _prefix);
  }
}

/**
 * Builder Factory.
 *
 * Pulls the builder's collaborators out of `services` and derives the
 * event-type prefix from `services.config.eventStore.namespace`:
 * `"<namespace>."` when a namespace is set, otherwise an empty string.
 * Throws a TypeError when `services.config` or `services.config.eventStore`
 * is missing.
 *
 * @param {object} services container exposing dbPool, readRepository,
 *   transactionalRepositoryFactory, config and logger
 * @param {EventStore} eventStore
 * @returns {Builder}
 */
export function factory(services, eventStore) {
  const {dbPool, readRepository, transactionalRepositoryFactory, config, logger} = services;
  const {namespace} = config.eventStore;
  let prefix = '';
  if (namespace) {
    prefix = `${namespace}.`;
  }
  return new Builder(dbPool, eventStore, readRepository, transactionalRepositoryFactory, prefix, logger);
}

export default factory;
