'use strict'

/**
 * Module dependencies
 */
var events = require('events')
var Store = require('./store')
var mqttPacket = require('./mqtt-packet')
var Writable = require('./readable-stream').Writable
var inherits = require('inherits')
var reInterval = require('reinterval')
var validations = require('./validations')
var xtend = require('xtend')

var defaultConnectOptions = {
	keepalive: 60,
	reschedulePings: true,
	protocolId: 'MQTT',
	protocolVersion: 4,
	reconnectPeriod: 1000,
	connectTimeout: 30 * 1000,
	clean: true,
	resubscribe: true
}
var errors = {
	0: '',
	1: 'Unacceptable protocol version',
	2: 'Identifier rejected',
	3: 'Server unavailable',
	4: 'Bad username or password',
	5: 'Not authorized',
	16: 'No matching subscribers',
	17: 'No subscription existed',
	128: 'Unspecified error',
	129: 'Malformed Packet',
	130: 'Protocol Error',
	131: 'Implementation specific error',
	132: 'Unsupported Protocol Version',
	133: 'Client Identifier not valid',
	134: 'Bad User Name or Password',
	135: 'Not authorized',
	136: 'Server unavailable',
	137: 'Server busy',
	138: 'Banned',
	139: 'Server shutting down',
	140: 'Bad authentication method',
	141: 'Keep Alive timeout',
	142: 'Session taken over',
	143: 'Topic Filter invalid',
	144: 'Topic Name invalid',
	145: 'Packet identifier in use',
	146: 'Packet Identifier not found',
	147: 'Receive Maximum exceeded',
	148: 'Topic Alias invalid',
	149: 'Packet too large',
	150: 'Message rate too high',
	151: 'Quota exceeded',
	152: 'Administrative action',
	153: 'Payload format invalid',
	154: 'Retain not supported',
	155: 'QoS not supported',
	156: 'Use another server',
	157: 'Server moved',
	158: 'Shared Subscriptions not supported',
	159: 'Connection rate exceeded',
	160: 'Maximum connect time',
	161: 'Subscription Identifiers not supported',
	162: 'Wildcard Subscriptions not supported'
}

function defaultId() {
	return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}

function sendPacket(client, packet, cb) {
	client.emit('packetsend', packet)

	var result = mqttPacket.writeToStream(packet, client.stream, client.options)

	if (!result && cb) {
		client.stream.once('drain', cb)
	} else if (cb) {
		cb()
	}
}

function flush(queue) {
	if (queue) {
		Object.keys(queue).forEach(function (messageId) {
			if (typeof queue[messageId].cb === 'function') {
				queue[messageId].cb(new Error('Connection closed'))
				delete queue[messageId]
			}
		})
	}
}

function flushVolatile(queue) {
	if (queue) {
		Object.keys(queue).forEach(function (messageId) {
			if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
				queue[messageId].cb(new Error('Connection closed'))
				delete queue[messageId]
			}
		})
	}
}

function storeAndSend(client, packet, cb, cbStorePut) {
	client.outgoingStore.put(packet, function storedPacket(err) {
		if (err) {
			return cb && cb(err)
		}
		cbStorePut()
		sendPacket(client, packet, cb)
	})
}

function nop() { }

/**
 * MqttClient constructor
 *
 * @param {Stream} stream - stream
 * @param {Object} [options] - connection options
 * (see Connection#connect)
 */
