import {
  AST,
  Expression,
  FluentOrderBy,
  Relation,
  RelationRef,
  TC,
  Ty,
} from '@cotera/era';
import { Sink, Schedule } from '@cotera/api';
import { EventStreamDefinition } from '../manifest/manifest';
import _ from 'lodash';
import { SinkOutputTypeReqs } from './sink-output-types';

/** Fallback used for `batchSize` when the caller of `EventStream` omits it. */
const DEFAULT_BATCH_SIZE = 5000;

/**
 * Throttling settings accepted by `EventStream` via `props.throttle` and
 * copied unchanged onto the returned definition's `throttle` field.
 *
 * NOTE(review): this file only passes the value through; the per-field
 * semantics noted below are inferred from names/types and should be confirmed
 * against the code that consumes the definition.
 */
export type EventStreamThrottle = {
  /** Date-truncation unit; presumably the window `maxEvents` applies to — TODO confirm. */
  frequency: AST.DateTruncUnit;
  /** Produces an ordering from a relation ref; presumably ranks which events are kept — TODO confirm. */
  priorityBy: (t: RelationRef) => FluentOrderBy;
  /** Presumably the cap on events per `frequency` window — TODO confirm. */
  maxEvents: number;
};

/**
 * Compiles a declarative event-stream description into an
 * `EventStreamDefinition` made of ASTs and plain configuration values.
 *
 * The `identifier` and `onChange` callbacks are evaluated against a ref of the
 * wrapped source relation. Each sink's `select` and `condition` callbacks are
 * evaluated against a second ref that exposes the source relation's attributes
 * plus the `__event_cotera_stable_id`, `__event_detected_at`,
 * `__event_previous_value` and `__event_value` columns. Both refs are
 * constructed with the string `'from'`.
 *
 * Defaults applied here: `when` falls back to `'daily'` and `batchSize` falls
 * back to `DEFAULT_BATCH_SIZE`. `debounceByIdForSecs` and `throttle` are passed
 * through as given (possibly `undefined`).
 *
 * @param props - The stream description (name, entity id, source relation,
 *   expression callbacks, sinks and optional tuning knobs).
 * @returns The compiled definition, with one compiled entry per key of
 *   `props.sinks`.
 * @throws When `SinkOutputTypeReqs` has an entry for a sink's `config.t` and
 *   `TC.implementsRel` reports that the types of the sink's selected
 *   expressions do not satisfy it. The error is built from the type-check
 *   failure with the location `['sink', <sinkName>]`.
 */
export const EventStream = (props: {
  name: string;
  entityId: Ty.IdType;
  identifier: (t: RelationRef) => string | Expression;
  rel: Relation;
  when?: Schedule;
  onChange: (t: RelationRef) => Expression | string;
  sinks: {
    [sinkName: string]: Sink;
  };
  batchSize?: number;
  debounceByIdForSecs?: number;
  throttle?: EventStreamThrottle;
}): EventStreamDefinition => {
  const source = Relation.wrap(props.rel);
  const sourceRef = source.ref('from');

  // Ref handed to sink callbacks: source columns plus the event columns.
  const eventRef = new RelationRef(
    {
      attributes: {
        ...source.attributes,
        __event_cotera_stable_id: Ty.makeNotNullable('string'),
        __event_detected_at: Ty.makeNotNullable('timestamp'),
        __event_previous_value: Ty.shorthandToTy('string'),
        __event_value: Ty.shorthandToTy('string'),
      },
    },
    'from'
  );

  // Compiles one sink; throws if its selected columns fail the type
  // requirements registered for the sink's config type.
  const compileSink = (sink: Sink, sinkName: string) => {
    const selected = _.mapValues(sink.select(eventRef), (column) =>
      Expression.wrap(column)
    );

    // Not every sink type has registered requirements; skip the check if none.
    const reqs = SinkOutputTypeReqs[sink.config.t];
    if (reqs) {
      const verdict = TC.implementsRel({
        subject: _.mapValues(selected, (column) => column.ty),
        reqs,
      });

      if (verdict.isErr()) {
        throw verdict.error
          .toStackTrace({ location: ['sink', sinkName] })
          .toError({ jsStackPointer: EventStream });
      }
    }

    return {
      manualVerification: sink.manualVerification ?? false,
      config: sink.config,
      condition: Expression.wrap(sink.condition(eventRef)).ast,
      select: _.mapValues(selected, (column) => column.ast),
    };
  };

  const sinks = _.mapValues(props.sinks, compileSink);

  // Compile the stream-level expressions only after every sink has passed.
  const identifier = Expression.wrap(props.identifier(sourceRef)).ast;
  const relAst = source.ast;
  const onChange = Expression.wrap(props.onChange(sourceRef)).ast;

  return {
    name: props.name,
    identifier,
    rel: relAst,
    when: props.when ?? 'daily',
    onChange,
    entityId: props.entityId,
    batchSize: props.batchSize ?? DEFAULT_BATCH_SIZE,
    debounceByIdForSecs: props.debounceByIdForSecs,
    throttle: props.throttle,
    sinks,
  };
};
