import {
	distinctUntilChanged,
	delay,
	map,
	first,
	startWith,
	mergeMap,
	takeUntil,
	filter,
	debounce,
	concatMap,
	scan,
	distinctUntilKeyChanged,
	tap,
	switchMap,
} from "rxjs/operators";
/*************
 * This service monitors:
 *  1. Requests actions made to online services.
 *  2. Worklogs generated by activity online.
 *  3. Also using this for General Notifications.
 *
 * READ ME :
 * Useful terms and things to know.
 *
 * a Request Action ,  is a action that results in an online request to a service.
 * a Request Action Status , is the status of that request.
 *
 *  request actions status VOs are stored in the app data store
 * ( Except at time of writing , those immediate data requests that succeed - we don't worry about these currently. )
 * ( requests that fail or generate potentially long running workflows are recorded.)
 *
 * request actions that generate workflows in the ahub are recorded with their workflow ids as well as action ids in the data store
 *
 * a worklog segment is a set of worklogs from the ahub for a given time period.
 * A worklog segment contains worklogs about workflows and are used to update the request actions status in the data store.
 *
 * General notifications, You can generate a notification by calling the service, this is picked up by the
 * worklog monitor, which issues a notification toast, which in turn tiggers its storage in the notification history.
 * Sounds wierd, but it follows the path of the other items, but we might want to change in the furutre or perhaps the names !
 *
 * aHub -> worklog -> store -> requestActionMonitorService -> action store notification -> store -> system health component.
 * portal aHub failed request -> store -> requestActionMonitorService -> action store notification -> store -> system health component.
 * General notification -> requestActionMonitorService -> action store notification -> store -> system health component.
 *
 *********/

/**
 * Core
 */
import { Injectable } from "@angular/core";
import {
	Observable,
	Subject,
	interval,
	timer,
	combineLatest,
	concat,
	of,
} from "rxjs";

/**
 * Utils
 */
import { ListUtil } from "store/list.util";

/*
 * Navigation
 */
import { Router } from "@angular/router";
import { AppRoutingNavigation } from "app/app-routing-navigation";

/**
 * Store and selectors.
 */
import { StoreAccess } from "store/store-access";
import { AHubActions } from "actions/ahub.actions";
import { AppActions } from "actions/app.actions";

import {
	aHubStatePermanentWorklogOutstandingCount,
	aHubStatePermanentWorklogSegmentLast,
	aHubStatePermanentWorkGroupList,
	aHubStatePermanentIsConnected,
	aHubStatePermanentUserList,
} from "selector/ahub/ahub-permanent.selector";
import {
	aHubStateTemporaryUserIndexList,
	aHubStateTemporaryDataSetIndexes,
	aHubStateTemporaryClientIndexs,
	aHubStateTemporaryDistributionGroupIndexes,
	aHubStateTemporaryExportIndexs,
	aHubStateTemporaryWorkGroupIndexes,
	aHubStateTemporaryDistributionGroupList,
	aHubStateTemporaryDistributionsList,
	aHubStateTemporaryWorkGroupList,
	aHubStateTemporaryExportList,
	aHubStateTemporaryDataSetList,
	aHubStateTemporaryClientList,
	aHubStateTemporaryClientQuotas,
	aHubStateTemporaryProductClassIndexes,
	aHubStateTemporaryProductClassList,
	aHubStateTemporaryProductPropertyIndexes,
	aHubStateTemporaryProductPropertyList,
	aHubStateTemporaryProductPropertySectionIndexes,
	aHubStateTemporaryProductPropertySectionList,
	aHubStateTemporaryProductPropertyAllocationIndexes,
	aHubStateTemporaryExtractDefinitionIndexes,
	aHubStateTemporaryExtractDefinitionList,
	aHubStateTemporaryClientLibraryVersionIndexs,
	aHubStateTemporaryClientLibraryVersionList,
	aHubStateTemporaryClientLibraryVersionModelUrlsList,
	aHubStateTemporaryClientLibrariesList,
	aHubStateTemporaryExtractIndexes,
	aHubStateTemporaryExtractList,
	aHubStateTemporaryProductPropertyAllocationChainIndexes,
	aHubStateTemporaryProductPropertyAllocationChains,
	aHubStateTemporaryDataSetCategoryList,
	aHubStateTemporaryExtractProductsKeys,
	aHubStateTemporaryDataSetCategoryProductsKeys,
	aHubStateTemporaryExporterIndexes,
	aHubStateTemporaryExporterList,
	aHubStateTemporaryResourcePackIndexes,
	aHubStateTemporaryResourcePackList,
	aHubStateTemporaryExporterBuildHistoryIndexMap,
	aHubStateTemporaryExporterBuildHistorys,
	aHubStateTemporaryClientConfigurationList,
} from "selector/ahub/ahub-temporary.selector";

import {
	requestActionStatuses,
	sessionClientId,
	sessionUserId,
} from "selector/app.selector";

import { viewSelectedDistributionGroupId } from "selector/view/view-distribution-groups.selector";
import { viewLibrarySelectedExtractId } from "selector/view/view-library-extracts.selector";

import { viewSelectedExportId } from "selector/view/view-exports.selector";

import { viewSelectedUserId } from "selector/view/view-account-management.selector";

import { WorklogObservable } from "./worklog-observable";

/**
 * Value Objects
 */
import { WorklogAHubVO } from "valueObjects/ahub/work/worklog.ahub.vo";
import { WorklogSegmentAHubVO } from "valueObjects/ahub/work/worklog-segment.ahub.vo";
import { RequestActionStatusVO } from "valueObjects/app/request-action-status.vo";

import { RequestActionStatusUploadVO } from "valueObjects/app/request-action-status-upload.vo";
import { RequestActionStatusEnum } from "valueObjects/app/request-action-status.app.enum";
import { viewLibraryPublishingSelectedExporter } from "app/store/selector/view/view-library-publishing.selector";
import { ExporterAHubVO } from "app/valueObjects/ahub/library/exporter.ahub.vo";
import { DistributionAHubVO } from "app/valueObjects/ahub/accounts/distribution.ahub.vo";

@Injectable()
export class RequestActionMonitorService {
	/**
	 *  One minute in MS
	 */
	private static readonly MINIUTE_MS: number = 60 * 1000;

	/**
	 *  Worklog Purge timein millisec. x Minutes.
	 */
	private readonly purgeMS: number =
		15 * RequestActionMonitorService.MINIUTE_MS;

	/**
	 * Create an observable on the current session client id.
	 */
	private readonly clientId$: Observable<number> =
		StoreAccess.dataGetObvs(sessionClientId);

	/**
	 * Create an observable to watch the current session user id.
	 */
	private readonly userId$: Observable<number> =
		StoreAccess.dataGetObvs(sessionUserId);

	/**
	 * Set this subject to trigger ending of a subscription of the worklog monitoring.
	 */
	private readonly workLogPollingKillPulse$: Subject<void> =
		new Subject<void>();