function MqttClient(streamBuilder, options) {
	var k
	var that = this

	if (!(this instanceof MqttClient)) {
		return new MqttClient(streamBuilder, options)
	}

	this.options = options || {}

	// Defaults
	for (k in defaultConnectOptions) {
		if (typeof this.options[k] === 'undefined') {
			this.options[k] = defaultConnectOptions[k]
		} else {
			this.options[k] = options[k]
		}
	}

	this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()

	this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }

	this.streamBuilder = streamBuilder

	// Inflight message storages
	this.outgoingStore = options.outgoingStore || new Store()
	this.incomingStore = options.incomingStore || new Store()

	// Should QoS zero messages be queued when the connection is broken?
	this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero

	// map of subscribed topics to support reconnection
	this._resubscribeTopics = {}

	// map of a subscribe messageId and a topic
	this.messageIdToTopic = {}

	// Ping timer, setup in _setupPingTimer
	this.pingTimer = null
	// Is the client connected?
	this.connected = false
	// Are we disconnecting?
	this.disconnecting = false
	// Packet queue
	this.queue = []
	// connack timer
	this.connackTimer = null
	// Reconnect timer
	this.reconnectTimer = null
	// Is processing store?
	this._storeProcessing = false
	// Packet Ids are put into the store during store processing
	this._packetIdsDuringStoreProcessing = {}
	/**
	 * MessageIDs starting with 1
	 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
	 */
	this.nextId = Math.max(1, Math.floor(Math.random() * 65535))

	// Inflight callbacks
	this.outgoing = {}

	// True if connection is first time.
	this._firstConnection = true

	// Mark disconnected on stream close
	this.on('close', function () {
		this.connected = false
		clearTimeout(this.connackTimer)
	})

	// Send queued packets
	this.on('connect', function () {
		var queue = this.queue

		function deliver() {
			var entry = queue.shift()
			var packet = null

			if (!entry) {
				return
			}

			packet = entry.packet

			that._sendPacket(
				packet,
				function (err) {
					if (entry.cb) {
						entry.cb(err)
					}
					deliver()
				}
			)
		}

		deliver()
	})

	// Clear ping timer
	this.on('close', function () {
		if (that.pingTimer !== null) {
			that.pingTimer.clear()
			that.pingTimer = null
		}
	})

	// Setup reconnect timer on disconnect
	this.on('close', this._setupReconnect)

	events.EventEmitter.call(this)

	this._setupStream()
}
inherits(MqttClient, events.EventEmitter)

/**
 * setup the event handlers in the inner stream.
 *
 * @api private
 */
MqttClient.prototype._setupStream = function () {
	var connectPacket
	var that = this
	var writable = new Writable()
	var parser = mqttPacket.parser(this.options)
	var completeParse = null
	var packets = []

	this._clearReconnect()

	this.stream = this.streamBuilder(this)

	parser.on('packet', function (packet) {
		packets.push(packet)
	})

	function nextTickWork() {
		if (packets.length) {
			process.nextTick(work)
		} else {
			var done = completeParse
			completeParse = null
			done()
		}
	}

	function work() {
		var packet = packets.shift()

		if (packet) {
			that._handlePacket(packet, nextTickWork)
		} else {
			var done = completeParse
			completeParse = null
			if (done) done()
		}
	}

	writable._write = function (buf, enc, done) {
		completeParse = done
		parser.parse(buf)
		work()
	}

	this.stream.pipe(writable)

	// Suppress connection errors
	this.stream.on('error', nop)

	// Echo stream close
	this.stream.on('close', function () {
		flushVolatile(that.outgoing)
		that.emit('close')
	})

	// Send a connect packet
	connectPacket = Object.create(this.options)
	connectPacket.cmd = 'connect'
	// avoid message queue
	sendPacket(this, connectPacket)

	// Echo connection errors
	parser.on('error', this.emit.bind(this, 'error'))

	// auth
	if (this.options.properties) {
		if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
			this.emit('error', new Error('Packet has no Authentication Method'))
			return this
		}
		if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
			var authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket)
			sendPacket(this, authPacket)
		}
	}

	// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
	// this.stream.setMaxListeners(1000)

	clearTimeout(this.connackTimer)
	this.connackTimer = setTimeout(function () {
		that._cleanUp(true)
	}, this.options.connectTimeout)
}

MqttClient.prototype._handlePacket = function (packet, done) {
	var options = this.options

	if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
		this.emit('error', new Error('exceeding packets size ' + packet.cmd))
		this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } })
		return this
	}

	this.emit('packetreceive', packet)

	switch (packet.cmd) {
		case 'publish':
			this._handlePublish(packet, done)
			break
		case 'puback':
		case 'pubrec':
		case 'pubcomp':
		case 'suback':
		case 'unsuback':
			this._handleAck(packet)
			done()
			break
		case 'pubrel':
			this._handlePubrel(packet, done)
			break
		case 'connack':
			this._handleConnack(packet)
			done()
			break
		case 'pingresp':
			this._handlePingresp(packet)
			done()
			break
		case 'disconnect':
			this._handleDisconnect(packet)
			done()
			break
		default:
			// do nothing
			// maybe we should do an error handling
			// or just log it
			break
	}
}

