import * as messages from './Messages'
import * as rx from 'rxjs'
import * as rxops from 'rxjs/operators'
import { CompressedRTCStatsReport, EnableRTCStatsReporting, KaldunIncomingMessage, KaldunOutgoingMessage } from '../generated/submodules/client-messages/protos/websocket'
import { ReactiveWebSocket, ReactiveWebSocketEvents } from './ReactiveWebSocket'
import TaskCompletionSource from './TaskCompletionSource'

/**
 * Internal signaling state.
 *
 * Only the `Connected` variant carries the socket handles (`send` / `close`);
 * every other signaling state is a bare tag.
 */
type State = {
  signalingState: SignalingState.Connected
  /** Transmits a text or binary frame over the currently open socket. */
  send: (msg: string | ArrayBufferLike) => void
  /** Closes the currently open socket. */
  close: (code?: number, description?: string) => void
} | {
  signalingState: Exclude<SignalingState, SignalingState.Connected>
}

/** Lifecycle states of a RoomSignaling instance, as exposed to consumers. */
export enum SignalingState {
  /** Initial state, before any socket has reported "opened". */
  New = "new",
  /** A socket reported "opened"; messages can be sent. */
  Connected = "connected",
  /** The socket closed while connected, or reported an error. */
  Interrupted = "interrupted",
  /** A `maintenance` notification was received from the server. */
  MaintenanceReceived = "maintenance",
  // NOTE(review): nothing in this file assigns the two states below — confirm
  // whether they are still needed or are expected to be set in the future.
  ClientErrorReceived = "client_error",
  ServerErrorReceived = "server_error",
  /** `close()` was called; `connect()` and the request methods throw afterwards. */
  Closed = "closed",
}

/**
 * Book-keeping entry for a request that has been issued and whose response is
 * still awaited. The property named after the request kind holds the request
 * body; `tcs` settles the promise returned to the caller.
 */
type PendingRequest = {
  join_room_v2: messages.JoinRoomV2Request,
  tcs: TaskCompletionSource<{ join_room_v2: messages.JoinRoomV2Response } | { error: messages.ErrorResponse }>
} | {
  restart_ice: messages.RestartIceRequest,
  tcs: TaskCompletionSource<{ restart_ice: messages.RestartIceResponse } | { error: messages.ErrorResponse }>
} | {
  authenticate: messages.AuthenticateRequest,
  tcs: TaskCompletionSource<{ authenticate: messages.AuthenticateResponse } | { error: messages.ErrorResponse }>
} | {
  leave_room: messages.LeaveRoomRequest,
  tcs: TaskCompletionSource<{ leave_room: messages.LeaveRoomResponse } | { error: messages.ErrorResponse }>
} | {
  update_media: messages.UpdateMediaRequest,
  tcs: TaskCompletionSource<{ update_media: messages.UpdateMediaResponse } | { error: messages.ErrorResponse }>
};

/**
 * Request/response and notification signaling over a reactive WebSocket.
 *
 * JSON requests are correlated with their responses through a sequence number;
 * server notifications are exposed as observables. Only the socket passed to
 * the most recent `connect()` call is listened to.
 */
class RoomSignaling {
  /** Last sequence number handed out; pre-incremented for every outgoing message. */
  private sequence: number;

  /** Stream of socket event streams; the constructor switches to the latest one. */
  private readonly sockets = new rx.Subject<rx.Observable<ReactiveWebSocketEvents>>()
  private readonly state = new rx.BehaviorSubject<State>({ signalingState: SignalingState.New });

  /** Requests awaiting a response, keyed by sequence number. */
  private readonly requests = new Map<number, PendingRequest>();

  private readonly roomUpdatedNotificationSubj = new rx.Subject<{seq: number, room_updated: messages.RoomUpdatedNotification}>()
  private readonly maintenanceNotificationSubj = new rx.Subject<{seq: number, maintenance: messages.MaintenanceNotification}>()
  private readonly enableRtcReportingNotificationSubj = new rx.Subject<{seq: number, notification: EnableRTCStatsReporting}>();