	/**
	 * We will observe the worklog pending flag which indicates if worklogs
	 * are being fetched. We filter for when the pending flag is reset to flase to indicate
	 * worklogs have been fetched.
	 */
	private readonly worklogPendingRequestCount$: Observable<number> =
		StoreAccess.dataGetObvs(aHubStatePermanentWorklogOutstandingCount);

	/**
	 * Creates a observable date which is trigger on a regular basis to generate request action status purge actions.
	 */
	private readonly worklogPurgeShedule$: Observable<Date> = interval(
		RequestActionMonitorService.MINIUTE_MS
	).pipe(
		map((intervalVal: number) => {
			return new Date(Date.now() - this.purgeMS);
		})
	);

	/**
	 * Creates observable of the last worklog segment to come in from the ahub with worklog changes.
	 * we'll filter out any updates that have no changes.
	 */
	private readonly worklogSegmentLast$: Observable<WorklogSegmentAHubVO> =
		StoreAccess.dataGetObvs(aHubStatePermanentWorklogSegmentLast).pipe(
			filter(
				(worklogSegment) =>
					worklogSegment.workLogs !== undefined &&
					worklogSegment.workLogs !== null
			),
			filter((worklogSegment) => worklogSegment.workLogs.length > 0)
		);

	/**
	 * Creates an observable stream of individual worklogs fed back from the aHub, this is based on the
	 * lastest worklog segments retrieved which only contain changed items.
	 */
	private readonly worklog$: Observable<WorklogAHubVO> =
		this.worklogSegmentLast$.pipe(
			concatMap(
				(worklogSegment: WorklogSegmentAHubVO) => worklogSegment.workLogs
			)
		);

	/**
	 * Creates an observable to obtain all worklogs..
	 */
	private readonly worklogAll$: WorklogObservable = new WorklogObservable(
		this.worklog$
	);

	/**
	 * Creates an observable to obtain worklogs limited to completed non faulting items.
	 */
	private readonly worklogSuccess$: WorklogObservable = new WorklogObservable(
		this.worklog$
	).onlySuccess();

	/**
	 * Creates an observable to obtain worklogs limited to completed items ( may have faulted )
	 */
	private readonly worklogComplete$: WorklogObservable = new WorklogObservable(
		this.worklog$
	).onlyComplete();

	/**
	 * Creates an observable to obtain worklogs limited to non failed items (may have completed).
	 */
	private readonly worklogNonFailed$: WorklogObservable = new WorklogObservable(
		this.worklog$
	).onlyNonFailed();

	/**
	 * Creates an observable to obtain worklog updates generated by changes in the aHub limited to in progress items.
	 */
	private readonly worklogInProgress$: WorklogObservable =
		new WorklogObservable(this.worklog$).onlyIncomplete();

	/**
	 * --------------------------------------------------------
	 * Request Action Status - Streams
	 * --------------------------------------------------------
	 */

	/**
	 *  Monitor the request actions status checking for errors to report.
	 */
	private readonly requestActionStatus$: Observable<RequestActionStatusVO[]> =
		StoreAccess.dataGetObvs(requestActionStatuses);

	/**
	 *  MA flat version of the requestActionStatusVos.
	 */
	private readonly requestActionStatusFlat$: Observable<RequestActionStatusVO> =
		this.requestActionStatus$.pipe(mergeMap((actionStatus) => actionStatus));

	/**
	 * --------------------------------------------------------------------
	 * General Notification - Streams
	 * --------------------------------------------------------------------
	 */

	/**
	 * --------------------------------------------------------------------
	 * METHODS
	 * --------------------------------------------------------------------
	 */

	constructor(private readonly router: Router) {
		// Montiors the status of request actions and generated follow up actions if required.
		this.requestActionStatusMonitoringStart();

		// Starts the polling of the aHub to get worklogs which will be used to update requestActionStatusVos
		this.worklogPollingStart();

		// Constructs the observable chains that updates the request action status VO based on returned worklogs.
		this.worklogStoreUpdateStart();

		// Starts the purging of previously old recieved worklogs, that would drive up memory usage over time..
		this.worklogPurgingStart();
	}