MqttClient.prototype._checkDisconnecting = function (callback) {
	if (this.disconnecting) {
		if (callback) {
			callback(new Error('client disconnecting'))
		} else {
			this.emit('error', new Error('client disconnecting'))
		}
	}
	return this.disconnecting
}

/**
 * publish - publish <message> to <topic>
 *
 * @param {String} topic - topic to publish to
 * @param {String, Buffer} message - message to publish
 * @param {Object} [opts] - publish options, includes:
 *    {Number} qos - qos level to publish on
 *    {Boolean} retain - whether or not to retain the message
 *    {Boolean} dup - whether or not mark a message as duplicate
 *    {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
 * @param {Function} [callback] - function(err){}
 *    called when publish succeeds or fails
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.publish('topic', 'message');
 * @example
 *     client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
 * @example client.publish('topic', 'message', console.log);
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
	var packet
	var options = this.options

	// .publish(topic, payload, cb);
	if (typeof opts === 'function') {
		callback = opts
		opts = null
	}

	// default opts
	var defaultOpts = { qos: 0, retain: false, dup: false }
	opts = xtend(defaultOpts, opts)

	if (this._checkDisconnecting(callback)) {
		return this
	}

	packet = {
		cmd: 'publish',
		topic: topic,
		payload: message,
		qos: opts.qos,
		retain: opts.retain,
		messageId: this._nextId(),
		dup: opts.dup
	}

	if (options.protocolVersion === 5) {
		packet.properties = opts.properties
		if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
			((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
				(!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
			/*
			if we are don`t setup topic alias or
			topic alias maximum less than topic alias or
			server don`t give topic alias maximum,
			we are removing topic alias from packet
			*/
			delete packet.properties.topicAlias
		}
	}

	switch (opts.qos) {
		case 1:
		case 2:
			// Add to callbacks
			this.outgoing[packet.messageId] = {
				volatile: false,
				cb: callback || nop
			}
			if (this._storeProcessing) {
				this._packetIdsDuringStoreProcessing[packet.messageId] = false
				this._storePacket(packet, undefined, opts.cbStorePut)
			} else {
				this._sendPacket(packet, undefined, opts.cbStorePut)
			}
			break
		default:
			if (this._storeProcessing) {
				this._storePacket(packet, callback, opts.cbStorePut)
			} else {
				this._sendPacket(packet, callback, opts.cbStorePut)
			}
			break
	}

	return this
}

/**
 * subscribe - subscribe to <topic>
 *
 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Number} qos - subscribe qos level
 * @param {Function} [callback] - function(err, granted){} where:
 *    {Error} err - subscription error (none at the moment!)
 *    {Array} granted - array of {topic: 't', qos: 0}
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.subscribe('topic');
 * @example client.subscribe('topic', {qos: 1});
 * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
 * @example client.subscribe('topic', console.log);
 */
