import { Relation } from '@cotera/era';
import { Table } from 'apache-arrow';
import * as duckdb from '@duckdb/duckdb-wasm';
import { v4 } from 'uuid';
import { DuckDBQueryResult } from '../duckdb';
import { z } from 'zod';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { Disposable, using } from '@cotera/utilities';

// Lookup from a browser-reported MIME type to the file extension appended to
// the generated name when an uploaded file is registered with DuckDB.
const typeToExtension: Record<string, string> = Object.fromEntries([
  ['application/json', 'json'],
  ['text/csv', 'csv'],
  ['application/octet-stream', 'parquet'],
]);

// Column schemas shared by several fields of a summary row.
const nullableText = z.string().nullable();
const nullableNumber = z.coerce.number().nullable();

// One row of `SUMMARIZE` output, restricted to the statistics this module
// exposes. Numeric statistics are coerced to numbers; every statistic may be
// null.
const summarizeItem = z.object({
  column_name: z.string(),
  column_type: z.string(),
  min: nullableText,
  max: nullableText,
  approx_unique: nullableNumber,
  avg: nullableNumber,
  std: nullableNumber,
});

// The complete summary: one `summarizeItem` per row of the result.
const summarizeSchema = z.array(summarizeItem);

declare global {
  interface Window {
    /** Value (in MB) used for DuckDB's `memory_limit` on new connections. */
    DUCKDB_MEMORY_LIMIT_IN_MB: number;
    /**
     * Fraction of the memory limit below which a pooled connection is still
     * considered valid.
     */
    DUCKDB_MEMORY_RELEASE_THRESHOLD: number;
  }
}

// Install the defaults on the global window object at module load.
Object.assign(window, {
  DUCKDB_MEMORY_LIMIT_IN_MB: 1024,
  DUCKDB_MEMORY_RELEASE_THRESHOLD: 0.6,
});

/**
 * Converts a human-readable size string, such as the `memory_usage` value of
 * `PRAGMA database_size`, into megabytes.
 *
 * Accepts an optional space between number and unit and both decimal-style
 * (`KB`, `MB`, `GB`, `TB`) and binary-style (`KiB`, `MiB`, `GiB`, `TiB`)
 * suffixes, as well as `byte`/`bytes`. A factor of 1024 is used between
 * adjacent units for both spellings; the result only feeds a coarse threshold
 * check, so the small difference for decimal units does not matter. A bare
 * number is interpreted as megabytes.
 *
 * @param memUsage - size string, e.g. `"512.0MB"`, `"1.5 GiB"`, `"0 bytes"`.
 * @returns the size in MB, or `NaN` when the string cannot be parsed.
 */
function memoryUsageToMb(memUsage: string): number {
  const match = /^\s*(\d+(?:\.\d+)?)\s*([a-z]*)\s*$/i.exec(memUsage);
  if (match === null) {
    return Number.NaN;
  }

  const [, amount = '', unit = ''] = match;
  const value = parseFloat(amount);

  switch (unit.toLowerCase()) {
    case 'b':
    case 'byte':
    case 'bytes':
      return value / (1024 * 1024);
    case 'kb':
    case 'kib':
      return value / 1024;
    case '':
    case 'mb':
    case 'mib':
      return value;
    case 'gb':
    case 'gib':
      return value * 1024;
    case 'tb':
    case 'tib':
      return value * 1024 * 1024;
    default:
      return Number.NaN;
  }
}

/**
 * Pool of DuckDB connections built on `generic-pool`.
 *
 * Each pooled connection is configured with the JSON extension, a memory
 * limit taken from `window.DUCKDB_MEMORY_LIMIT_IN_MB` and a temp directory.
 * When the pool validates a connection, one whose reported memory usage has
 * reached the release threshold is reported as invalid.
 */
export class DuckDBPool {
  private readonly pool: Pool<PooledConnection>;

