import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/from';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/bufferWhen';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/debounce';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/partition';
import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import get from 'lodash/get';
import { Observable } from 'rxjs/Observable';
import { timer } from 'rxjs/observable/timer';
import getVisibility$ from '@atlassian/jira-common-page-visiblity-stream/src/page-visibility-stream';
import log from '@atlassian/jira-common-util-logging/src/log.tsx';
import { BOARD_SCOPE_ISSUES_UPDATED } from '@atlassian/jira-realtime/src/common/constants/events.tsx';
import { getBoardJirtJitter } from '../../feature-flags';
import { REFRESH_SOURCE_REALTIME } from '../../model/constants';
import { REALTIME_DISPATCH_EVENT, type RealtimeEventAction } from '../../state/actions/realtime';
import { workRefreshData } from '../../state/actions/work';
import type { State } from '../../state/reducers/types';
import {
	currentUserAccountIdSelector,
	getIsCMPBoard,
} from '../../state/selectors/software/software-selectors';
import type { Action, ActionsObservable, MiddlewareAPI } from '../../state/types';
import { isInactiveTab, isBlurredDocument } from './utils';

/**
 * Decides whether a realtime event should be let through by this tab.
 *
 * Returns a truthy value when either:
 *  - the account id found at `action.payload.event.payload.atlassianId`
 *    (treated as `null` when missing) differs from the current user's account
 *    id in `state`, or
 *  - the ids match but `isBlurredDocument()` reports the document as blurred.
 *
 * In other words, only events caused by the current user while this document
 * is not blurred are rejected.
 */
const isNotCurrentUserInActiveTab = (state: State, action: RealtimeEventAction) => {
	const currentUserId = currentUserAccountIdSelector(state);
	const eventUserId = get(action, ['payload', 'event', 'payload', 'atlassianId'], null);

	if (currentUserId !== eventUserId) {
		return true;
	}

	// Same user: the blur check is only consulted in this case.
	return isBlurredDocument();
};

/**
 * In order to prevent spikes in load when the front-end receives real-time
 * events, we introduce jitter into the refreshes.
 *
 * By adding random per-tab jitter, refresh request load will be randomly (and
 * evenly) spread across a time window. For example, if the jitter window is 3
 * seconds and we have 300 connected clients, our spike load in the case of a
 * refresh event is reduced from a peak of 300 req/s to 100 req/s, because 1/3
 * of the clients will be scheduled for each second.
 *
 * Looking at sub-second timings, all clients' refreshes will be evenly spread
 * within the jitter window rather than happening at the same or close instants.
 *
 * The value is drawn once, when this module is evaluated, as a random fraction
 * of whatever `getBoardJirtJitter()` returns, and is then reused for every
 * debounce in this module instance.
 * NOTE(review): the window is presumably expressed in milliseconds, since it is
 * added to the debounce interval below — confirm against the feature flag.
 */
const tabJitter = Math.random() * getBoardJirtJitter();

/**
 * Base delay, in milliseconds, used to debounce realtime events before a board
 * refresh is dispatched. The per-tab jitter is added on top of this value.
 * NOTE(review): the `TMP_` prefix suggests this value is meant to be temporary — confirm.
 */
export const TMP_BOARD_JIRT_DEBOUNCE_INTERVAL = 3000;

/**
 * Epic that converts realtime events into debounced board refresh actions.
 *
 * Pipeline:
 *  1. Take `REALTIME_DISPATCH_EVENT` actions that pass
 *     `isNotCurrentUserInActiveTab`.
 *  2. Split them by tab state at the moment each event arrives: events received
 *     while the tab is not inactive flow straight through; events received
 *     while it is inactive are buffered and replayed once the visibility stream
 *     next reports a truthy value.
 *  3. On non-CMP boards keep only `BOARD_SCOPE_ISSUES_UPDATED` events.
 *  4. Debounce by the base interval plus this tab's jitter and emit a single
 *     `workRefreshData(REFRESH_SOURCE_REALTIME)` action.
 *
 * @param action$ stream of dispatched actions
 * @param store middleware API used to read the current state
 * @returns stream of refresh actions; on error the failure is logged and the
 *          stream completes
 */
const realtimeEpic = (action$: ActionsObservable, store: MiddlewareAPI): Observable<Action> => {
	// Both operands are module-level constants, so the delay can be computed once.
	const debounceDelay = TMP_BOARD_JIRT_DEBOUNCE_INTERVAL + tabJitter;

	// Closing notifier for the hidden-tab buffer: skips the first visibility
	// emission, then fires once on the next truthy one.
	// NOTE(review): skip(1) presumably drops the current state emitted on
	// subscription — confirm against getVisibility$.
	const nextTimeTabIsShown = () => getVisibility$().skip(1).filter(Boolean).take(1);

	const acceptedEvents$ = action$
		.ofType(REALTIME_DISPATCH_EVENT)
		.filter((action) => isNotCurrentUserInActiveTab(store.getState(), action));

	const [receivedWhileShown$, receivedWhileHidden$] = acceptedEvents$.partition(
		() => !isInactiveTab(),
	);

	// Hold back events received in an inactive tab and flush them one by one
	// when the tab is shown again.
	const replayedOnShow$ = receivedWhileHidden$
		.bufferWhen(nextTimeTabIsShown)
		.mergeMap((buffered) => Observable.from(buffered));

	const refreshTriggers$ = Observable.merge(receivedWhileShown$, replayedOnShow$).filter(
		(action) => {
			// CMP boards react to every event type.
			if (getIsCMPBoard(store.getState())) {
				return true;
			}

			// Elsewhere only board-scope updates trigger a refresh here: issue
			// create / issue update events also arrive specifically for custom
			// filters, and are left alone so custom filters can react to them.
			return action.payload.event.type === BOARD_SCOPE_ISSUES_UPDATED;
		},
	);

	const refreshActions$ = refreshTriggers$
		.debounce(() => timer(debounceDelay))
		.mapTo(workRefreshData(REFRESH_SOURCE_REALTIME));

	return refreshActions$.catch((error) => {
		log.safeErrorWithoutCustomerData(
			'board.realtime.event.process.failure',
			'Failed to process realtime event',
			error,
		);

		// Replacing the failed stream with an empty one completes the epic's
		// output: no further refreshes are emitted after an error.
		return Observable.empty();
	});
};

export default realtimeEpic;