MqttClient.prototype.subscribe = function () {
	var packet
	var args = new Array(arguments.length)
	for (var i = 0; i < arguments.length; i++) {
		args[i] = arguments[i]
	}
	var subs = []
	var obj = args.shift()
	var resubscribe = obj.resubscribe
	var callback = args.pop() || nop
	var opts = args.pop()
	var invalidTopic
	var that = this
	var version = this.options.protocolVersion

	delete obj.resubscribe

	if (typeof obj === 'string') {
		obj = [obj]
	}

	if (typeof callback !== 'function') {
		opts = callback
		callback = nop
	}

	invalidTopic = validations.validateTopics(obj)
	if (invalidTopic !== null) {
		setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
		return this
	}

	if (this._checkDisconnecting(callback)) {
		return this
	}

	var defaultOpts = {
		qos: 0
	}
	if (version === 5) {
		defaultOpts.nl = false
		defaultOpts.rap = false
		defaultOpts.rh = 0
	}
	opts = xtend(defaultOpts, opts)

	if (Array.isArray(obj)) {
		obj.forEach(function (topic) {
			if (!that._resubscribeTopics.hasOwnProperty(topic) ||
				that._resubscribeTopics[topic].qos < opts.qos ||
				resubscribe) {
				var currentOpts = {
					topic: topic,
					qos: opts.qos
				}
				if (version === 5) {
					currentOpts.nl = opts.nl
					currentOpts.rap = opts.rap
					currentOpts.rh = opts.rh
					currentOpts.properties = opts.properties
				}
				subs.push(currentOpts)
			}
		})
	} else {
		Object
			.keys(obj)
			.forEach(function (k) {
				if (!that._resubscribeTopics.hasOwnProperty(k) ||
					that._resubscribeTopics[k].qos < obj[k].qos ||
					resubscribe) {
					var currentOpts = {
						topic: k,
						qos: obj[k].qos
					}
					if (version === 5) {
						currentOpts.nl = obj[k].nl
						currentOpts.rap = obj[k].rap
						currentOpts.rh = obj[k].rh
						currentOpts.properties = opts.properties
					}
					subs.push(currentOpts)
				}
			})
	}

	packet = {
		cmd: 'subscribe',
		subscriptions: subs,
		qos: 1,
		retain: false,
		dup: false,
		messageId: this._nextId()
	}

	if (opts.properties) {
		packet.properties = opts.properties
	}

	if (!subs.length) {
		callback(null, [])
		return
	}

	// subscriptions to resubscribe to in case of disconnect
	if (this.options.resubscribe) {
		var topics = []
		subs.forEach(function (sub) {
			if (that.options.reconnectPeriod > 0) {
				var topic = { qos: sub.qos }
				if (version === 5) {
					topic.nl = sub.nl || false
					topic.rap = sub.rap || false
					topic.rh = sub.rh || 0
					topic.properties = sub.properties
				}
				that._resubscribeTopics[sub.topic] = topic
				topics.push(sub.topic)
			}
		})
		that.messageIdToTopic[packet.messageId] = topics
	}

	this.outgoing[packet.messageId] = {
		volatile: true,
		cb: function (err, packet) {
			if (!err) {
				var granted = packet.granted
				for (var i = 0; i < granted.length; i += 1) {
					subs[i].qos = granted[i]
				}
			}

			callback(err, subs)
		}
	}

	this._sendPacket(packet)

	return this
}

/**
 * unsubscribe - unsubscribe from topic(s)
 *
 * @param {String, Array} topic - topics to unsubscribe from
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Object} properties - properties of unsubscribe packet
 * @param {Function} [callback] - callback fired on unsuback
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.unsubscribe('topic');
 * @example client.unsubscribe('topic', console.log);
 */
MqttClient.prototype.unsubscribe = function () {
	var packet = {
		cmd: 'unsubscribe',
		qos: 1,
		messageId: this._nextId()
	}
	var that = this
	var args = new Array(arguments.length)
	for (var i = 0; i < arguments.length; i++) {
		args[i] = arguments[i]
	}
	var topic = args.shift()
	var callback = args.pop() || nop
	var opts = args.pop()

	if (typeof topic === 'string') {
		topic = [topic]
	}

	if (typeof callback !== 'function') {
		opts = callback
		callback = nop
	}

	if (this._checkDisconnecting(callback)) {
		return this
	}

	if (typeof topic === 'string') {
		packet.unsubscriptions = [topic]
	} else if (typeof topic === 'object' && topic.length) {
		packet.unsubscriptions = topic
	}

	if (this.options.resubscribe) {
		packet.unsubscriptions.forEach(function (topic) {
			delete that._resubscribeTopics[topic]
		})
	}

	if (typeof opts === 'object' && opts.properties) {
		packet.properties = opts.properties
	}

	this.outgoing[packet.messageId] = {
		volatile: true,
		cb: callback
	}

	this._sendPacket(packet)

	return this
}

/**
 * end - close connection
 *
 * @returns {MqttClient} this - for chaining
 * @param {Boolean} force - do not wait for all in-flight messages to be acked
 * @param {Function} cb - called when the client has been closed
 *
 * @api public
 */