  /**
   * @param internal - database handle used to open new connections.
   * @param poolOptions - options forwarded unchanged to `createPool`.
   *   NOTE(review): per the generic-pool docs, `validate` only runs when
   *   `testOnBorrow`/`testOnReturn` is enabled here — confirm callers set it.
   */
  constructor(
    private readonly internal: Pick<duckdb.AsyncDuckDB, 'connect'>,
    poolOptions: PoolOptions
  ) {
    this.pool = createPool(
      {
        create: async () => {
          return await this.connect();
        },
        destroy: async (connection: PooledConnection) => {
          await connection.connection.close();
        },
        validate: async (connection: PooledConnection) => {
          const dbInfo = await connection.connection.query(
            `PRAGMA database_size;`
          );
          const memUsage = new DuckDBQueryResult(dbInfo)
            .column('memory_usage')
            .valueAt(0);

          const mb = memoryUsageToMb(memUsage);

          // The connection stays valid while its memory usage is below the
          // configured fraction of the memory limit; otherwise it is reported
          // invalid so the pool can acquire a new one and reset memory usage.
          // An unparseable value yields NaN, which compares as "not below"
          // and therefore also marks the connection invalid.
          return (
            mb <
            window.DUCKDB_MEMORY_LIMIT_IN_MB *
              window.DUCKDB_MEMORY_RELEASE_THRESHOLD
          );
        },
      },
      poolOptions
    );
  }

  /**
   * Opens and configures a new connection for the pool.
   *
   * If any setup query fails, the freshly opened connection is closed before
   * the error is rethrown so it is not leaked.
   */
  private async connect(): Promise<PooledConnection> {
    const c = await this.internal.connect();
    try {
      await c.query(`INSTALL json; LOAD json;`);
      await c.query(`
        SET memory_limit = '${window.DUCKDB_MEMORY_LIMIT_IN_MB}MB';
        SET temp_directory='/cotera/db/tmp.tmp';
      `);
    } catch (e) {
      try {
        await c.close();
      } catch {
        // Ignore close failures so the original setup error is the one thrown.
      }
      throw e;
    }
    return new PooledConnection(this, c);
  }

  /** Gets a connection from the pool (the method name keeps its existing spelling). */
  public async aquire(): Promise<PooledConnection> {
    return await this.pool.acquire();
  }

  /** Releases a connection back to the pool. */
  public async release(connection: PooledConnection): Promise<void> {
    await this.pool.release(connection);
  }

  /** Drains the pool and then clears it, shutting it down. */
  public async closePool(): Promise<void> {
    await this.pool.drain();
    await this.pool.clear();
  }
}

/**
 * A DuckDB connection paired with the pool it was created by, so that
 * disposing it hands the connection back to that pool.
 */
export class PooledConnection implements Disposable {
  private readonly pool: DuckDBPool;
  private readonly _connection: duckdb.AsyncDuckDBConnection;

  /**
   * @param pool - the pool this connection is released to on dispose.
   * @param _connection - the wrapped DuckDB connection.
   */
  constructor(pool: DuckDBPool, _connection: duckdb.AsyncDuckDBConnection) {
    this.pool = pool;
    this._connection = _connection;
  }

  /** The wrapped DuckDB connection. */
  get connection() {
    return this._connection;
  }

  /** Releases this connection back to its pool. */
  async dispose() {
    await this.pool.release(this);
  }
}

/** Callback that receives the SQL text and parameters of a query. */
type QueryStartHook = (sql: string, params: Record<string, any>) => void;

declare global {
  interface Window {
    /** Registers a query-start hook and returns a function that removes it. */
    DUCKDB_ADD_ON_QUERY_START_HOOK: (cb: QueryStartHook) => () => void;
  }
}

// Currently registered hooks, each tagged with a unique token for removal.
let ON_START_QUERY_HOOKS: { hook: QueryStartHook; token: string }[] = [];

function addQueryStartHook(hook: QueryStartHook): () => void {
  const entry = { hook, token: v4() };
  ON_START_QUERY_HOOKS.push(entry);

  return function removeQueryStartHook(): void {
    // Rebuild the list instead of mutating it in place.
    ON_START_QUERY_HOOKS = ON_START_QUERY_HOOKS.filter(
      (registered) => registered.token !== entry.token
    );
  };
}