	private worklogStoreUpdateStart() {
		//Create a worklog observable. we want to apply some restrictions
		// 1) We are only intrested in those items which have affected ids ( we can only update the cache if we have the related id )
		//    WE must update data weather the results completed or failed as the aHub may have changed the data as a result of a state change.
		const worklogObservable: WorklogObservable = new WorklogObservable(
			this.worklog$
		).hasAffectedIds();

		//Worklog observable for the accounts entity
		const worklogObservableAccounts: WorklogObservable =
			worklogObservable.entityTypeAccounts();

		//Worklog observable for the library entity
		const worklogObservableLibrary: WorklogObservable =
			worklogObservable.entityTypeLibrary();

		// DOWNLOAD Stream  - Set it up

		this.worklogWithDownload(new WorklogObservable(this.worklog$));

		// We observe the completed worklogs to watch for any complete actions.
		// Each observation represents a part of the temporary aHub store cache, that might need its items updated ( if they exist )

		this.worklogStoreUpdateDistributionGroupsStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_DISTRIBUTION_GROUPS
			)
		);
		this.worklogStoreUpdateDistributionsStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_DISTRIBUTION
			)
		);
		this.worklogStoreUpdateWorkGroupsStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_WORK_GROUPS
			)
		);
		this.worklogStoreUpdateExportStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXPORT
			)
		);
		this.worklogStoreUpdateClientStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_CLIENT
			)
		);
		this.worklogStoreUpdateUserStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_USER
			)
		);
		this.worklogStoreUpdateClientQuotaStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_CLIENT_QUOTA
			)
		);
		this.worklogStoreUpdateClientLibraryVersionStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_CLIENT_LIBRARY_VERSION
			)
		);
		this.worklogStoreUpdateClientLibraryStart(
			worklogObservableAccounts.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_CLIENT_LIBRARY
			)
		);

		this.worklogStoreUpdateProductClassesStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT_CLASSES
			)
		);
		this.worklogStoreUpdateProductPropertyStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT_PROPERTY
			)
		);
		this.worklogStoreUpdateProductPropertySectionStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT_PROPERTY_SECTION
			)
		);
		this.worklogStoreUpdateProductPropertyAllocationStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT_PROPERTY_ALLOCATION
			)
		);
		this.worklogStoreUpdateProductPropertyAllocationChainStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT_PROPERTY_ALLOCATION_CHAIN
			)
		);
		this.worklogStoreUpdateDataSetStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_DATASET
			)
		);
		this.worklogStoreUpdateDataSetCategoryStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_DATASET_CATEGORY
			)
		);
		this.worklogStoreUpdateExtractDefinitionStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXTRACT_DEFINITION
			)
		);

		this.worklogStoreUpdateExtractStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXTRACT
			)
		);
		this.worklogStoreUpdateExtractContentsStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXTRACT_CONTENTS
			)
		);
		this.worklogStoreUpdateProductStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_PRODUCT
			)
		);
		this.worklogStoreUpdateExporterStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXPORTER
			)
		);
		this.worklogStoreUpdateExporterBuildHistoryStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_EXPORTER_BUILD_HISTORY
			)
		);
		this.worklogStoreUpdateResourcePackStart(
			worklogObservableLibrary.worklogTypeFilter(
				WorklogObservable.WORK_LOG_TYPES_RESOURCE_PACK
			)
		);
	}

	/**
	 * Sends the intial action to begin worklog long polling from the aHub, and set up
	 * observation to loop subseqent requests for worklogs as the previous completes.
	 */
	private worklogPollingStart() {
		let worklogSegmentFetchActionId: number;
		let previousWorklogSegmentFetchActionId = -1;

		combineLatest([this.userId$, this.clientId$.pipe(startWith(-1))])
			.pipe(
				switchMap(([userId, clientId]) => {
					//No user id, then we will return a false .... we will never deal with worklogs if we have no user id
					if (userId === undefined || userId === -1) {
						return of(false);
					}

					//OK so now we want to setup our worklog request loop for that we will use two streams
					// 1) The first one will fire an initial time so that we can get the most recent data for either the
					//    newly defined current user or the newley defined current client
					// 2) The second stream is responsible for subsequent worklog requests we want to make them after
					//    any outstaning requests have finished
					return concat(
						of(true),
						this.worklogPendingRequestCount$.pipe(
							// ignore multiple repeated settings to true or false.
							map((count) => count === 0), // filter to false items ... only trigger on false
							distinctUntilChanged()
						)
					);
				}),

				// We will only try and make a call if thats what we want to do
				filter((shouldRequest) => shouldRequest),

				// Slow rate up if not connected.),
				debounce((alwaysFalse: boolean) =>
					timer(
						StoreAccess.dataGet(aHubStatePermanentIsConnected) ? 1000 : 10000
					)
				),

				// Get the last worklog update time, all streams must fire this value
				map(
					(value) =>
						StoreAccess.dataGet(aHubStatePermanentWorklogSegmentLast)
							.workLogLastestUpdateTime
				)
			)
			.subscribe((lastTime) => {
				worklogSegmentFetchActionId = StoreAccess.dispatch(
					AHubActions.worklogSegmentFetch(lastTime)
				);
			});

		// Handle worklog requests that never come back...
		// (perhaps the client machine is put to sleep while waiting for a worklog to come back)
		timer(0, 60000)
			.pipe(takeUntil(this.workLogPollingKillPulse$.asObservable()))
			.subscribe((timer) => {
				if (
					worklogSegmentFetchActionId === previousWorklogSegmentFetchActionId
				) {
					StoreAccess.dispatch(
						AHubActions.worklogSegmentFetch(
							StoreAccess.dataGet(aHubStatePermanentWorklogSegmentLast)
								.workLogLastestUpdateTime
						)
					);
				}
				// Only kick in once worklogs have already started (eg not while waiting for log in)
				if (worklogSegmentFetchActionId) {
					previousWorklogSegmentFetchActionId = worklogSegmentFetchActionId;
				}
			});

		// We also have internal app data which is monitoring actions the app as generated
		// we'll want to send and action to ensure this is up to date as the worklogs come in.
		this.worklogSegmentLast$.subscribe((worklogSegment) =>
			StoreAccess.dispatch(
				AppActions.sessionrequestActionStatusUpdate(worklogSegment)
			)
		);
	}

	/**
	 * Generates an action to remove old processed worklogs.
	 */
	private worklogPurgingStart() {
		// We will subscribe to our purgue shedule observable to trigger worklog purging.
		this.worklogPurgeShedule$.subscribe((purgeDate: Date) =>
			StoreAccess.dispatch(AHubActions.worklogPurge(purgeDate))
		);
	}

	/**
	 * Actions that generate server requests ( currently with the exception of those that simply get data and successfuly return , [Subject To Change] )
	 * get logged in the aplication data store. Long running request return workflow references which are also recorded in the same data store.
	 * Long running actions are then updated as worklogs are returned through long polling , again in this store.
	 *
	 * This method, starts the monitoring of this data and triggers, subsequent actions.. in most cases for error handling purposes.
	 *
	 * We status updates
	 */
	private requestActionStatusMonitoringStart() {
		// Subscribe to the requestActionStatus updated from the store , specifically those that have failed.
		this.requestActionStatusObservableFaults().subscribe(
			(requestActionStatus: RequestActionStatusVO) => {
				// Depending on the fault, we want to trigger another action.
				// Generate based on error code.
				// We will also despatch a connectivity error set, if the error code suggests it.
				switch (requestActionStatus.errorCode) {
					// Bad authentication, or expired session?
					case 401:
					case 419:
						// Note.. there is a case where this can occur if the user is in the process of logging in and has supplied a bad password.
						// In this instance, sending them back to login is not a good idea.. as they are already at the login screen !
						// Its a lot of pain for mis typing your password. .. No convinced this is the best test though.
						if (!this.router.routerState.snapshot.url.includes("login")) {
							// Record the return to URL.
							StoreAccess.dispatch(
								AppActions.returnToUrlSet(this.router.routerState.snapshot.url)
							);

							// Added a delay here to allow a user to read any warning/error messages before redirect
							setTimeout(() => {
								AppRoutingNavigation.navigateLogout(this.router);
							}, 5000); // Return user to login...
						}

						break;

					case 0: // Unable to make request , internet disconnected... lets hope for a reconnect soon !
					case 502: // ahub load balancer error - should not happen - but treat as a failed connection.
						console.error(
							"Failed Request (internet connection?) :  ",
							requestActionStatus.errorCode
						);
						StoreAccess.dispatch(AHubActions.commsError()); // Despatch comms Error action.
						break;
				}
			}
		);
	}

	/**
	 * Begins observation of worklogs which generate a download
	 */
	private worklogWithDownload(worklog$: WorklogObservable) {
		//Create a new date, we will use this to prevent downloading old content
		const streamStartDate: Date = new Date();

		//Create a stream which will fire once a work log with a download is complete
		//as then we will trigger a download
		worklog$
			.onlyComplete()
			.hasDownload()
			.sessionUser()
			.observable()
			.pipe(
				distinctUntilKeyChanged("workflowExecutionId"),
				filter(
					(worklog) =>
						worklog.endTime.getTime() -
							(streamStartDate.getTime() -
								RequestActionMonitorService.MINIUTE_MS) >
						0
				)
			)
			.subscribe((worklog) => {
				//Dispatch an action to download the content imediatly
				StoreAccess.dispatch(
					AHubActions.workflowFilesDownload(worklog.workflowExecutionId)
				);
			});
	}

	/**
	 * Begins observation of Work Logs for : Exports
	 */
	private worklogStoreUpdateExportStart(worklogExports$: WorklogObservable) {
		//If the exports have changed we will update the export indexes
		worklogExports$.observable().subscribe((worklog) => {
			//We will only go and get the export index's if they have alreadt been set
			if (StoreAccess.dataGet(aHubStateTemporaryExportIndexs) !== null) {
				StoreAccess.dispatch(AHubActions.exportIndexsFetch(), true);
			}
		});

		//Watch the worklogs for updates
		worklogExports$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryExportList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.exportsByIdFetch(matchingCachedItemIds),
						true
					);

					const distributionsList = StoreAccess.dataGet(
						aHubStateTemporaryDistributionsList
					);

					const distributionsForUpdatedExport: DistributionAHubVO[] =
						distributionsList.data.filter((distribution) =>
							matchingCachedItemIds.includes(distribution.exportId)
						);

					if (distributionsForUpdatedExport) {
						const distributionIdsForUpdatedExport =
							distributionsForUpdatedExport.map((d) => d.id);
						StoreAccess.dispatch(
							AHubActions.distributionsByIdsFetch(
								distributionIdsForUpdatedExport
							),
							true
						);

						// An export has been updated for this client and various distribution have been affected, lets update the distribution indexes (should be a cheap call)
						// for the distribution groups affected
						const distributionGroupIdsForUpdatedExport: number[] = Array.from(
							new Set(
								distributionsForUpdatedExport.map((d) => d.distributionGroupId)
							)
						);
						distributionGroupIdsForUpdatedExport.forEach((dgId) => {
							StoreAccess.dispatch(
								AHubActions.distributionGroupDistributionIndexsFetch(dgId),
								true
							);
						});
					}
				}
			});

		//Watch the worklog for export remove
		worklogExports$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.exportsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : dataSets
	 */
	private worklogStoreUpdateDataSetStart(worklogdataSets$: WorklogObservable) {
		//If the dataSets have changed we will update the dataSet indexes
		worklogdataSets$.observable().subscribe((worklog) => {
			//We will only go and get the dataSet index's if they have alreadt been set
			if (StoreAccess.dataGet(aHubStateTemporaryDataSetIndexes) !== null) {
				StoreAccess.dispatch(AHubActions.dataSetIndexsFetch(), true);
			}
		});

		//Watch the worklogs for updates
		worklogdataSets$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryDataSetList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.dataSetsByIdFetch(matchingCachedItemIds),
						true
					);
				}

				// The dataset update may have caused changes which should be reflected in child exporters, lets make sure that if there is one currently selected, we refresh it
				const currentlySelectedExporter: ExporterAHubVO = StoreAccess.dataGet(
					viewLibraryPublishingSelectedExporter
				);
				if (currentlySelectedExporter) {
					StoreAccess.dispatch(
						AHubActions.exportersByIdFetch([currentlySelectedExporter.id]),
						true
					);
				}
			});

		//Watch the worklog for dataSet remove
		worklogdataSets$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.dataSetsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : dataSets categories.
	 */
	private worklogStoreUpdateDataSetCategoryStart(
		worklogDataSetCategory$: WorklogObservable
	) {
		// Watch the worklogs for updates.
		worklogDataSetCategory$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryDataSetCategoryList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.dataSetCategoriesByIdsFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for data set category remove.
		worklogDataSetCategory$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.dataSetCategoryRemove(worklog.workAffectedIds),
					true
				)
			);

		//Watch the worklogs for ANY successful changes to the worklog categorys
		//we will use these changes to update the product data being viewed by the user
		worklogDataSetCategory$
			.onlySuccess()
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				//Get the currently list of data set categories with products for a product
				const currentDataSetCategoryIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryDataSetCategoryProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentDataSetCategoryIdsWithProducts) {
					currentDataSetCategoryIdsWithProducts.forEach((dataSetCatId) =>
						StoreAccess.dispatch(
							AHubActions.dataSetCategoryProductsFetch(dataSetCatId),
							true
						)
					);
				}
			});
	}

	/**
	 * Begins observation of Work Logs for : Client
	 */
	private worklogStoreUpdateClientStart(worklogClient$: WorklogObservable) {
		//If the clients have changed we will update the client indexes
		worklogClient$.observable().subscribe((worklog) => {
			//We will only go and get the client index's if they have alreadt been set
			if (StoreAccess.dataGet(aHubStateTemporaryClientIndexs) !== null) {
				StoreAccess.dispatch(AHubActions.clientIndexsFetch(), true);
			}
		});

		// Look out for new clients that the user might want to switch too.
		worklogClient$
			.workActionAdd()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.userClientIndexsFetch(StoreAccess.dataGet(sessionUserId)),
					true
				)
			);

		//Watch the worklogs for updates
		worklogClient$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryClientList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.clientsByIdFetch(matchingCachedItemIds),
						true
					);
				}

				const matchingCachedClientConfigItemIds =
					ListUtil.listDataFilterIdOnExists(
						StoreAccess.dataGet(aHubStateTemporaryClientConfigurationList),
						worklog.workAffectedIds
					);
				if (matchingCachedClientConfigItemIds.length > 0) {
					matchingCachedClientConfigItemIds.forEach((clientId) => {
						StoreAccess.dispatch(
							AHubActions.clientConfigurationByClientIdFetch(clientId),
							true
						);
					});
				}
			});

		//Watch the worklog for removals
		worklogClient$
			.workActionDelete()
			.observable()
			.subscribe((worklog) => {
				// Remove the client.
				StoreAccess.dispatch(
					AHubActions.dataSetsRemove(worklog.workAffectedIds),
					true
				);

				// Then update the users list.
				StoreAccess.dispatch(
					AHubActions.userClientIndexsFetch(StoreAccess.dataGet(sessionUserId)),
					true
				);
			});
	}

	/**
	 * Begins observation of Work Logs for : user
	 */
	private worklogStoreUpdateUserStart(worklogUser$: WorklogObservable) {
		worklogUser$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				if (
					worklog.workAffectedIds.indexOf(StoreAccess.dataGet(sessionUserId)) >
					-1
				) {
					// We just got a worklog that affects the current user, we should update the entity entityPermissionsFetch
					StoreAccess.dispatch(AppActions.entityPermissionsFetch(), true);
				}

				// Get the list of matching user index items for the affected ids.
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryUserIndexList),
					worklog.workAffectedIds
				);

				// Now make a request to get the user indexes by ids.
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.userIndexesByIdFetch(matchingCachedItemIds),
						true
					);

					// If currently have this user selected, lets go get the user extended data too
					if (
						matchingCachedItemIds.indexOf(
							StoreAccess.dataGet(viewSelectedUserId)
						) !== -1
					) {
						StoreAccess.dispatch(
							AHubActions.userExtendedsByIdFetch([
								StoreAccess.dataGet(viewSelectedUserId),
							])
						);
					}
				}

				// Get the list of matching permanent user items for the affected ids.
				const matchingPermanentCachedItemIds =
					ListUtil.listDataFilterIdOnExists(
						StoreAccess.dataGet(aHubStatePermanentUserList),
						worklog.workAffectedIds
					);

				// Now make a request to get the user indexes by ids.
				if (matchingPermanentCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.sessionUsersByIdsFetch(matchingPermanentCachedItemIds),
						true
					);
				}
			});

		// Remove all deleted user indexes from the user index list.
		worklogUser$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.userIndexesRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Distribution Group
	 */
	private worklogStoreUpdateDistributionGroupsStart(
		worklogDistributionGroup$: WorklogObservable
	) {
		worklogDistributionGroup$.observable().subscribe((worklog) => {
			//We will only go and get the distribution group index's if they have alreadt been set
			if (
				StoreAccess.dataGet(aHubStateTemporaryDistributionGroupIndexes) !== null
			) {
				StoreAccess.dispatch(
					AHubActions.distributionGroupIndexesForSessionClientIdFetch(),
					true
				);
			}
		});

		worklogDistributionGroup$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryDistributionGroupList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.distributionGroupsFetch(matchingCachedItemIds),
						true
					);

					for (const distributionGroupId of matchingCachedItemIds) {
						StoreAccess.dispatch(
							AHubActions.distributionGroupUserIndexsFetch(distributionGroupId),
							true
						);
					}
				}
			});

		worklogDistributionGroup$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.distributionGroupsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 *  Begins observation of Work Logs for : Distribution
	 */
	private worklogStoreUpdateDistributionsStart(
		worklogDistribution$: WorklogObservable
	) {
		worklogDistribution$.observable().subscribe((worklog) => {
			// Get the selected distribution group id.
			const distributionGroupId = StoreAccess.dataGet(
				viewSelectedDistributionGroupId
			);

			// Do we have a currently selected distribution group id? If so, make the request to get the distributions for the selected id.
			if (distributionGroupId && distributionGroupId > -1) {
				StoreAccess.dispatch(
					AHubActions.distributionGroupDistributionIndexsFetch(
						distributionGroupId
					),
					true
				);
			}

			// Get the selected export id.
			const exportId = StoreAccess.dataGet(viewSelectedExportId);

			// Do we have a currently selected export id? If so, make the request to get the distributions for the selected id.
			if (exportId && exportId > -1) {
				StoreAccess.dispatch(
					AHubActions.exportDistributionIndexsFetch(exportId),
					true
				);
			}
		});

		worklogDistribution$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryDistributionsList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.distributionsByIdsFetch(matchingCachedItemIds),
						true
					);
				}
			});

		worklogDistribution$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.distributionsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Work Group
	 */
	private worklogStoreUpdateWorkGroupsStart(
		worklogWorkGroup$: WorklogObservable
	) {
		worklogWorkGroup$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				//We will only go and get the workgroup index's if they have alreadt been set
				if (StoreAccess.dataGet(aHubStateTemporaryWorkGroupIndexes) !== null) {
					StoreAccess.dispatch(AHubActions.workGroupIndexsFetch(), true);
				}
			});

		worklogWorkGroup$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryWorkGroupList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.workGroupsFetch(matchingCachedItemIds),
						true
					);

					for (const workGroupId of matchingCachedItemIds) {
						StoreAccess.dispatch(
							AHubActions.workGroupUserIndexsFetch(workGroupId),
							true
						);
					}
				}
			});

		worklogWorkGroup$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const workGroupIncludingCurrentUser = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStatePermanentWorkGroupList),
					worklog.workAffectedIds
				);
				if (workGroupIncludingCurrentUser.length > 0) {
					StoreAccess.dispatch(AppActions.entityPermissionsFetch(), true);
				}
			});

		worklogWorkGroup$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.workGroupsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : client quota
	 */
	private worklogStoreUpdateClientQuotaStart(
		worklogClientQuota$: WorklogObservable
	) {
		worklogClientQuota$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				//Filter out the the affects ids to only the ones we are looking at
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryClientQuotas),
					worklog.workAffectedIds
				);

				//If we have any ids matching then we will go and get them
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.clientQuotaByIdsFetch(matchingCachedItemIds),
						true
					);
				}
			});
	}

	/**
	 * Begins observation of Work Logs for : Product Class
	 */
	private worklogStoreUpdateProductClassesStart(
		worklogProductClass$: WorklogObservable
	) {
		worklogProductClass$.observable().subscribe((worklog) => {
			//We will only go and get the product class index's if they have already been set
			if (StoreAccess.dataGet(aHubStateTemporaryProductClassIndexes) !== null) {
				StoreAccess.dispatch(
					AHubActions.productClassIndexsByClientIdFetch(),
					true
				);
			}
		});

		worklogProductClass$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryProductClassList),
					worklog.workAffectedIds
				);

				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.productClassesFetch(matchingCachedItemIds),
						true
					);
				}
			});

		worklogProductClass$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.productClassesRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Product Property
	 */
	private worklogStoreUpdateProductPropertyStart(
		worklogProductProperty$: WorklogObservable
	) {
		worklogProductProperty$.observable().subscribe((worklog) => {
			//We will only go and get the product property index's if they have already been set
			if (
				StoreAccess.dataGet(aHubStateTemporaryProductPropertyIndexes) !== null
			) {
				StoreAccess.dispatch(AHubActions.productPropertyIndexsFetch(), true);
			}
		});

		worklogProductProperty$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryProductPropertyList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.productPropertyFetch(matchingCachedItemIds),
						true
					);
				}
			});

		worklogProductProperty$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.productPropertyRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Product Property section
	 */
	private worklogStoreUpdateProductPropertySectionStart(
		worklogProductPropertySections$: WorklogObservable
	) {
		worklogProductPropertySections$.observable().subscribe((worklog) => {
			//We will only go and get the product property sections index's if they have already been set
			if (
				StoreAccess.dataGet(aHubStateTemporaryProductPropertySectionIndexes) !==
				null
			) {
				StoreAccess.dispatch(
					AHubActions.productPropertySectionIndexsFetch(),
					true
				);
			}
		});

		worklogProductPropertySections$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryProductPropertySectionList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.productPropertySectionsFetch(
							StoreAccess.dataGet(sessionClientId),
							matchingCachedItemIds
						),
						true
					);
				}
			});

		worklogProductPropertySections$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.productPropertySectionsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Product Property Allocation
	 */
	private worklogStoreUpdateProductPropertyAllocationStart(
		worklogProductPropertyAllocation$: WorklogObservable
	) {
		worklogProductPropertyAllocation$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				// We will only go and get the extract definition index's if they have already been set.
				if (
					StoreAccess.dataGet(
						aHubStateTemporaryProductPropertyAllocationIndexes
					) !== null
				) {
					StoreAccess.dispatch(
						AHubActions.productPropertyAllocationsFetch(),
						true
					);
				}
			});
	}

	/**
	 * Begins observation of Work Logs for : Product Property Allocation chain.
	 */
	private worklogStoreUpdateProductPropertyAllocationChainStart(
		worklogProductPropertyAllocationChain$: WorklogObservable
	) {
		worklogProductPropertyAllocationChain$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				// We will only go and get the product property allocation chain index's if they have already been set.
				if (
					StoreAccess.dataGet(
						aHubStateTemporaryProductPropertyAllocationChainIndexes
					) !== null
				) {
					StoreAccess.dispatch(
						AHubActions.productPropertyAllocationChainIndexsFetch(),
						true
					);
				}
			});

		//Watch the worklogs for updates
		worklogProductPropertyAllocationChain$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(
						aHubStateTemporaryProductPropertyAllocationChains
					),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.productPropertyAllocationChainsFetch(
							matchingCachedItemIds
						),
						true
					);
				}
			});

		//Watch the worklog for chain remove
		worklogProductPropertyAllocationChain$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.productPropertyAllocationChainsRemove(
						worklog.workAffectedIds
					),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Extract definitions
	 */
	private worklogStoreUpdateExtractDefinitionStart(
		worklogExtractDefinitions$: WorklogObservable
	) {
		// If the extract definitions have changed we will update the extract definition indexes.
		worklogExtractDefinitions$.observable().subscribe((worklog) => {
			// We will only go and get the extract definition index's if they have already been set.
			if (
				StoreAccess.dataGet(aHubStateTemporaryExtractDefinitionIndexes) !== null
			) {
				StoreAccess.dispatch(AHubActions.extractDefinitionIndexesFetch(), true);
			}
		});

		// Watch the worklogs for updates.
		worklogExtractDefinitions$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryExtractDefinitionList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.extractDefinitionsByIdFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for extract definition remove.
		worklogExtractDefinitions$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.extractDefinitionsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Extracts
	 */
	private worklogStoreUpdateExtractStart(worklogExtracts$: WorklogObservable) {
		// If the extracts have changed we will update the extract indexes.
		worklogExtracts$.observable().subscribe((worklog) => {
			// We will only go and get the extract index's if they have already been set.
			if (StoreAccess.dataGet(aHubStateTemporaryExtractIndexes) !== null) {
				StoreAccess.dispatch(AHubActions.extractIndexesFetch(), true);
			}
		});

		// Watch the worklogs for updates.
		worklogExtracts$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryExtractList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.extractsByIdFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for extract remove.
		worklogExtracts$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.extractsRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Watch for changes to the extract's contents.
	 *
	 * @param worklogExtractContents$        The worklogs for work on extract contents.
	 */
	private worklogStoreUpdateExtractContentsStart(
		worklogExtractContents$: WorklogObservable
	) {
		// Watch the worklogs for updates.
		worklogExtractContents$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				// If the currently selected extract is being updated, drop and re-fetch the products
				const currentlySelectedExtractId = StoreAccess.dataGet(
					viewLibrarySelectedExtractId
				);
				if (
					worklog.workAffectedIds.indexOf(currentlySelectedExtractId) !== -1
				) {
					StoreAccess.dispatch(
						AHubActions.extractProductsFetch(currentlySelectedExtractId),
						true
					);
				}
			});
	}

	/**
	 * Begins observation of Work Logs for : (Extract) Products
	 */
	private worklogStoreUpdateProductStart(worklogProducts$: WorklogObservable) {
		// Watch for an overflowed worklog for a product
		worklogProducts$
			.onlyComplete()
			.observable()
			.pipe(filter((worklog) => worklog.hasOverflowed))
			.subscribe((worklog) => {
				//Get the currently list of extracts for a product
				const currentExtractIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryExtractProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentExtractIdsWithProducts) {
					currentExtractIdsWithProducts.forEach((extractId) => {
						StoreAccess.dispatch(
							AHubActions.extractProductsFetch(extractId),
							true
						);
					});
				}

				//Get the currently list of data set categories with products for a product
				const currentDataSetCategoryIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryDataSetCategoryProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentDataSetCategoryIdsWithProducts) {
					currentDataSetCategoryIdsWithProducts.forEach((dataSetCatId) =>
						StoreAccess.dispatch(
							AHubActions.dataSetCategoryProductsFetch(dataSetCatId),
							true
						)
					);
				}
			});

		// Watch the worklogs for product adds.
		worklogProducts$
			.workActionAdd()
			.onlyComplete()
			.observable()
			.pipe(
				filter((worklog) => !worklog.hasOverflowed),
				filter(
					(worklog) =>
						worklog.workAffectedIds && worklog.workAffectedIds.length > 0
				)
			)
			.subscribe((worklog) => {
				// ---- Extract Products ----

				//Get the currently list of extracts for a product
				const currentExtractIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryExtractProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentExtractIdsWithProducts) {
					currentExtractIdsWithProducts.forEach((extractId) =>
						StoreAccess.dispatch(
							AHubActions.extractProductsFetchByIds(
								extractId,
								worklog.workAffectedIds
							),
							true
						)
					);
				}

				// ---- Data Set Category Products ----

				//Get the currently list of data set categories with products for a product
				const currentDataSetCategoryIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryDataSetCategoryProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentDataSetCategoryIdsWithProducts) {
					currentDataSetCategoryIdsWithProducts.forEach((dataSetCatId) =>
						StoreAccess.dispatch(
							AHubActions.dataSetCategoryProductsFetchByIds(
								dataSetCatId,
								worklog.workAffectedIds
							),
							true
						)
					);
				}
			});

		// Watch the worklogs for product updates.
		worklogProducts$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.pipe(
				filter((worklog) => !worklog.hasOverflowed),
				filter(
					(worklog) =>
						worklog.workAffectedIds && worklog.workAffectedIds.length > 0
				)
			)
			.subscribe((worklog) => {
				// ---- Extract Products ----

				//Get the currently list of extracts for a product
				const currentExtractIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryExtractProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentExtractIdsWithProducts) {
					//Loop through each of the products which we have data for
					currentExtractIdsWithProducts.forEach((extractId) => {
						//OK well we are looking at an extract lets go an get these products in the context of this extract ( they may not fit at all! )
						//we will send the list without knowing if we have them or not due to the fact that the the products affect could be in or out of this
						//extract depending on extract rules ... etc
						StoreAccess.dispatch(
							AHubActions.extractProductsFetchByIds(
								extractId,
								worklog.workAffectedIds
							),
							true
						);
						StoreAccess.dispatch(
							AHubActions.extractProductAssetUrlsFetch(
								extractId,
								worklog.workAffectedIds
							),
							true
						);
					});
				}

				// ---- Data Set Category Products ----

				//Get the currently list of data set categories with products for a product
				const currentDataSetCategoryIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryDataSetCategoryProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentDataSetCategoryIdsWithProducts) {
					currentDataSetCategoryIdsWithProducts.forEach((dataSetCatId) =>
						StoreAccess.dispatch(
							AHubActions.dataSetCategoryProductsFetchByIds(
								dataSetCatId,
								worklog.workAffectedIds
							),
							true
						)
					);
				}
			});

		// Watch the worklog for product remove.
		worklogProducts$
			.workActionDelete()
			.onlyComplete()
			.observable()
			.pipe(
				filter((worklog) => !worklog.hasOverflowed),
				filter(
					(worklog) =>
						worklog.workAffectedIds && worklog.workAffectedIds.length > 0
				)
			)
			.subscribe((worklog) => {
				// ---- Extract Products ----

				//Get the currently list of extracts for a product
				const currentExtractIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryExtractProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentExtractIdsWithProducts) {
					//Loop through each of the extracts we have product data for removing these products from each of them
					currentExtractIdsWithProducts.forEach((extractId) => {
						//Call for a removal of the affected products set ... we will send the list without knowing if we have them or not
						//as it will be a larger overhead to get the list ( which is a duplicated object ).
						StoreAccess.dispatch(
							AHubActions.extractProductsRemoveByIds(
								extractId,
								worklog.workAffectedIds
							)
						);
					});
				}

				// ---- Data Set Category Products ----

				//Get the currently list of data set categories with products for a product
				const currentDataSetCategoryIdsWithProducts = StoreAccess.dataGet(
					aHubStateTemporaryDataSetCategoryProductsKeys
				);

				//Do we have current extract id's with product data
				if (currentDataSetCategoryIdsWithProducts) {
					//Loop through each of the data set categorys we have product data for removing these products from each of them
					currentDataSetCategoryIdsWithProducts.forEach((dataSetCatId) =>
						StoreAccess.dispatch(
							AHubActions.dataSetCategoryProductsRemove(
								dataSetCatId,
								worklog.workAffectedIds
							)
						)
					);
				}
			});
	}

	/**
	 * Begins observation of Work Logs for : Extract definitions
	 */
	private worklogStoreUpdateClientLibraryVersionStart(
		worklogClientLibraryVersion$: WorklogObservable
	) {
		// If the extract definitions have changed we will update the extract definition indexes.
		worklogClientLibraryVersion$
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				// We will only go and get the extract definition index's if they have already been set.
				if (
					StoreAccess.dataGet(aHubStateTemporaryClientLibraryVersionIndexs) !==
					null
				) {
					StoreAccess.dispatch(
						AHubActions.clientLibraryVersionIndexFetch(),
						true
					);
				}

				// Get the list of matching cached ids that we have in the store.
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(
						aHubStateTemporaryClientLibraryVersionModelUrlsList
					),
					worklog.workAffectedIds
				);

				// Now make a request for each ones model again.
				if (matchingCachedItemIds.length > 0) {
					matchingCachedItemIds.forEach((id) =>
						StoreAccess.dispatch(
							AHubActions.clientLibraryVersionModelUrlFetch(id),
							true
						)
					);
				}
			});

		// Watch the worklogs for updates.
		worklogClientLibraryVersion$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryClientLibraryVersionList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.clientLibraryVersionFetchByVersion(
							matchingCachedItemIds
						),
						true
					);
				}
			});
	}

	/**
	 * Start the monitoring of the client libraries.
	 *
	 * @param worklogClientLibrary$
	 */
	private worklogStoreUpdateClientLibraryStart(
		worklogClientLibrary$: WorklogObservable
	) {
		//If the client libraries have changed we will update the client library indexes
		//Because the client libraries contain state in there indexes, any changes to the client libraries
		//we will refetch the index list. This should;nt be an issue as teh list is light and will only contains a handle of libraries
		//per client.
		worklogClientLibrary$.observable().subscribe((worklog) => {
			StoreAccess.dispatch(AHubActions.clientLibraryIndexesFetch(), true);
		});

		// Watch the worklogs for updates to specific client libraries.
		worklogClientLibrary$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryClientLibrariesList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.clientLibrariesByIdFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for extract definition remove.
		worklogClientLibrary$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.clientLibrariesRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Exporters
	 */
	private worklogStoreUpdateExporterStart(
		worklogExporters$: WorklogObservable
	) {
		// If the extracts have changed we will update the exporter indexes.
		worklogExporters$.observable().subscribe((worklog) => {
			// We will only go and get the exporter index's if they have already been set.
			if (StoreAccess.dataGet(aHubStateTemporaryExporterIndexes) !== null) {
				StoreAccess.dispatch(AHubActions.exporterIndexesFetch(), true);
			}
		});

		// Watch the worklogs for updates.
		worklogExporters$
			.workActionUpdate()
			.observable()
			.subscribe((worklog) => {
				//Does it match one we are watching?
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryExporterList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.exportersByIdFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for exporter remove.
		worklogExporters$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.exportersRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Start the monitoring of exporter build histories.
	 *
	 * @param worklogExporterBuildHistory$        The observable work log to watch for changes on.
	 */
	private worklogStoreUpdateExporterBuildHistoryStart(
		worklogExporterBuildHistory$: WorklogObservable
	) {
		// If the extracts have changed we will update the exporter indexes.
		worklogExporterBuildHistory$.observable().subscribe((worklog) => {
			//Get the list of keys which we have for these items
			const buildHistoryItems = StoreAccess.dataGet(
				aHubStateTemporaryExporterBuildHistoryIndexMap
			).keys;

			//Loop through work affected ids and if we are looking at any of the build indexes we will update the data
			buildHistoryItems.forEach((id) =>
				StoreAccess.dispatch(
					AHubActions.exporterBuildHistoryIndexesFetch(Number(id)),
					true
				)
			);
		});

		// Watch the worklogs for updates.
		worklogExporterBuildHistory$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryExporterBuildHistorys),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.exporterBuildHistorysByIdsFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for exporter build histories remove.
		worklogExporterBuildHistory$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.exporterBuildHistoryRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * Begins observation of Work Logs for : Resource packs
	 */
	private worklogStoreUpdateResourcePackStart(
		worklogResourcePacks$: WorklogObservable
	) {
		// If the resource packs have changed we will update the resource packs indexes.
		worklogResourcePacks$.observable().subscribe((worklog) => {
			// We will only go and get the resource packs index's if they have already been set.
			if (StoreAccess.dataGet(aHubStateTemporaryResourcePackIndexes) !== null) {
				StoreAccess.dispatch(AHubActions.resourcePackIndexesFetch(), true);
			}
		});

		// Watch the worklogs for updates.
		worklogResourcePacks$
			.workActionUpdate()
			.onlyComplete()
			.observable()
			.subscribe((worklog) => {
				const matchingCachedItemIds = ListUtil.listDataFilterIdOnExists(
					StoreAccess.dataGet(aHubStateTemporaryResourcePackList),
					worklog.workAffectedIds
				);
				if (matchingCachedItemIds.length > 0) {
					StoreAccess.dispatch(
						AHubActions.resourcePacksByIdFetch(matchingCachedItemIds),
						true
					);
				}
			});

		// Watch the worklog for resource pack remove.
		worklogResourcePacks$
			.workActionDelete()
			.observable()
			.subscribe((worklog) =>
				StoreAccess.dispatch(
					AHubActions.resourcePacksRemove(worklog.workAffectedIds),
					true
				)
			);
	}

	/**
	 * --------------------------------------------------------------------
	 * Work log Observable streams
	 * --------------------------------------------------------------------
	 */

	/**
	 * Create a worklog observable which recives only worklogs that are currently running.
	 */
	worklogObservableProgress(): WorklogObservable {
		// Return the running observable worklog stream.
		return this.worklogInProgress$;
	}

	/**
	 * Create a worklog observable which recives only worklogs that have completed successfully
	 */
	worklogObservableSuccess(): WorklogObservable {
		// Return the successful observable worklog stream.
		return this.worklogSuccess$;
	}

	/**
	 * Create a worklog observable which recives only worklogs that have completed  ( may have failed )
	 */
	worklogObservableComplete(): WorklogObservable {
		// Return the complete observable worklog stream.
		return this.worklogComplete$;
	}

	/**
	 * Create a worklog observable which recieves only worklogs that have not failed.
	 */
	worklogObservableNonFailed(): WorklogObservable {
		return this.worklogNonFailed$;
	}

	/**
	 * Return all worklogs.
	 */
	worklogObservableAll(): WorklogObservable {
		return this.worklogAll$;
	}

	/**
	 * Returns a worklog observable for the action id supplied. This will supply back all
	 * worklogs related to this action. This stream will complete soon after it recives its first
	 * complete worklogs as a result not reliable if worklogs complete seperatly
	 *
	 * @param actionId    Id of the action we want to monitor
	 */
	worklogByActionId(actionId: number, throwError = false): WorklogObservable {
		//Get an observable for the current action id
		const actionObservable: Observable<RequestActionStatusVO> =
			this.requestActionStatusObservableByActionId(actionId);

		//Get all the worklogs
		const worklogObservables: WorklogObservable = this.worklogObservableAll();

		//Create a combine latest, which will fire once when the action is added. As a result we will know the
		//wfid. We will then use this in combination with the worklog observables to see its progress
		//We need the worklogs to start with no data so that we can detect if an action fails ... at this point
		//there may not be any worklogs so start with none and we will filter them out before we consider them!
		const worklogsForAction = combineLatest([
			worklogObservables.observable().pipe(startWith(undefined)),
			actionObservable.pipe(first()),
		]).pipe(
			tap(([worklog, action]) => {
				//If we have a fault with the request throw an error in this stream which we should watch for
				//this is a request error not a worklog error
				if (action.fault && throwError) {
					throw new Error(`Request failed: ${action.error}`);
				}
			}),
			filter(([a, b]) => a !== undefined && b !== undefined),
			filter(
				([log, action]) => action.workflowReference == log.workflowExecutionId
			),
			map(([log, action]) => log)
		);

		//OK we want to complete the stream once the worklog is complete. So we will add a take untill onto the stream
		//but we want to see the complete worklogs which will appear automatically, we are achiving this by giving
		//the stream a short window to push out complete worklogs through the stream before it finishes.
		const worklogsForActionWithCompletion = worklogsForAction.pipe(
			takeUntil(
				worklogsForAction.pipe(
					filter((log) => log.complete),
					delay(1000)
				)
			)
		);

		//Return the worklog with this action. This will complete once we recive some complete worklogs
		return new WorklogObservable(worklogsForActionWithCompletion);
	}

	/**
	 * An observable that is fired when the worklogs have been interrupted for
	 * so long that worklogs may be missed. This could result in the page and
	 * portal cached materials being out of sync with the ahub system
	 * returns the time gap that has passed which represents the itteruption.
	 */
	worklogInterupted(): Observable<number> {
		// We monitor the worklogs for thier recieved time, if the gap in the recieved times exceed 10 minutes,
		// then there is a change we have missed essential owrklogs as the aHub only returns upto the last
		// 10 mins worth of logs.
		return StoreAccess.dataGetObvs(aHubStatePermanentWorklogSegmentLast).pipe(
			map((worklogSegmentAHubVO) => {
				return {
					prev: worklogSegmentAHubVO.worklogLastestRecievedTime,
					curr: worklogSegmentAHubVO.worklogLastestRecievedTime,
				};
			}), // extract recieved time.
			startWith({ prev: new Date(), curr: new Date() }), // Starting with a pair of now dates
			scan((a, b) => {
				return { prev: a.curr, curr: b.curr };
			}), // keep the previous and next dates so we can compare them.
			filter(
				(requestTimingPair) =>
					requestTimingPair.curr !== undefined &&
					requestTimingPair.curr !== null &&
					requestTimingPair.prev !== undefined &&
					requestTimingPair.prev !== null
			),
			filter(
				(requestTimingPair) =>
					requestTimingPair.curr.valueOf() - requestTimingPair.prev.valueOf() >
					10 * RequestActionMonitorService.MINIUTE_MS
			), // If the difference in time exceeds the 10 min window, emit.
			map(
				(requestTimingPair) =>
					requestTimingPair.curr.valueOf() - requestTimingPair.prev.valueOf()
			)
		); // We'll return the difference in milliseconds.
	}

	/**
	 * --------------------------------------------------------------------
	 * Request Action Status Observables
	 * --------------------------------------------------------------------
	 */

	/**
	 * Create a request action status observable, that filters on a particular action Id.
	 */
	requestActionStatusObservableByActionId(
		actionId: number
	): Observable<RequestActionStatusVO> {
		//Filter the actions down to actions with the specified id
		return this.requestActionStatusFlat$.pipe(
			filter(
				(requestActionStatusVO) => requestActionStatusVO.actionId === actionId
			)
		);
	}

	/**
	 * Create a request action status observable which will show if an action is busy or not. The likly status of
	 * the stream is the value will be  false -> true -> false.
	 * (Before request action has been logged ) -> (Item logged but incomplete) -> (Item has completed)
	 * You want to stop subscribing before the last point !
	 */
	requestActionStatusObservableByActionIdBusy(
		actionId: number
	): Observable<boolean> {
		//Get the statues from the store
		return this.requestActionStatus$.pipe(
			map((requestActionStatuses) =>
				requestActionStatuses.find(
					(requestActionStatusVO) => requestActionStatusVO.actionId == actionId
				)
			),
			map(
				(requestActionStatusVO) =>
					requestActionStatusVO !== undefined &&
					!requestActionStatusVO.fault &&
					requestActionStatusVO.status !== RequestActionStatusEnum.COMPLETED
			)
		);
	}

	/**
	 * Create a request action error observable which will return an error if there is one on the action id provided.
	 *
	 * @param actionId        The action id to monitor.
	 */
	requestActionErrorObservableByActionIdBusy(
		actionId: number
	): Observable<string> {
		//Get the statues from the store
		return this.requestActionStatus$.pipe(
			map((requestActionStatuses) =>
				requestActionStatuses.find(
					(requestActionStatusVO) => requestActionStatusVO.actionId === actionId
				)
			),
			map((requestActionStatusVO) =>
				requestActionStatusVO !== undefined && requestActionStatusVO.error
					? requestActionStatusVO.error
					: ""
			)
		);
	}

	/**
	 * Create a request action status observable which reports actions that fault.
	 */
	requestActionStatusObservableFaults(): Observable<RequestActionStatusVO> {
		//Filter the actions down to actions with which have failed
		return this.requestActionStatusFlat$.pipe(
			filter((actionStatus) => actionStatus.fault)
		);
	}

	/**
	 * Get the request action status upload as an observable
	 */
	requestActionStatusUploadObservableByWF(
		workflowId: string
	): Observable<RequestActionStatusUploadVO> {
		return this.requestActionStatus$.pipe(
			mergeMap((action) => action),
			filter((action) => action.workflowReference === workflowId),
			map((action) => action.upload)
		);
	}

	/**
	 * Get the request action status upload as an observable
	 */
	requestActionStatusUploadObservableByAction(
		actionId: number
	): Observable<RequestActionStatusUploadVO> {
		return this.requestActionStatus$.pipe(
			mergeMap((action) => action),
			filter((action) => action.actionId === actionId),
			map((action) => action.upload)
		);
	}
}