MqttClient.prototype.end = function () {
	var that = this

	var force = arguments[0]
	var opts = arguments[1]
	var cb = arguments[2]

	if (force == null || typeof force !== 'boolean') {
		cb = opts || nop
		opts = force
		force = false
		if (typeof opts !== 'object') {
			cb = opts
			opts = null
			if (typeof cb !== 'function') {
				cb = nop
			}
		}
	}

	if (typeof opts !== 'object') {
		cb = opts
		opts = null
	}

	cb = cb || nop

	function closeStores() {
		that.disconnected = true
		that.incomingStore.close(function () {
			that.outgoingStore.close(function () {
				if (cb) {
					cb.apply(null, arguments)
				}
				that.emit('end')
			})
		})
		if (that._deferredReconnect) {
			that._deferredReconnect()
		}
	}

	function finish() {
		// defer closesStores of an I/O cycle,
		// just to make sure things are
		// ok for websockets
		that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
	}

	if (this.disconnecting) {
		return this
	}

	this._clearReconnect()

	this.disconnecting = true

	if (!force && Object.keys(this.outgoing).length > 0) {
		// wait 10ms, just to be sure we received all of it
		this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
	} else {
		finish()
	}

	return this
}

/**
 * removeOutgoingMessage - remove a message in outgoing store
 * the outgoing callback will be called withe Error('Message removed') if the message is removed
 *
 * @param {Number} mid - messageId to remove message
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.removeOutgoingMessage(client.getLastMessageId());
 */
MqttClient.prototype.removeOutgoingMessage = function (mid) {
	var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
	delete this.outgoing[mid]
	this.outgoingStore.del({ messageId: mid }, function () {
		cb(new Error('Message removed'))
	})
	return this
}

/**
 * reconnect - connect again using the same options as connect()
 *
 * @param {Object} [opts] - optional reconnect options, includes:
 *    {Store} incomingStore - a store for the incoming packets
 *    {Store} outgoingStore - a store for the outgoing packets
 *    if opts is not given, current stores are used
 * @returns {MqttClient} this - for chaining
 *
 * @api public
 */
MqttClient.prototype.reconnect = function (opts) {
	var that = this
	var f = function () {
		if (opts) {
			that.options.incomingStore = opts.incomingStore
			that.options.outgoingStore = opts.outgoingStore
		} else {
			that.options.incomingStore = null
			that.options.outgoingStore = null
		}
		that.incomingStore = that.options.incomingStore || new Store()
		that.outgoingStore = that.options.outgoingStore || new Store()
		that.disconnecting = false
		that.disconnected = false
		that._deferredReconnect = null
		that._reconnect()
	}

	if (this.disconnecting && !this.disconnected) {
		this._deferredReconnect = f
	} else {
		f()
	}
	return this
}

/**
 * _reconnect - implement reconnection
 * @api privateish
 */
MqttClient.prototype._reconnect = function () {
	this.emit('reconnect')
	this._setupStream()
}

/**
 * _setupReconnect - setup reconnect timer
 */
MqttClient.prototype._setupReconnect = function () {
	var that = this

	if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
		if (!this.reconnecting) {
			this.emit('offline')
			this.reconnecting = true
		}
		that.reconnectTimer = setInterval(function () {
			that._reconnect()
		}, that.options.reconnectPeriod)
	}
}

/**
 * _clearReconnect - clear the reconnect timer
 */
MqttClient.prototype._clearReconnect = function () {
	if (this.reconnectTimer) {
		clearInterval(this.reconnectTimer)
		this.reconnectTimer = null
	}
}

/**
 * _cleanUp - clean up on connection end
 * @api private
 */
MqttClient.prototype._cleanUp = function (forced, done) {
	var opts = arguments[2]
	if (done) {
		this.stream.on('close', done)
	}

	if (forced) {
		if ((this.options.reconnectPeriod === 0) && this.options.clean) {
			flush(this.outgoing)
		}
		this.stream.destroy()
	} else {
		var packet = xtend({ cmd: 'disconnect' }, opts)
		this._sendPacket(
			packet,
			setImmediate.bind(
				null,
				this.stream.end.bind(this.stream)
			)
		)
	}

	if (!this.disconnecting) {
		this._clearReconnect()
		this._setupReconnect()
	}

	if (this.pingTimer !== null) {
		this.pingTimer.clear()
		this.pingTimer = null
	}

	if (done && !this.connected) {
		this.stream.removeListener('close', done)
		done()
	}
}