  /**
   * Resolves with the `send` function of the first `Connected` state observed
   * (immediately if already connected). Rejects if the state stream completes
   * before a connection is established, which happens after `close()`.
   */
  private get sender(): Promise<(msg: string | ArrayBufferLike) => void> {
    return rx.firstValueFrom(this.state.pipe(
      // mergeMap is the non-deprecated name of flatMap.
      rxops.mergeMap(x => {
        if (x.signalingState === SignalingState.Connected) {
          return rx.of(x.send)
        }
        return rx.EMPTY
      })
    ))
  }

  /**
   * @param baseSequence Starting point for sequence numbers; the first message
   *                     sent uses `baseSequence + 1`.
   */
  constructor(baseSequence: number) {
    this.sequence = baseSequence;
    this.sockets.pipe(
      rxops.switchAll(),
    ).subscribe(event => {
      switch (event.type) {
        case "closed": {
          this.onWebsocketClosed(event.code, event.reason);
          switch (this.signalingState) {
            case SignalingState.ClientErrorReceived:
            case SignalingState.ServerErrorReceived:
            case SignalingState.MaintenanceReceived:
              break;
            case SignalingState.Connected:
              this.state.next({ signalingState: SignalingState.Interrupted })
              break;
            case SignalingState.New:
            case SignalingState.Interrupted:
              console.error("Received WebSocket closed event in unexpected signaling state ", this.signalingState)
              break;
            case SignalingState.Closed:
              break;
          }
          break;
        }
        case "error": {
          this.onWebSocketError()
          // BehaviorSubject.next() still overwrites `.value` after complete(),
          // so a late error must not replace the terminal Closed state.
          if (this.signalingState !== SignalingState.Closed) {
            this.state.next({ signalingState: SignalingState.Interrupted })
          }
          break;
        }
        case "opened": {
          if (this.signalingState === SignalingState.Closed) {
            // close() was called while the socket was still connecting:
            // shut the orphaned socket down instead of resurrecting the state.
            event.close()
            break;
          }
          this.state.next({ signalingState: SignalingState.Connected, send: event.send, close: event.close })
          break;
        }
        case "message": {
          this.onWebsocketMessage(event.data)
          break;
        }
      }
    })
  }

  /** Current signaling state. */
  get signalingState(): SignalingState {
    return this.state.value.signalingState;
  }

  /** Emits the current signaling state on subscription and every later change. */
  get signalingStateChanged(): rx.Observable<SignalingState> {
    return this.state.pipe(
      rxops.map(x => x.signalingState)
    );
  }

  /** Stream of `room_updated` notifications received from the server. */
  get roomUpdatedNotification(): rx.Observable<{seq: number, room_updated: messages.RoomUpdatedNotification }> {
    return this.roomUpdatedNotificationSubj.asObservable()
  }

  /** Stream of `maintenance` notifications received from the server. */
  get maintenanceNotification(): rx.Observable<{seq: number, maintenance: messages.MaintenanceNotification }> {
    return this.maintenanceNotificationSubj.asObservable()
  }

  /** Stream of binary "enable RTC stats reporting" notifications. */
  get enableRtcReporting(): rx.Observable<{seq: number, notification: EnableRTCStatsReporting}> {
    return this.enableRtcReportingNotificationSubj.asObservable();
  }

  /**
   * Opens a new WebSocket to `url` and starts listening to it, replacing any
   * previously supplied socket stream. Does nothing (besides logging) when
   * already connected.
   *
   * @throws Error when the signaling was permanently closed via `close()`.
   */
  public connect(url: string) {
    if (this.signalingState === SignalingState.Connected) {
      console.error("WebSocket is already connected: ", this.state);
      return;
    }
    if (this.signalingState === SignalingState.Closed) {
      console.error("Signaling is permanently closed");
      throw new Error("Signaling was permanently closed");
    }
    this.sockets.next(ReactiveWebSocket(url))
  }

  /** Rejects every pending request with a fresh `Error(message)` and forgets them. */
  private failPendingRequests(message: string) {
    for (const req of this.requests.values()) {
      req.tcs.trySetException(new Error(message));
    }
    this.requests.clear();
  }