window.DUCKDB_ADD_ON_QUERY_START_HOOK = addQueryStartHook;

/**
 * Query facade over a DuckDB instance: runs SQL and `Relation`s on pooled
 * connections and registers files with the underlying database.
 */
export class CoteraDuckDB {
  /**
   * @param internal - database handle used for file registration.
   * @param pool - pool that supplies the connections queries run on.
   */
  constructor(
    private readonly internal: duckdb.AsyncDuckDB,
    private readonly pool: DuckDBPool
  ) {}

  /**
   * Runs a SQL string without bound parameters. Query-start hooks receive the
   * SQL and an empty parameter object.
   */
  async rawQuery(sql: string) {
    return await this.execute(sql, {}, []);
  }

  /** Compiles `rel` to SQL plus parameters and runs it. */
  async query(rel: Relation): Promise<Table> {
    const { sql, params } = rel.duckdbwasmSql();
    return await this.execute(sql, params, params);
  }

  /**
   * Runs `SUMMARIZE` over the SQL compiled from `rel` and parses each result
   * row with `summarizeItem`.
   */
  async summarize(rel: Relation): Promise<z.infer<typeof summarizeSchema>> {
    const { sql, params } = rel.duckdbwasmSql();
    const res = await this.execute(`SUMMARIZE ${sql}`, params, params);

    return new DuckDBQueryResult(res).toArrayOf(summarizeItem);
  }

  /**
   * Registers an in-memory buffer under `name`. An error whose message says
   * the file is already registered is ignored; any other error is rethrown.
   */
  async registerFileBuffer(name: string, buffer: Uint8Array) {
    try {
      await this.internal.registerFileBuffer(name, buffer);
    } catch (e) {
      if (!CoteraDuckDB.isAlreadyRegisteredError(e)) {
        throw e;
      }
    }
  }

  /**
   * Registers the contents of `file` under a freshly generated name and
   * returns that name.
   *
   * NOTE(review): a MIME type missing from `typeToExtension` produces a name
   * ending in `.undefined` — confirm whether callers rely on the extension.
   */
  async loadFile(file: File): Promise<string> {
    const fileName = `${v4()}.${
      typeToExtension[file.type] as keyof typeof typeToExtension
    }`;
    await this.registerFileBuffer(
      fileName,
      new Uint8Array(await file.arrayBuffer())
    );

    return fileName;
  }

  /**
   * Registers `url` under `name` using the HTTP protocol. An error whose
   * message says the file is already registered is ignored; any other error
   * is rethrown.
   */
  async registerFileURL(name: string, url: string) {
    try {
      await this.internal.registerFileURL(
        name,
        url,
        duckdb.DuckDBDataProtocol.HTTP,
        false
      );
    } catch (e) {
      if (!CoteraDuckDB.isAlreadyRegisteredError(e)) {
        throw e;
      }
    }
  }

  /**
   * Shared query path: acquires a pooled connection, notifies the query-start
   * hooks, then prepares and runs the statement.
   *
   * The prepared statement is always closed afterwards, including when the
   * query throws, so statements do not accumulate on pooled connections.
   *
   * @param sql - SQL text to prepare.
   * @param hookParams - value passed to the hooks as the query parameters.
   * @param bindings - values bound to the prepared statement, in order.
   */
  private async execute(
    sql: string,
    hookParams: Record<string, any>,
    bindings: readonly any[]
  ) {
    // `using` presumably disposes the pooled connection (releasing it to the
    // pool) once the callback settles — TODO confirm in @cotera/utilities.
    return await using(await this.pool.aquire(), async ({ connection }) => {
      for (const { hook } of ON_START_QUERY_HOOKS) {
        hook(sql, hookParams);
      }

      const stmt = await connection.prepare(sql);
      try {
        return await stmt.query(...bindings);
      } finally {
        await stmt.close();
      }
    });
  }

  /** True when `e` is an Error reporting that the file is already registered. */
  private static isAlreadyRegisteredError(e: unknown): boolean {
    return (
      e instanceof Error &&
      e.message.toLocaleLowerCase().includes('file is already registered')
    );
  }
}