/**
 * _sendPacket - send or queue a packet
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @param {Function} cbStorePut - called when message is put into outgoingStore
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
	cbStorePut = cbStorePut || nop

	if (!this.connected) {
		this._storePacket(packet, cb, cbStorePut)
		return
	}

	// When sending a packet, reschedule the ping timer
	this._shiftPingInterval()

	switch (packet.cmd) {
		case 'publish':
			break
		case 'pubrel':
			storeAndSend(this, packet, cb, cbStorePut)
			return
		default:
			sendPacket(this, packet, cb)
			return
	}

	switch (packet.qos) {
		case 2:
		case 1:
			storeAndSend(this, packet, cb, cbStorePut)
			break
		/**
		   * no need of case here since it will be caught by default
		   * and jshint comply that before default it must be a break
		   * anyway it will result in -1 evaluation
		   */
		case 0:
		/* falls through */
		default:
			sendPacket(this, packet, cb)
			break
	}
}

/**
 * _storePacket - queue a packet
 * @param {String} type - packet type (see `protocol`)
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @param {Function} cbStorePut - called when message is put into outgoingStore
 * @api private
 */
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
	cbStorePut = cbStorePut || nop

	if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
		this.queue.push({ packet: packet, cb: cb })
	} else if (packet.qos > 0) {
		cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
		this.outgoingStore.put(packet, function (err) {
			if (err) {
				return cb && cb(err)
			}
			cbStorePut()
		})
	} else if (cb) {
		cb(new Error('No connection to broker'))
	}
}

/**
 * _setupPingTimer - setup the ping timer
 *
 * @api private
 */
MqttClient.prototype._setupPingTimer = function () {
	var that = this

	if (!this.pingTimer && this.options.keepalive) {
		this.pingResp = true
		this.pingTimer = reInterval(function () {
			that._checkPing()
		}, this.options.keepalive * 1000)
	}
}

/**
 * _shiftPingInterval - reschedule the ping interval
 *
 * @api private
 */
MqttClient.prototype._shiftPingInterval = function () {
	if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
		this.pingTimer.reschedule(this.options.keepalive * 1000)
	}
}
/**
 * _checkPing - check if a pingresp has come back, and ping the server again
 *
 * @api private
 */
MqttClient.prototype._checkPing = function () {
	if (this.pingResp) {
		this.pingResp = false
		this._sendPacket({ cmd: 'pingreq' })
	} else {
		// do a forced cleanup since socket will be in bad shape
		this._cleanUp(true)
	}
}

/**
 * _handlePingresp - handle a pingresp
 *
 * @api private
 */
MqttClient.prototype._handlePingresp = function () {
	this.pingResp = true
}

/**
 * _handleConnack
 *
 * @param {Object} packet
 * @api private
 */

MqttClient.prototype._handleConnack = function (packet) {
	var options = this.options
	var version = options.protocolVersion
	var rc = version === 5 ? packet.reasonCode : packet.returnCode

	clearTimeout(this.connackTimer)

	if (packet.properties) {
		if (packet.properties.topicAliasMaximum) {
			if (!options.properties) { options.properties = {} }
			options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
		}
		if (packet.properties.serverKeepAlive && options.keepalive) {
			options.keepalive = packet.properties.serverKeepAlive
			this._shiftPingInterval()
		}
		if (packet.properties.maximumPacketSize) {
			if (!options.properties) { options.properties = {} }
			options.properties.maximumPacketSize = packet.properties.maximumPacketSize
		}
	}

	if (rc === 0) {
		this.reconnecting = false
		this._onConnect(packet)
	} else if (rc > 0) {
		var err = new Error('Connection refused: ' + errors[rc])
		err.code = rc
		this.emit('error', err)
	}
}

/**
 * _handlePublish
 *
 * @param {Object} packet
 * @api private
 */