  private onWebsocketClosed(code: number, reason: string) {
    console.log(`WebSocket closed with code ${code} reason ${reason}`);
    this.failPendingRequests(`WebSocket closed with code: ${code} reason: ${reason}`);
  }

  private onWebSocketError() {
    console.log(`WebSocket error happen`);
    this.failPendingRequests(`WebSocket error`);
  }

  /** Sends a text frame once a connection is available. */
  private async send(msg: string) {
    const send = await this.sender;
    send(msg);
  }

  /** Serializes and sends a binary frame once a connection is available. */
  private async send_binary(msg: KaldunIncomingMessage) {
    const send = await this.sender;
    send(msg.serializeBinary());
  }

  /** @throws Error when `close()` has been called. */
  private assertNotClosed() {
    if (this.signalingState === SignalingState.Closed) {
      throw new Error("Signaling closed");
    }
  }

  /**
   * Removes and returns the pending request registered under `seq`, logging
   * when there is none.
   */
  private takeRequest(seq: number): PendingRequest | undefined {
    const pending = this.requests.get(seq);
    if (pending) {
      this.requests.delete(seq)
    } else {
      console.log(`No request with seq ${seq} found`)
    }
    return pending;
  }

  /**
   * Handles a response whose kind differs from the request it answers.
   * The request is rejected so that its caller does not wait forever.
   */
  private rejectMismatched(seq: number, req: PendingRequest) {
    console.error(`Response type with seq ${seq} does not match request type `, req);
    req.tcs.trySetException(new Error(`Response type with seq ${seq} does not match request type`));
  }

  /**
   * Entry point for every frame received from the socket. Text frames are
   * parsed as JSON responses/notifications; binary frames are decoded as
   * `KaldunOutgoingMessage`. Never rejects: decoding failures are logged.
   */
  private async onWebsocketMessage(msg: string | ArrayBufferLike | ArrayBufferView | Blob) {
    console.log("signaling message from server is ", msg)

    if (typeof msg === "string") {
      let resp: messages.ResponseOrNotification | null;
      try {
        resp = JSON.parse(msg);
      } catch (err) {
        console.error("Bad server message: ", msg, err);
        return;
      }
      this.onJsonMessage(resp);
      return;
    }

    let binary_msg: KaldunOutgoingMessage;
    try {
      let bytes: Uint8Array;
      if (msg instanceof Blob) {
        bytes = new Uint8Array(await msg.arrayBuffer());
      } else if (ArrayBuffer.isView(msg)) {
        // Respect the view's window into its underlying buffer.
        bytes = new Uint8Array(msg.buffer, msg.byteOffset, msg.byteLength);
      } else if (msg instanceof ArrayBuffer) {
        bytes = new Uint8Array(msg);
      } else {
        console.log("Error unsupported message: ", msg);
        return;
      }
      binary_msg = KaldunOutgoingMessage.deserializeBinary(bytes);
    } catch (err) {
      console.error("Bad server message: ", msg, err);
      return;
    }

    const notification = binary_msg.has_enable_rtc_reporting ? binary_msg.enable_rtc_reporting : null;
    if (notification) {
      this.enableRtcReportingNotificationSubj.next({
        seq: binary_msg.seq,
        notification: notification
      });
    }
  }

