StreamEvents.js

import Logger from './Logger'
import EventSubscriber from './utils/EventSubscriber'
import config from './config'

const USER_COUNT_TARGET = 'SubscribeViewerCount'
const USER_COUNT_TARGET_RESPONSE = 'SubscribeViewerCountResponse'

const logger = Logger.get('StreamEvents')
const messageType = { REQUEST: 1, RESPONSE: 3 }
let invocationId = 0

export const defaultEventsLocation = config.WOWZA_EVENTS_LOCATION
let eventsLocation = defaultEventsLocation

const errorMsg = 'You need to initialize stream event with StreamEvents.init()'

/**
 * @typedef {Object} OnUserCountOptions
 * @property {String} accountId - Wowza Account Id.
 * @property {String} streamName - Wowza Stream Name.
 * @property {onUserCountCallback} callback - Callback function executed when a new message is available.
 */

/**
 * Callback invoke when new user count is received.
 *
 * @callback onUserCountCallback
 * @param {Object} data
 * @param {String} data.streamId - Stream identifier with the following format `accountId/streamName`.
 * @param {Number} data.count    - Current amount of viewers of the stream.
 * @param {String} [data.error]  - Error message.
 */

/**
 * @class StreamEvents
 * @classdesc Lets you to subscribe to stream events like receive the amount of viewers of a stream.
 *
 * This events are handled via a WebSocket with Wowza server.
 * @hideconstructor
 */
export default class StreamEvents {
  constructor () {
    this.eventSubscriber = null
  }

  /**
   * Initializes the connection with Wowza Stream Event.
   * @returns {Promise<StreamEvents>} Promise object which represents the StreamEvents instance
   * once the connection with the Wowza stream events is done.
   * @example const streamEvents = await StreamEvents.init()
   */
  static async init () {
    const instance = new StreamEvents()
    instance.eventSubscriber = new EventSubscriber(this.getEventsLocation())
    await instance.eventSubscriber.initializeHandshake()

    return instance
  }

  /**
   * Set Websocket Stream Events location.
   *
   * @param {String} url - New Stream Events location
  */
  static setEventsLocation (url) {
    eventsLocation = url
  }

  /**
   * Get current Websocket Stream Events location.
   *
   * By default, wss://streamevents.wowza.com/ws is the location.
   * @returns {String} Stream Events location
  */
  static getEventsLocation () {
    return eventsLocation
  }

  /**
   * Subscribes to User Count event and invokes the callback once a new message is available.
   * @param {OnUserCountOptions | String} options - Wowza options or *Deprecated Wowza Account Id.*
   * @param {String} [streamName] - *Deprecated, use options parameter instead* Wowza Stream Name.
   * @param {onUserCountCallback} [callback] - *Deprecated, use options parameter instead* Callback function executed when a new message is available.
   * @example
   * import StreamEvents from '@wowzamediasystems/sdk-rts'
   *
   * //Create a new instance
   * const streamEvents = await StreamEvents.init()
   * const accountId = "Publisher account ID"
   * const streamName = "Stream Name"
   * const options = {
   *    accountId,
   *    streamName,
   *    callback: (data) => {
   *      if (data.error) {
   *        console.error("Handle error: ", error)
   *      } else {
   *        console.log("Viewers: ", data.count)
   *      }
   *    }
   *  }
   *
   * //Initializes the user count event
   * streamEvents.onUserCount(options)
   */
  onUserCount (options, streamName = null, callback = null) {
    if (!this.eventSubscriber) {
      logger.error(errorMsg)
      throw new Error(errorMsg)
    }
    const optionsParsed = getOnUserCountOptions(options, streamName, callback)
    logger.info(`Starting user count. AccountId: ${optionsParsed.accountId}, streamName: ${optionsParsed.streamName}`)
    const streamId = `${optionsParsed.accountId}/${optionsParsed.streamName}`
    const requestInvocationId = invocationId++
    const userCountRequest = {
      arguments: [[streamId]],
      invocationId: requestInvocationId.toString(),
      streamIds: [],
      target: USER_COUNT_TARGET,
      type: 1
    }
    this.eventSubscriber.subscribe(userCountRequest)
    this.eventSubscriber.on('message', (response) => {
      handleStreamCountResponse(streamId, requestInvocationId, response, optionsParsed.callback)
    })
  }

  /**
   * Stop listening to stream events connected.
   * @example streamEvents.stop()
   */
  stop () {
    this.eventSubscriber.close()
  }
}

const handleStreamCountResponse = (streamIdConstraint, invocationIdConstraint, response, callback) => {
  switch (response.type) {
    case messageType.REQUEST:
      if (response.target === USER_COUNT_TARGET_RESPONSE) {
        for (const { streamId, count } of response.arguments) {
          if (streamId === streamIdConstraint) {
            const countChange = { streamId, count }
            logger.debug('User count changed: ', countChange)
            callback(countChange)
          }
        }
      }
      break

    case messageType.RESPONSE:
      if (response.error && response.invocationId === invocationIdConstraint) {
        const countData = {
          error: response.error,
          streamId: streamIdConstraint
        }
        logger.error('User count error: ', response.error)
        callback(countData)
      }
      break

    default:
      break
  }
}

const getOnUserCountOptions = (options, legacyStreamName, legacyCallback) => {
  let parsedOptions = (typeof options === 'object') ? options : {}
  if (Object.keys(parsedOptions).length === 0) {
    parsedOptions = {
      accountId: options,
      streamName: legacyStreamName,
      callback: legacyCallback
    }
  }
  if (config.WOWZA_FIXED_ACCOUNT_ID) {
    parsedOptions = { ...parsedOptions, accountId: config.WOWZA_FIXED_ACCOUNT_ID }
  }
  return parsedOptions
}