/*
those late 2 case should be rewrite to comply with coding style:

case 1:
case 0:
  // do not wait sending a puback
  // no callback passed
  if (1 === qos) {
    this._sendPacket({
      cmd: 'puback',
      messageId: mid
    });
  }
  // emit the message event for both qos 1 and 0
  this.emit('message', topic, message, packet);
  this.handleMessage(packet, done);
  break;
default:
  // do nothing but every switch mus have a default
  // log or throw an error about unknown qos
  break;

for now i just suppressed the warnings
*/
MqttClient.prototype._handlePublish = function (packet, done) {
	done = typeof done !== 'undefined' ? done : nop
	var topic = packet.topic.toString()
	var message = packet.payload
	var qos = packet.qos
	var mid = packet.messageId
	var that = this
	var options = this.options
	var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]

	switch (qos) {
		case 2: {
			options.customHandleAcks(topic, message, packet, function (error, code) {
				if (!(error instanceof Error)) {
					code = error
					error = null
				}
				if (error) { return that.emit('error', error) }
				if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
				if (code) {
					that._sendPacket({ cmd: 'pubrec', messageId: mid, reasonCode: code }, done)
				} else {
					that.incomingStore.put(packet, function () {
						that._sendPacket({ cmd: 'pubrec', messageId: mid }, done)
					})
				}
			})
			break
		}
		case 1: {
			// emit the message event
			options.customHandleAcks(topic, message, packet, function (error, code) {
				if (!(error instanceof Error)) {
					code = error
					error = null
				}
				if (error) { return that.emit('error', error) }
				if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
				if (!code) { that.emit('message', topic, message, packet) }
				that.handleMessage(packet, function (err) {
					if (err) {
						return done && done(err)
					}
					that._sendPacket({ cmd: 'puback', messageId: mid, reasonCode: code }, done)
				})
			})
			break
		}
		case 0:
			// emit the message event
			this.emit('message', topic, message, packet)
			this.handleMessage(packet, done)
			break
		default:
			// do nothing
			// log or throw an error about unknown qos
			break
	}
}

/**
 * Handle messages with backpressure support, one at a time.
 * Override at will.
 *
 * @param Packet packet the packet
 * @param Function callback call when finished
 * @api public
 */
MqttClient.prototype.handleMessage = function (packet, callback) {
	callback()
}

/**
 * _handleAck
 *
 * @param {Object} packet
 * @api private
 */

MqttClient.prototype._handleAck = function (packet) {
	/* eslint no-fallthrough: "off" */
	var mid = packet.messageId
	var type = packet.cmd
	var response = null
	var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
	var that = this
	var err

	if (!cb) {
		// Server sent an ack in error, ignore it.
		return
	}

	// Process
	switch (type) {
		case 'pubcomp':
		// same thing as puback for QoS 2
		case 'puback':
			var pubackRC = packet.reasonCode
			// Callback - we're done
			if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
				err = new Error('Publish error: ' + errors[pubackRC])
				err.code = pubackRC
				cb(err, packet)
			}
			delete this.outgoing[mid]
			this.outgoingStore.del(packet, cb)
			break
		case 'pubrec':
			response = {
				cmd: 'pubrel',
				qos: 2,
				messageId: mid
			}
			var pubrecRC = packet.reasonCode

			if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
				err = new Error('Publish error: ' + errors[pubrecRC])
				err.code = pubrecRC
				cb(err, packet)
			} else {
				this._sendPacket(response)
			}
			break
		case 'suback':
			delete this.outgoing[mid]
			for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
				if ((packet.granted[grantedI] & 0x80) !== 0) {
					// suback with Failure status
					var topics = this.messageIdToTopic[mid]
					if (topics) {
						topics.forEach(function (topic) {
							delete that._resubscribeTopics[topic]
						})
					}
				}
			}
			cb(null, packet)
			break
		case 'unsuback':
			delete this.outgoing[mid]
			cb(null)
			break
		default:
			that.emit('error', new Error('unrecognized packet type'))
	}

	if (this.disconnecting &&
		Object.keys(this.outgoing).length === 0) {
		this.emit('outgoingEmpty')
	}
}

