import { Constant, From, UnionAll } from '../../builder';
import { intersperse, SqlExprAst, sqlExprMacro } from '../sql-ast';
import { Ty } from '../../ty';
import { AST } from '../../ast';
import { SqlDialect } from './dialect';
import { Assert } from '@cotera/utilities';
import { Expression } from '../../builder';
import { TC } from '../../type-checker';
import { Interpreter } from '../../interpreter';
import { assertConstantString } from './assert-constant-string';

const attr = (ident: string) => `\`${ident}\``;

const PrimitiveAttributeTypeToBigQueryType: Record<
  Ty.PrimitiveAttributeType,
  string
> = {
  string: 'string',
  boolean: 'bool',
  int: 'int',
  float: 'float64',
  day: 'timestamp',
  month: 'timestamp',
  year: 'timestamp',
  timestamp: 'timestamp',
  super: 'json',
};

const PRIMITIVE_JSON_TO_PRIMITIVE_SQL_TYPE: Record<
  Ty.PrimitiveAttributeType,
  (expr: SqlExprAst | AST.ExprIR) => SqlExprAst
> = {
  string: (expr) => sqlExprMacro`LAX_STRING(${expr})`,
  boolean: (expr) => sqlExprMacro`LAX_BOOL(${expr})`,
  int: (expr) => sqlExprMacro`LAX_INT64(${expr})`,
  float: (expr) => sqlExprMacro`LAX_FLOAT64(${expr})`,
  super: (expr) => sqlExprMacro`${expr}`,
  timestamp(expr) {
    return sqlExprMacro`CAST(LAX_STRING(${expr}) as ${PrimitiveAttributeTypeToBigQueryType['timestamp']})`;
  },
  day(expr) {
    return this.timestamp(expr);
  },
  month(expr) {
    return this.timestamp(expr);
  },
  year(expr) {
    return this.timestamp(expr);
  },
};

const typeMapping = (ty: Ty.AttributeType): string => {
  if (ty.k === 'struct') {
    return `STRUCT<${Object.entries(ty.fields)
      .map(([name, { ty }]) => `${name} ${typeMapping(ty)}`)
      .join(', ')}>`;
  }

  if (ty.k === 'array') {
    return `ARRAY<${typeMapping(ty.t.ty)}>`;
  }

  if (ty.k === 'enum') {
    return PrimitiveAttributeTypeToBigQueryType.string;
  }

  if (ty.k === 'record') {
    return PrimitiveAttributeTypeToBigQueryType.super;
  }

  return PrimitiveAttributeTypeToBigQueryType[ty.t];
};

const getPropertyFromRecord = (
  expr: AST.ExprIR,
  name: AST.ExprIR,
  wanted: Ty.AttributeType
) => {
  const isInterpretable = TC.isExprInterpretable(name, {
    allow: { now: false, rand: false, undefaultedVars: false },
  });

  if (isInterpretable) {
    const val = Interpreter.evalExprIR(name);
    if (val === null) {
      return sqlExprMacro`${Constant(null, { ty: wanted }).ir()}`;
    }
    Assert.assert(typeof val === 'string' && /[a-zA-Z0-9 _-]/.test(val));
    const inner = sqlExprMacro`JSON_EXTRACT(TO_JSON(${expr}), "$['${val}']")`;
    return superToSql(inner, wanted, 0);
  } else {
    console.log({ name, isInterpretable });
    throw new Error('TODO: Bigquery dynamic json access');
  }
};

const WAREHOUSE_TYPE_TO_ERA_TYPE_MAP: Record<string, Ty.Shorthand> = {
  INT: 'int',
  INT64: 'int',
  SMALLINT: 'int',
  INTEGER: 'int',
  BIGINT: 'int',
  TINYINT: 'int',
  BYTEINT: 'int',
  FLOAT64: 'float',
  STRING: 'string',
  TEXT: 'string',
  TIMESTAMP: 'timestamp',
  DATE: 'timestamp',
  DATETIME: 'timestamp',
  BOOL: 'boolean',
  BOOLEAN: 'boolean',
};

const tryWarehouseTypeToEraType = (
  warehouseType: string
): Ty.AttributeType | null => {
  const upper = warehouseType.toUpperCase().trim();
  const arrayMatch = /^ARRAY<(?<inner>.*)>$/.exec(upper);
  if (arrayMatch) {
    const inner = tryWarehouseTypeToEraType(arrayMatch[1]!);
    if (inner) {
      return Ty.a(inner);
    }
  }
  const scalarTy = WAREHOUSE_TYPE_TO_ERA_TYPE_MAP[upper];
  return scalarTy !== undefined ? Ty.ty(scalarTy).ty : null;
};