  /** Routes a parsed JSON message to the matching pending request or notification stream. */
  private onJsonMessage(resp: messages.ResponseOrNotification | null) {
    // The typeof check keeps `in` from throwing on JSON primitives such as `5`.
    if (resp && typeof resp === "object" && 'seq' in resp) {
      if ('join_room_v2' in resp) {
        const req = this.takeRequest(resp.seq);
        if (req) {
          if ('join_room_v2' in req) {
            req.tcs.trySetResult(resp)
          } else {
            this.rejectMismatched(resp.seq, req)
          }
        }
      } else if ('authenticate' in resp) {
        const req = this.takeRequest(resp.seq);
        if (req) {
          if ('authenticate' in req) {
            req.tcs.trySetResult(resp)
          } else {
            this.rejectMismatched(resp.seq, req)
          }
        }
      } else if ('restart_ice' in resp) {
        const req = this.takeRequest(resp.seq);
        if (req) {
          if ('restart_ice' in req) {
            req.tcs.trySetResult(resp)
          } else {
            this.rejectMismatched(resp.seq, req)
          }
        }
      } else if ('leave_room' in resp) {
        const req = this.takeRequest(resp.seq);
        if (req) {
          if ('leave_room' in req) {
            req.tcs.trySetResult(resp)
          } else {
            this.rejectMismatched(resp.seq, req)
          }
        }
      } else if ('update_media' in resp) {
        const req = this.takeRequest(resp.seq);
        if (req) {
          if ('update_media' in req) {
            req.tcs.trySetResult(resp)
          } else {
            this.rejectMismatched(resp.seq, req)
          }
        }
      } else if ('error' in resp) {
        // An error response is a valid result for any request kind.
        const req = this.takeRequest(resp.seq);
        if (req) {
          req.tcs.trySetResult(resp)
        }
      } else if ('maintenance' in resp) {
        this.state.next({signalingState: SignalingState.MaintenanceReceived})
        this.maintenanceNotificationSubj.next(resp)
      } else if ('room_updated' in resp) {
        this.roomUpdatedNotificationSubj.next(resp)
      } else {
        console.error("Bad server message: ", resp);
      }
    } else {
      console.error("Bad server message: ", resp);
    }
  }

  /**
   * Sends an already registered request and waits for its completion source.
   * If the message cannot be sent, the request is dropped and rejected instead
   * of being left pending with an unhandled rejection.
   */
  private async transmit<T>(seq: number, request: messages.Request, tcs: TaskCompletionSource<T>): Promise<T> {
    this.send(JSON.stringify(request)).catch((err: unknown) => {
      this.requests.delete(seq);
      tcs.trySetException(err instanceof Error ? err : new Error(String(err)));
    });
    return await tcs.task;
  }

  /**
   * Sends a compressed RTC stats report as a binary message. No response is awaited.
   *
   * @throws Error (as a rejection) when the signaling is closed.
   */
  public async send_report(report: CompressedRTCStatsReport) {
    this.assertNotClosed();
    const sequence = ++this.sequence;

    const message = new KaldunIncomingMessage({
      seq: sequence,
      rtc_report: report
    });
    await this.send_binary(message);
  }

  /**
   * Sends a `join_room_v2` request and resolves with the server's response.
   *
   * Rejects when the signaling is closed, the message cannot be sent, or the
   * socket closes/errors before a response arrives.
   */
  public async join({
    ticket,
    region,
    offer,
    enableTcp,
    allowAutomaticSimulcast,
    videoProducerBandwidthLimitKbps,
  } : { 
    ticket?: string,
    region?: string,
    offer: string,
    enableTcp?: boolean,
    allowAutomaticSimulcast?: boolean,
    videoProducerBandwidthLimitKbps?: number
  }): Promise<{ join_room_v2: messages.JoinRoomV2Response} | { error: messages.ErrorResponse }> {
    this.assertNotClosed();

    const sequence = ++this.sequence;
    const joinRoom: messages.JoinRoomV2Request = {
      ticket: ticket,
      region: region,
      sdp_offer: offer,
      debug_configuration: {
        allocate_tcp_candidates: enableTcp,
        allow_automatic_simulcast: allowAutomaticSimulcast,
        video_producer_bandwidth_limit_kbps: videoProducerBandwidthLimitKbps,
      }
    }
    const joinRequest: messages.Request = {
      seq: sequence,
      join_room_v2: joinRoom
    }
    const tcs = new TaskCompletionSource<{ join_room_v2: messages.JoinRoomV2Response} | { error: messages.ErrorResponse }>();
    this.requests.set(sequence, { join_room_v2: joinRoom, tcs: tcs })

    console.log("join_request_v2: ", joinRequest);
    const result = await this.transmit(sequence, joinRequest, tcs);
    console.log("join_response_v2: ", result);
    return result;
  }