/**
 * _handlePubrel
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handlePubrel = function (packet, callback) {
	callback = typeof callback !== 'undefined' ? callback : nop
	var mid = packet.messageId
	var that = this

	var comp = { cmd: 'pubcomp', messageId: mid }

	that.incomingStore.get(packet, function (err, pub) {
		if (!err) {
			that.emit('message', pub.topic, pub.payload, pub)
			that.handleMessage(pub, function (err) {
				if (err) {
					return callback(err)
				}
				that.incomingStore.del(pub, nop)
				that._sendPacket(comp, callback)
			})
		} else {
			that._sendPacket(comp, callback)
		}
	})
}

/**
 * _handleDisconnect
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleDisconnect = function (packet) {
	this.emit('disconnect', packet)
}

/**
 * _nextId
 * @return unsigned int
 */
MqttClient.prototype._nextId = function () {
	// id becomes current state of this.nextId and increments afterwards
	var id = this.nextId++
	// Ensure 16 bit unsigned int (max 65535, nextId got one higher)
	if (this.nextId === 65536) {
		this.nextId = 1
	}
	return id
}

/**
 * getLastMessageId
 * @return unsigned int
 */
MqttClient.prototype.getLastMessageId = function () {
	return (this.nextId === 1) ? 65535 : (this.nextId - 1)
}

/**
 * _resubscribe
 * @api private
 */
MqttClient.prototype._resubscribe = function (connack) {
	var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
	if (!this._firstConnection &&
		(this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
		_resubscribeTopicsKeys.length > 0) {
		if (this.options.resubscribe) {
			if (this.options.protocolVersion === 5) {
				for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
					var resubscribeTopic = {}
					resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
					resubscribeTopic.resubscribe = true
					this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties })
				}
			} else {
				this._resubscribeTopics.resubscribe = true
				this.subscribe(this._resubscribeTopics)
			}
		} else {
			this._resubscribeTopics = {}
		}
	}

	this._firstConnection = false
}

/**
 * _onConnect
 *
 * @api private
 */
MqttClient.prototype._onConnect = function (packet) {
	if (this.disconnected) {
		this.emit('connect', packet)
		return
	}

	var that = this

	this._setupPingTimer()
	this._resubscribe(packet)

	this.connected = true

	function startStreamProcess() {
		var outStore = that.outgoingStore.createStream()

		function clearStoreProcessing() {
			that._storeProcessing = false
			that._packetIdsDuringStoreProcessing = {}
		}

		that.once('close', remove)
		outStore.on('error', function (err) {
			clearStoreProcessing()
			that.removeListener('close', remove)
			that.emit('error', err)
		})

		function remove() {
			outStore.destroy()
			outStore = null
			clearStoreProcessing()
		}

		function storeDeliver() {
			// edge case, we wrapped this twice
			if (!outStore) {
				return
			}
			that._storeProcessing = true

			var packet = outStore.read(1)

			var cb

			if (!packet) {
				// read when data is available in the future
				outStore.once('readable', storeDeliver)
				return
			}

			// Skip already processed store packets
			if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
				storeDeliver()
				return
			}

			// Avoid unnecessary stream read operations when disconnected
			if (!that.disconnecting && !that.reconnectTimer) {
				cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
				that.outgoing[packet.messageId] = {
					volatile: false,
					cb: function (err, status) {
						// Ensure that the original callback passed in to publish gets invoked
						if (cb) {
							cb(err, status)
						}

						storeDeliver()
					}
				}
				that._packetIdsDuringStoreProcessing[packet.messageId] = true
				that._sendPacket(packet)
			} else if (outStore.destroy) {
				outStore.destroy()
			}
		}

		outStore.on('end', function () {
			var allProcessed = true
			for (var id in that._packetIdsDuringStoreProcessing) {
				if (!that._packetIdsDuringStoreProcessing[id]) {
					allProcessed = false
					break
				}
			}
			if (allProcessed) {
				clearStoreProcessing()
				that.removeListener('close', remove)
				that.emit('connect', packet)
			} else {
				startStreamProcess()
			}
		})
		storeDeliver()
	}
	// start flowing
	startStreamProcess()
}

module.exports = MqttClient