export const BigQueryDialect: SqlDialect = {
  tryWarehouseTypeToEraType,
  nullLiteral: (ty) => sqlExprMacro`cast(null as ${typeMapping(ty)})`,
  supportsFileSources: false,
  quotingCharacter: '`',
  generateSeries: ({ start, stop }) => {
    return sqlExprMacro`SELECT \`n\` FROM UNNEST(GENERATE_ARRAY(${start.toString()}, ${stop.toString()})) AS \`n\``;
  },
  makeRecord(fields) {
    return sqlExprMacro`TO_JSON(${this.makeStruct(fields)})`;
  },
  makeArray: ({ elements }) => {
    return sqlExprMacro`[${intersperse<AST.ExprIR | string>(elements, ', ')}]`;
  },
  makeStruct(fields) {
    return sqlExprMacro`STRUCT(${intersperse(
      Object.entries(fields).map(
        ([name, expr]) => sqlExprMacro`${expr} as ${this.attr(name)}`
      ),
      [', ']
    )})`;
  },
  getPropertyFromStruct(expr, name, _wantedTy) {
    return sqlExprMacro`(${expr}).${this.attr(name)}`;
  },
  relation: function (ref): string {
    if (typeof ref === 'string') {
      return `\`${ref}\``;
    }

    return ref.schema ? `\`${ref.schema}\`.\`${ref.name}\`` : `\`${ref.name}\``;
  },
  functionOverrides: {
    empty_array_of: () => sqlExprMacro`[]`,
    to_the_power_of: ([arg0, arg1]) => sqlExprMacro`pow(${arg0!}, ${arg1!})`,
    split_part: ([arg0, arg1, arg2]) =>
      sqlExprMacro`coalesce(split(${arg0!}, ${arg1!})[SAFE_ORDINAL(${arg2!})], '')`,
    get_from_record: ([arg0, arg1], wanted) =>
      getPropertyFromRecord(arg0!, arg1!, wanted),
    is_numeric_string: ([arg0]) =>
      sqlExprMacro`REGEXP_CONTAINS(${arg0!}, '^[0-9]+$')`,
    gen_random_uuid: () => sqlExprMacro`GENERATE_UUID()`,
    now: () => sqlExprMacro`CURRENT_TIMESTAMP()`,
    random: () => sqlExprMacro`RAND()`,
    array_agg: ([arg0]) => sqlExprMacro`ARRAY_AGG(${arg0!} IGNORE NULLS)`,
    // In 2017, they said they were adding percent_* as an aggregate
    // https://stackoverflow.com/a/45579962
    percentile_cont: new Error(
      'percentile_cont aggregate function is not supported.'
    ),
    percentile_disc: new Error(
      'percentile_disc aggregate function is not supported.'
    ),
    date_diff: ([arg0, arg1, arg2]) => {
      const unit: string = { days: 'DAY', years: 'YEAR', seconds: 'SECOND' }[
        assertConstantString(arg2!, AST.DATE_DIFF_UNITS)
      ];
      switch (unit) {
        case 'SECOND':
          return sqlExprMacro`cast(TIMESTAMP_DIFF(cast((${arg1!}) as timestamp), cast((${arg0!}) as timestamp), ${unit}) as float64)`;
        default:
          return sqlExprMacro`cast(DATE_DIFF(cast((${arg1!}) as date), cast((${arg0!}) as date), ${unit}) as int)`;
      }
    },
    date_add: ([arg0, arg1, arg2]) => {
      const unit: string = { days: 'DAY' }[
        assertConstantString(arg2!, AST.DATE_ADD_UNITS)
      ];
      return sqlExprMacro`DATE_ADD((${arg0!}), INTERVAL cast((${arg1!}) as int) ${unit})`;
    },
    date_trunc: ([arg0, arg1]) => {
      const unit: string = {
        minute: 'MINUTE',
        hour: 'HOUR',
        day: 'DAY',
        week: 'WEEK(MONDAY)',
        month: 'MONTH',
        quarter: 'QUARTER',
        year: 'YEAR',
      }[assertConstantString(arg1!, AST.DATE_TRUNC_UNITS)];

      return sqlExprMacro`DATE_TRUNC(cast((${arg0!}) as timestamp), ${unit})`;
    },
    date_part: ([arg0, arg1]) => {
      const constParam = assertConstantString(arg1!, AST.DATE_PART_UNITS);

      const unit: string = {
        minute: 'MINUTE',
        hour: 'HOUR',
        day: 'DAY',
        dow: 'DAYOFWEEK',
        week: 'WEEK(MONDAY)',
        month: 'MONTH',
        quarter: 'QUARTER',
        year: 'YEAR',
      }[assertConstantString(arg1!, AST.DATE_PART_UNITS)];

      // We subtract 1 from the dow because bigquery uses 1 indexed days of the
      // week 🤦
      return constParam === 'dow'
        ? sqlExprMacro`(EXTRACT(${unit} FROM cast((${arg0!}) as timestamp)) - 1)`
        : sqlExprMacro`EXTRACT(${unit} FROM cast((${arg0!}) as timestamp))`;
    },
    nan: () => sqlExprMacro`cast('NaN' as float64)`,
    is_nan: (arg) => sqlExprMacro`IS_NAN(${arg})`,
    log_2: ([arg0]) => sqlExprMacro`log(${arg0!}, 2)`,
    log_10: ([arg0]) => sqlExprMacro`log(${arg0!}, 10)`,
    cosine_distance: ([arg0, arg1]) => {
      return sqlExprMacro`ML.DISTANCE((${arg0!}), (${arg1!}), 'COSINE')`;
    },
  },
  attr,
  typeMapping,
  placeholder: function ({ oneIndexedArgNum }): string {
    return `@params${oneIndexedArgNum}`;
  },
  informationSchema: {
    // Bigquery uses unique _tripart_ names for it's information schema so, we
    // can fake it by using a "name" of `INFORMATION_SCHEMA.(COLUMNS | TABLES)`
    columns: (schemas: readonly string[]) => {
      return UnionAll(
        schemas.map((schema) =>
          From({
            schema,
            name: 'INFORMATION_SCHEMA.COLUMNS',
            attributes: {
              column_name: 'string',
              table_name: 'string',
              table_schema: 'string',
              data_type: 'string',
              is_nullable: 'string',
            },
          }).select((t) => ({
            ...t.star(),
            is_nullable: t.attr('is_nullable').eq('YES'),
          }))
        )
      ).ir();
    },
    tables: (schemas: readonly string[]) => {
      return UnionAll(
        schemas.map((schema) =>
          From({
            schema,
            name: 'INFORMATION_SCHEMA.TABLES',
            attributes: {
              table_name: 'string',
              table_schema: 'string',
            },
          }).select((t) => t.pick('table_name', 'table_schema'))
        )
      ).ir();
    },
  },
  values({ values, attributes }) {
    if (Object.keys(attributes).length == 0) {
      return sqlExprMacro`select null as \`__unused\` from unnest([]) where false`;
    }

    if (values.length === 0) {
      return sqlExprMacro`select ${Object.entries(attributes)
        .map(
          ([name, { ty }]) => `cast(null as ${typeMapping(ty)}) as \`${name}\``
        )
        .join(', ')} from unnest([])`;
    }

    const structs = values.map(
      (row) =>
        sqlExprMacro`STRUCT(${intersperse(
          Object.entries(attributes).map(([name, { ty }]) => {
            const val = row[name] ?? null;

            const valAst =
              val === null ? (`null` as string) : Constant(val, { ty }).ir();
            return sqlExprMacro`${valAst} as \`${name}\``;
          }),
          [', ']
        )})`
    );

    return sqlExprMacro`select * from unnest([
      ${intersperse(structs, [', '])}
    ])`;
  },

  cast(expr, targetTy) {
    const { ty } = Expression.fromAst(expr);

    if (ty.ty.k === 'primitive' && ty.ty.t === 'super') {
      const superCast = superToSql(sqlExprMacro`${expr}`, targetTy, 0);
      return sqlExprMacro`CASE WHEN ${expr} IS NOT NULL THEN ${superCast} END`;
    }

    if (targetTy.k === 'primitive' && targetTy.t === 'super') {
      return sqlExprMacro`to_json(${expr})`;
    }

    if (targetTy.k === 'primitive' && targetTy.t === 'string') {
      if (ty.ty.k === 'struct') {
        return sqlExprMacro`to_json_string(${expr})`;
      }
    }

    Assert.assert(
      ty.ty.k !== 'struct',
      'We cant do any more struct operations'
    );

    if (ty.ty.t === 'super' && targetTy.k === 'primitive') {
      return PRIMITIVE_JSON_TO_PRIMITIVE_SQL_TYPE[targetTy.t](expr);
    }

    return sqlExprMacro`cast((${expr}) as ${typeMapping(targetTy)})`;
  },
};

const superToSql = (
  source: SqlExprAst,
  targetTy: Ty.AttributeType,
  depth: number
): SqlExprAst => {
  switch (targetTy.k) {
    case 'id':
    case 'primitive':
    case 'range':
      return PRIMITIVE_JSON_TO_PRIMITIVE_SQL_TYPE[targetTy.t](source);
    case 'enum':
      return PRIMITIVE_JSON_TO_PRIMITIVE_SQL_TYPE.string(source);
    case 'record':
      return PRIMITIVE_JSON_TO_PRIMITIVE_SQL_TYPE.super(source);
    case 'array': {
      const x = sqlExprMacro`x${depth.toString()}`;

      return sqlExprMacro`ARRAY(select ${superToSql(
        x,
        targetTy.t.ty,
        depth + 1
      )} from unnest(JSON_QUERY_ARRAY(${source}, '$')) as \`${x}\`)`;
    }
    case 'struct': {
      const fields = Object.entries(targetTy.fields);
      const inner: SqlExprAst = intersperse(
        fields.map(([name, ty]): SqlExprAst => {
          const newSource = sqlExprMacro`${source}.${attr(name)}`;
          return sqlExprMacro`${superToSql(
            newSource,
            ty.ty,
            depth + 1
          )} as ${attr(name)}`;
        }),
        [', ']
      );

      return sqlExprMacro`STRUCT(${inner})`;
    }
    default:
      return Assert.unreachable(targetTy);
  }
};