  /** Sends a `restart_ice` request and resolves with the server's response. */
  public async restartIce(): Promise<{ restart_ice: messages.RestartIceResponse } | { error: messages.ErrorResponse }> {
    this.assertNotClosed();

    const sequence = ++this.sequence;
    const restartIce: messages.RestartIceRequest = {
    };
    const restartIceRequest: messages.Request = {
      seq: sequence,
      restart_ice: restartIce
    };
    const tcs = new TaskCompletionSource<{ restart_ice: messages.RestartIceResponse } | { error: messages.ErrorResponse }>();
    this.requests.set(sequence, { restart_ice: restartIce, tcs: tcs })

    console.log("restart_ice_request: ", restartIceRequest);
    const result = await this.transmit(sequence, restartIceRequest, tcs);
    console.log("restart_ice_response: ", result);
    return result;
  }

  /** Sends an `authenticate` request carrying `ticket` and resolves with the server's response. */
  public async authenticate(ticket: string): Promise<{ authenticate: messages.AuthenticateResponse } | { error: messages.ErrorResponse }> {
    this.assertNotClosed();

    const sequence = ++this.sequence;
    const authenticate: messages.AuthenticateRequest = {
      ticket: ticket,
    };
    const authenticateRequest: messages.Request = {
      seq: sequence,
      authenticate: authenticate
    };
    const tcs = new TaskCompletionSource<{ authenticate: messages.AuthenticateResponse } | { error: messages.ErrorResponse }>();
    this.requests.set(sequence, { authenticate: authenticate, tcs: tcs });

    console.log("authenticate_request: ", authenticateRequest);
    const result = await this.transmit(sequence, authenticateRequest, tcs);
    console.log("authenticate_response: ", result);
    return result
  }

  /** Sends a `leave_room` request and resolves with the server's response. */
  public async leave(): Promise<{ leave_room: messages.LeaveRoomResponse } | { error: messages.ErrorResponse }> {
    this.assertNotClosed();

    const sequence = ++this.sequence;
    const leave: messages.LeaveRoomRequest = {
    };
    const leaveRequest: messages.Request = {
      seq: sequence,
      leave_room: leave
    };
    const tcs = new TaskCompletionSource<{ leave_room: messages.LeaveRoomResponse } | { error: messages.ErrorResponse }>();
    this.requests.set(sequence, { leave_room: leave, tcs: tcs });

    console.log("leave_request: ", leaveRequest);
    const result = await this.transmit(sequence, leaveRequest, tcs);
    console.log("leave_response: ", result);
    return result;
  }

  /** Sends an `update_media` request with the given track lists and resolves with the server's response. */
  public async updateMedia(arg: { produced_tracks?: Array<messages.ProducedTrackInfo>, consumed_tracks?: Array<messages.PreferredConsumedTrackInfo> }): Promise<{ update_media: messages.UpdateMediaResponse } | { error: messages.ErrorResponse }> {
    this.assertNotClosed();

    const sequence = ++this.sequence;
    const updateMedia: messages.UpdateMediaRequest = {
      produced_tracks: arg.produced_tracks,
      consumed_tracks: arg.consumed_tracks,
    };
    const updateMediaRequest: messages.Request = {
      seq: sequence,
      update_media: updateMedia
    };
    const tcs = new TaskCompletionSource<{ update_media: messages.UpdateMediaResponse } | { error: messages.ErrorResponse }>();
    this.requests.set(sequence, { update_media: updateMedia, tcs: tcs });

    console.log("update_media_request: ", updateMediaRequest);
    const result = await this.transmit(sequence, updateMediaRequest, tcs);
    console.log("update_media_response: ", result);
    return result;
  }

  /**
   * Permanently closes the signaling: closes the socket when connected, moves
   * to `Closed`, and completes the state and all notification streams.
   */
  public close() {
    if (this.state.value.signalingState === SignalingState.Connected) {
      this.state.value.close();
    }
    this.state.next({signalingState: SignalingState.Closed});
    this.state.complete();
    this.roomUpdatedNotificationSubj.complete();
    this.maintenanceNotificationSubj.complete();
    this.enableRtcReportingNotificationSubj.complete();
  }
}

export default RoomSignaling
