//
// Copyright 2014-2015 Amazon.com,
// Inc. or its affiliates. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
using Amazon.CognitoIdentity;
using Amazon.CognitoSync.SyncManager.Internal;
using Amazon.Runtime.Internal;
using Amazon.Runtime.Internal.Util;
using Logger = Amazon.Runtime.Internal.Util.Logger;
using Amazon.Util.Internal.PlatformServices;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Globalization;
#if BCL45
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;
#endif
namespace Amazon.CognitoSync.SyncManager
{
/// <summary>
/// Dataset is the container of <see cref="Record"/>s. It can have up to 1k
/// <see cref="Record"/>s or 1 MB in size. A typical use of Dataset is the
/// following.
/// <code>
/// //open or create dataset
/// Dataset dataset = cognitoSyncManager.OpenOrCreateDataset("new dataset");
/// // synchronize. It pulls down the latest changes from remote storage
/// // and pushes local changes to remote storage
/// dataset.Synchronize();
/// // reads value
/// String highScore = dataset.Get("high_score");
/// String name = dataset.Get("name");
/// // sets value
/// dataset.Put("high_score", "90");
/// dataset.Put("name", "John");
/// // push changes to remote if needed
/// dataset.Synchronize();
/// </code>
/// </summary>
/// <remarks>
/// See also: Amazon Cognito Sync Dev. Guide - Understanding Datasets
/// </remarks>
public partial class Dataset : IDisposable
{
/// <summary>
/// Max number of retries during synchronize before it gives up.
/// </summary>
private const int MAX_RETRY = 3;
/// <summary>
/// The name of the dataset
/// </summary>
protected string DatasetName
{
get
{
return this._datasetName;
}
}
// Backing field for DatasetName; assigned in the constructor.
private string _datasetName;
/// <summary>
/// Instance of <see cref="ILocalStorage"/>
/// </summary>
protected ILocalStorage Local
{
get
{
return this._local;
}
}
// Backing field for Local; assigned in the constructor.
private ILocalStorage _local;
// Backing field for Remote; assigned in the constructor.
private CognitoSyncStorage _remote;
/// <summary>
/// Instance of <see cref="CognitoSyncStorage"/>
/// </summary>
protected CognitoSyncStorage Remote
{
get
{
return this._remote;
}
}
/// <summary>
/// Instance of <see cref="CognitoAWSCredentials"/>
/// </summary>
protected CognitoAWSCredentials CognitoCredentials
{
get
{
return this._cognitoCredentials;
}
}
// Backing field for CognitoCredentials; assigned in the constructor.
private CognitoAWSCredentials _cognitoCredentials;
// NOTE(review): waitingForConnectivity and _disposed are not referenced in this part of
// the partial class; presumably they are used by another part - confirm.
private Boolean waitingForConnectivity = false;
private bool _disposed;
// Logger created in the constructor for the runtime type of this instance.
private Logger _logger;
#region constructor
/// <summary>
/// Creates a new Dataset
/// </summary>
/// <param name="datasetName">The name of the dataset</param>
/// <param name="cognitoCredentials">The Cognito Credentials associated with the dataset</param>
/// <param name="local">Local storage, can be InMemoryStorage or SQLiteStorage or some custom storage class which implements <see cref="ILocalStorage"/></param>
/// <param name="remote">Remote storage</param>
internal Dataset(string datasetName, CognitoAWSCredentials cognitoCredentials, ILocalStorage local, CognitoSyncStorage remote)
{
    // Capture the collaborators first; the setup hook below runs last, as before.
    _datasetName = datasetName;
    _cognitoCredentials = cognitoCredentials;
    _local = local;
    _remote = remote;

    Type runtimeType = GetType();
    _logger = Logger.GetLogger(runtimeType);

    // Defined in another part of this partial class.
    DatasetSetupInternal();
}
#endregion
#region dispose methods
/// <summary>
/// Releases the resources consumed by this object
/// </summary>
public void Dispose()
{
// Dispose(bool) is declared in another part of this partial class.
Dispose(true);
// Tell the GC not to run a finalizer for this instance now that Dispose has been called.
GC.SuppressFinalize(this);
}
#endregion
#region public methods
/// <summary>
/// Retrieves the associated <see cref="DatasetMetadata"/> from local storage.
/// </summary>
/// <returns>The metadata for the Dataset.</returns>
public DatasetMetadata Metadata
{
    get
    {
        ILocalStorage storage = Local;
        string identityId = IdentityId;
        return storage.GetDatasetMetadata(identityId, DatasetName);
    }
}
/// <summary>
/// Names of the locally stored datasets whose name starts with this dataset's
/// name followed by a dot (compared case-insensitively).
/// </summary>
internal List LocalMergedDatasets
{
    get
    {
        List names = new List();
        string mergedPrefix = DatasetName + ".";
        foreach (DatasetMetadata metadata in Local.GetDatasetMetadata(IdentityId))
        {
            bool isMerged = metadata.DatasetName.StartsWith(mergedPrefix, StringComparison.OrdinalIgnoreCase);
            if (!isMerged)
            {
                continue;
            }
            names.Add(metadata.DatasetName);
        }
        return names;
    }
}
/// <summary>
/// Delete this <see cref="Dataset"/>. You cannot do any more operations
/// on this dataset.
/// </summary>
public void Delete()
{
    ILocalStorage storage = Local;
    string identityId = IdentityId;
    storage.DeleteDataset(identityId, DatasetName);
}
/// <summary>
/// Gets the value of a <see cref="Record"/> with the given key. If the
/// <see cref="Record"/> doesn't exist or is marked deleted, null will be returned.
/// </summary>
/// <param name="key">Key of the record in the dataset.</param>
/// <remarks>
/// See also: Amazon Cognito Sync Dev. Guide - Reading and Writing Data
/// </remarks>
public string Get(string key)
{
    string identityId = IdentityId;
    // The key is validated before it reaches local storage.
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    return Local.GetValue(identityId, DatasetName, recordKey);
}
/// <summary>
/// Gets the <see cref="Record"/> with the given key. If the <see cref="Record"/>
/// doesn't exist or is marked deleted, null will be returned.
/// </summary>
/// <param name="key">Key of the record in the dataset.</param>
public Record GetRecord(string key)
{
    string identityId = IdentityId;
    // The key is validated before it reaches local storage.
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    return Local.GetRecord(identityId, DatasetName, recordKey);
}
/// <summary>
/// Gets the Key/Value representation of all records of this dataset. Records marked
/// as deleted are excluded.
/// </summary>
/// <returns>Key/Value representation of all records, excluding deleted ones</returns>
public IDictionary ActiveRecords
{
    get
    {
        IDictionary active = new Dictionary();
        foreach (Record candidate in Local.GetRecords(IdentityId, DatasetName))
        {
            if (candidate.IsDeleted)
            {
                // Deleted records are skipped.
                continue;
            }
            active.Add(candidate.Key, candidate.Value);
        }
        return active;
    }
}
/// <summary>
/// Retrieves all raw records, including those marked as deleted, from local storage.
/// </summary>
/// <returns>List of all raw records</returns>
public IList Records
{
    get
    {
        string identityId = IdentityId;
        var rawRecords = Local.GetRecords(identityId, DatasetName);
        return rawRecords;
    }
}
/// <summary>
/// Gets the size of a record with the given key, as computed by
/// DatasetUtils.ComputeRecordSize for the record found in local storage.
/// The size is calculated as sum of UTF-8 string length of record key and value.
/// </summary>
/// <remarks>
/// NOTE(review): earlier documentation stated that -1 is returned when the key is
/// deleted; that depends on DatasetUtils.ComputeRecordSize, which is not visible here - confirm.
/// </remarks>
/// <returns>The size in bytes.</returns>
/// <param name="key">The key of a record</param>
public long GetSizeInBytes(string key)
{
    string identityId = IdentityId;
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    Record stored = Local.GetRecord(identityId, DatasetName, recordKey);
    return DatasetUtils.ComputeRecordSize(stored);
}
/// <summary>
/// Gets the total size in bytes of this dataset: the sum of
/// DatasetUtils.ComputeRecordSize over every record returned by local storage.
/// The size is calculated as sum of UTF-8 string length of key and value for all the records.
/// </summary>
/// <remarks>
/// NOTE(review): earlier documentation stated that records marked as deleted don't
/// contribute to the total; no filtering happens here, so that depends on
/// DatasetUtils.ComputeRecordSize - confirm.
/// </remarks>
/// <returns>The total size in bytes</returns>
public long TotalSizeInBytes
{
    get
    {
        long totalBytes = 0;
        foreach (Record entry in Local.GetRecords(IdentityId, DatasetName))
        {
            long entryBytes = DatasetUtils.ComputeRecordSize(entry);
            totalBytes = totalBytes + entryBytes;
        }
        return totalBytes;
    }
}
/// <summary>
/// Retrieves the status of a record.
/// </summary>
/// <returns><c>true</c> if it is modified locally; otherwise, <c>false</c>
/// (including when no record exists for the key).</returns>
/// <param name="key">Key identifying a record</param>
public bool IsModified(string key)
{
    string identityId = IdentityId;
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    Record stored = Local.GetRecord(identityId, DatasetName, recordKey);
    if (stored == null)
    {
        return false;
    }
    return stored.IsModified;
}
/// <summary>
/// Puts a <see cref="Record"/> with the given key and value into the
/// Dataset. If a <see cref="Record"/> with the same key exists, its value
/// will be overwritten. If a <see cref="Record"/> is marked as deleted previously,
/// then it will be resurrected with new value while the sync count continues
/// with previous value. No matter whether the value changes or not, the
/// record is considered as updated, and it will be written to Cognito Sync
/// service on next synchronize operation.
/// </summary>
/// <remarks>
/// NOTE(review): earlier documentation stated that a null value raises an
/// ArgumentNullException. This method performs no null check and forwards the
/// value unchanged to local storage - confirm the storage contract.
/// See also: Amazon Cognito Sync Dev. Guide - Reading and Writing Data
/// </remarks>
/// <param name="key">Key of the record</param>
/// <param name="value">String value of a <see cref="Record"/> to be put into the
/// <see cref="Dataset"/></param>
public void Put(string key, string value)
{
    string identityId = IdentityId;
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    Local.PutValue(identityId, DatasetName, recordKey, value);
}
/// <summary>
/// Populates a dataset with a dictionary of key/value pairs. Every key is
/// validated before anything is written to local storage.
/// </summary>
/// <param name="values">An IDictionary of key/value pairs</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="values"/> is null.</exception>
public void PutAll(IDictionary values)
{
    // Fail with a descriptive exception instead of a NullReferenceException
    // from the key enumeration below.
    if (values == null)
    {
        throw new ArgumentNullException("values");
    }
    foreach (string key in values.Keys)
    {
        DatasetUtils.ValidateRecordKey(key);
    }
    Local.PutAllValues(IdentityId, DatasetName, values);
}
/// <summary>
/// Marks a <see cref="Record"/> with the given key as deleted. Nothing happens if
/// the <see cref="Record"/> doesn't exist or is deleted already.
/// </summary>
/// <param name="key">Key identifying the Record</param>
/// <remarks>
/// See also: Amazon Cognito Sync Dev. Guide - Reading and Writing Data
/// </remarks>
public void Remove(string key)
{
    string identityId = IdentityId;
    var recordKey = DatasetUtils.ValidateRecordKey(key);
    // A null value is what gets written to local storage for a removal.
    Local.PutValue(identityId, DatasetName, recordKey, null);
}
/// <summary>
/// Saves resolved conflicting <see cref="Record"/>s into local storage. This is
/// used inside <see cref="SyncConflictDelegate"/> after you
/// resolve all conflicts.
/// </summary>
/// <param name="remoteRecords">A list of records to save into local storage</param>
public void Resolve(List remoteRecords)
{
    ILocalStorage storage = Local;
    string identityId = IdentityId;
    storage.PutRecords(identityId, DatasetName, remoteRecords);
}
#endregion
#region helper methods
// Synchronization state flags; both are reset by EndSynchronizeAndCleanup.
private bool locked;
private bool queuedSync;
/// <summary>
/// Resets the synchronization state flags once a sync attempt has finished.
/// </summary>
private void EndSynchronizeAndCleanup()
{
    locked = false;
    // Clearing unconditionally is equivalent to clearing only when the flag is set.
    queuedSync = false;
}
/// <summary>
/// Runs one synchronization pass for this dataset: pushes a local dataset
/// deletion, pulls remote updates, raises the merge / delete / conflict
/// callbacks, stores non-conflicting remote records locally and finally pushes
/// local modifications to remote storage. Every exit path calls
/// EndSynchronizeAndCleanup and fires either the sync success or the sync
/// failure event, except where the work is handed to a recursive retry.
/// </summary>
/// <remarks>
/// The asynchronous variant additionally takes a cancellation token which is
/// passed on to the remote storage calls.
/// </remarks>
/// <param name="retry">Number of retries still available; a retry is consumed
/// after a dataset merge callback asks to resume and after a data conflict is
/// reported while pushing local changes.</param>
#if BCL35
private void RunSyncOperation(int retry)
#else
private async Task RunSyncOperationAsync(int retry, CancellationToken cancellationToken)
#endif
{
    long lastSyncCount = Local.GetLastSyncCount(IdentityId, DatasetName);
#if !(BCL35)
    // Set when a data conflict should trigger a retry; the retry itself is
    // awaited after the try/catch rather than from inside the catch block.
    ExceptionDispatchInfo capturedException = null;
#endif
    // if dataset is deleted locally, push it to remote
    if (lastSyncCount == -1)
    {
        try
        {
#if BCL35
            Remote.DeleteDataset(DatasetName);
#else
            await Remote.DeleteDatasetAsync(DatasetName, cancellationToken).ConfigureAwait(false);
#endif
        }
        catch (DatasetNotFoundException)
        {
            //Ignore the exception here, since the dataset was local only
        }
        catch (Exception e)
        {
            _logger.InfoFormat("{0} , dataset : {1}", e.Message, this.DatasetName);
            EndSynchronizeAndCleanup();
            FireSyncFailureEvent(e);
            return;
        }
        Local.PurgeDataset(IdentityId, DatasetName);
        _logger.InfoFormat("OnSyncSuccess: dataset delete is pushed to remote - {0}", this.DatasetName);
        EndSynchronizeAndCleanup();
        FireSyncSuccessEvent(new List());
        return;
    }
    // get latest modified records from remote
    _logger.InfoFormat("Get latest modified records since {0} for dataset {1}", lastSyncCount, this.DatasetName);
    DatasetUpdates datasetUpdates = null;
    try
    {
#if BCL35
        datasetUpdates = Remote.ListUpdates(DatasetName, lastSyncCount);
#else
        datasetUpdates = await Remote.ListUpdatesAsync(DatasetName, lastSyncCount, cancellationToken).ConfigureAwait(false);
#endif
    }
    catch (Exception listUpdatesException)
    {
        _logger.Error(listUpdatesException, string.Empty);
        EndSynchronizeAndCleanup();
        FireSyncFailureEvent(listUpdatesException);
        return;
    }
    // merged datasets were reported by remote: let the callback decide whether to go on
    if (datasetUpdates != null && datasetUpdates.MergedDatasetNameList.Count != 0 && this.OnDatasetMerged != null)
    {
        bool resume = this.OnDatasetMerged(this, datasetUpdates.MergedDatasetNameList);
        if (resume)
        {
            if (retry == 0)
            {
                EndSynchronizeAndCleanup();
                FireSyncFailureEvent(new SyncManagerException("Out of retries"));
            }
            else
            {
#if BCL35
                this.RunSyncOperation(--retry);
#else
                await this.RunSyncOperationAsync(--retry, cancellationToken).ConfigureAwait(false);
#endif
            }
            return;
        }
        else
        {
            _logger.InfoFormat("OnSyncFailure: Manual Cancel");
            EndSynchronizeAndCleanup();
            FireSyncFailureEvent(new SyncManagerException("Manual cancel"));
            return;
        }
    }
    // if the dataset doesn't exist or is deleted, trigger onDelete.
    // The delegate null check has to guard BOTH conditions: without the explicit
    // grouping, '&&' binds tighter than '||' and a missing remote dataset with no
    // OnDatasetDeleted handler would invoke a null delegate.
    bool removedRemotely = (lastSyncCount != 0 && !datasetUpdates.Exists)
        || datasetUpdates.Deleted;
    if (removedRemotely && this.OnDatasetDeleted != null)
    {
        bool resume = this.OnDatasetDeleted(this);
        if (resume)
        {
            // remove both records and metadata
            Local.DeleteDataset(IdentityId, DatasetName);
            Local.PurgeDataset(IdentityId, DatasetName);
            _logger.InfoFormat("OnSyncSuccess");
            EndSynchronizeAndCleanup();
            FireSyncSuccessEvent(new List());
            return;
        }
        else
        {
            _logger.InfoFormat("OnSyncFailure");
            EndSynchronizeAndCleanup();
            FireSyncFailureEvent(new SyncManagerException("Manual cancel"));
            return;
        }
    }
    lastSyncCount = datasetUpdates.SyncCount;
    List remoteRecords = datasetUpdates.Records;
    if (remoteRecords.Count != 0)
    {
        // if conflict, prompt developer/user with callback
        List conflicts = new List();
        List conflictRecords = new List();
        foreach (Record remoteRecord in remoteRecords)
        {
            Record localRecord = Local.GetRecord(IdentityId,
                DatasetName,
                remoteRecord.Key);
            // only when local is changed and its value is different
            if (localRecord != null && localRecord.IsModified
                && !StringUtils.Equals(localRecord.Value, remoteRecord.Value))
            {
                conflicts.Add(new SyncConflict(remoteRecord, localRecord));
                conflictRecords.Add(remoteRecord);
            }
        }
        // retaining only non-conflict records
        remoteRecords.RemoveAll(t => conflictRecords.Contains(t));
        if (conflicts.Count > 0)
        {
            _logger.InfoFormat("{0} records in conflict!", conflicts.Count);
            bool syncConflictResult = false;
            if (this.OnSyncConflict == null)
            {
                // delegate is not implemented so the conflict resolution is applied
                syncConflictResult = this.ResolveConflictsWithDefaultPolicy(conflicts);
            }
            else
            {
                syncConflictResult = this.OnSyncConflict(this, conflicts);
            }
            if (!syncConflictResult)
            {
                _logger.InfoFormat("User cancelled conflict resolution");
                EndSynchronizeAndCleanup();
                FireSyncFailureEvent(new OperationCanceledException("User cancelled conflict resolution"));
                return;
            }
        }
        // save to local
        if (remoteRecords.Count > 0)
        {
            _logger.InfoFormat("Save {0} records to local", remoteRecords.Count);
            Local.PutRecords(IdentityId, DatasetName, remoteRecords);
        }
        // new last sync count
        _logger.InfoFormat("Updated sync count {0}", datasetUpdates.SyncCount);
        Local.UpdateLastSyncCount(IdentityId, DatasetName,
            datasetUpdates.SyncCount);
    }
    // push changes to remote
    List localChanges = this.ModifiedRecords;
    long minPatchSyncCount = lastSyncCount;
    foreach (Record r in localChanges)
    {
        // track the smallest sync count among the locally modified records
        if (r.SyncCount < minPatchSyncCount)
        {
            minPatchSyncCount = r.SyncCount;
        }
    }
    if (localChanges.Count != 0)
    {
        _logger.InfoFormat("Push {0} records to remote", localChanges.Count);
        try
        {
#if BCL35
            List result = Remote.PutRecords(DatasetName, localChanges, datasetUpdates.SyncSessionToken);
#else
            List result = await Remote.PutRecordsAsync(DatasetName, localChanges, datasetUpdates.SyncSessionToken, cancellationToken).ConfigureAwait(false);
#endif
            // update local meta data
            Local.ConditionallyPutRecords(IdentityId, DatasetName, result, localChanges);
            // verify the server sync count is increased exactly by one, aka no
            // other updates were made during this update.
            long newSyncCount = 0;
            foreach (Record record in result)
            {
                newSyncCount = newSyncCount < record.SyncCount
                    ? record.SyncCount
                    : newSyncCount;
            }
            if (newSyncCount == lastSyncCount + 1)
            {
                _logger.InfoFormat("Updated sync count {0}", newSyncCount);
                Local.UpdateLastSyncCount(IdentityId, DatasetName,
                    newSyncCount);
            }
            _logger.InfoFormat("OnSyncSuccess");
            EndSynchronizeAndCleanup();
            FireSyncSuccessEvent(remoteRecords);
            return;
        }
        catch (DataConflictException e)
        {
            _logger.InfoFormat("Conflicts detected when pushing changes to remote: {0}", e.Message);
            if (retry == 0)
            {
                EndSynchronizeAndCleanup();
                FireSyncFailureEvent(e);
            }
            else
            {
                //it's possible there is a local dirty record with a stale sync count this will fix it
                if (lastSyncCount > minPatchSyncCount)
                {
                    Local.UpdateLastSyncCount(IdentityId, DatasetName, minPatchSyncCount);
                }
#if BCL35
                RunSyncOperation(--retry);
#else
                // remember the conflict; the retry is awaited after the try/catch
                capturedException = ExceptionDispatchInfo.Capture(e);
#endif
            }
#if BCL35
            return;
#endif
        }
        catch (Exception e)
        {
            _logger.InfoFormat("OnSyncFailure {0}", e.Message);
            EndSynchronizeAndCleanup();
            FireSyncFailureEvent(e);
            return;
        }
#if !(BCL35)
        // retry only when a data conflict was captured with retries left
        if (capturedException != null)
        {
            await RunSyncOperationAsync(--retry, cancellationToken).ConfigureAwait(false);
        }
        return;
#endif
    }
    else
    {
        _logger.InfoFormat("OnSyncSuccess");
        EndSynchronizeAndCleanup();
        FireSyncSuccessEvent(remoteRecords);
        return;
    }
}
/// <summary>
/// Entry point of a synchronization: refreshes the identity id, gives the
/// merge callback a chance to cancel when merged datasets exist locally, and
/// then starts the sync operation with MAX_RETRY retries. Any exception is
/// reported through the sync failure event and logged.
/// </summary>
#if BCL35
private void SynchornizeInternal()
#else
private async Task SynchornizeInternalAsync(CancellationToken cancellationToken)
#endif
{
    try
    {
        //make sure we have the latest identity id
#if BCL35
        CognitoCredentials.GetIdentityId();
#else
        await CognitoCredentials.GetIdentityIdAsync().ConfigureAwait(false);
#endif
        List pendingMerges = LocalMergedDatasets;
        if (pendingMerges.Count > 0)
        {
            _logger.InfoFormat("Detected merge datasets - {0}", DatasetName);
            // Without a handler the sync simply continues; a handler returning false cancels it.
            if (this.OnDatasetMerged != null && !this.OnDatasetMerged(this, pendingMerges))
            {
                EndSynchronizeAndCleanup();
                FireSyncFailureEvent(new OperationCanceledException(string.Format(CultureInfo.InvariantCulture, "Sync canceled on merge for dataset - {0}", this.DatasetName)));
                return;
            }
        }
#if BCL35
        RunSyncOperation(MAX_RETRY);
#else
        await RunSyncOperationAsync(MAX_RETRY, cancellationToken).ConfigureAwait(false);
#endif
    }
    catch (Exception failure)
    {
        EndSynchronizeAndCleanup();
        FireSyncFailureEvent(failure);
        _logger.Error(failure, "");
    }
}
/// <summary>
/// Identity id obtained from DatasetUtils.GetIdentityId for the Cognito credentials
/// of this dataset.
/// </summary>
internal String IdentityId
{
    get
    {
        CognitoAWSCredentials credentials = CognitoCredentials;
        return DatasetUtils.GetIdentityId(credentials);
    }
}
/// <summary>
/// Records of this dataset that local storage reports as modified.
/// </summary>
internal List ModifiedRecords
{
    get
    {
        string identityId = IdentityId;
        var modified = Local.GetModifiedRecords(identityId, DatasetName);
        return modified;
    }
}
#endregion
#region SynchronizeEvents
private EventHandler mOnSyncSuccess;
// Private gate for subscription changes. Locking on 'this' would let any code that
// holds a reference to the dataset contend on (or deadlock with) the same lock.
private readonly object mOnSyncSuccessGate = new object();
/// <summary>
/// This is called after remote changes are downloaded to local storage
/// and local changes are uploaded to remote storage. Updated records
/// from remote storage are passed in the callback. If conflicts occur
/// during synchronize and are resolved in <see cref="SyncConflictDelegate"/> after
/// several retries, then updatedRecords will be what are pulled down
/// from remote in the last retry.
/// </summary>
public event EventHandler OnSyncSuccess
{
    add
    {
        lock (mOnSyncSuccessGate)
        {
            mOnSyncSuccess += value;
        }
    }
    remove
    {
        lock (mOnSyncSuccessGate)
        {
            mOnSyncSuccess -= value;
        }
    }
}
private EventHandler mOnSyncFailure;
// Private gate for subscription changes. Locking on 'this' would let any code that
// holds a reference to the dataset contend on (or deadlock with) the same lock.
private readonly object mOnSyncFailureGate = new object();
/// <summary>
/// This is called when an exception occurs during sync
/// </summary>
public event EventHandler OnSyncFailure
{
    add
    {
        lock (mOnSyncFailureGate)
        {
            mOnSyncFailure += value;
        }
    }
    remove
    {
        lock (mOnSyncFailureGate)
        {
            mOnSyncFailure -= value;
        }
    }
}
/// <summary>
/// Fires a Sync Success Event
/// </summary>
/// <param name="records">List of records after successful sync</param>
protected void FireSyncSuccessEvent(List records)
{
    // Snapshot the delegate: a handler removed between the null check and the
    // invocation would otherwise cause a NullReferenceException.
    var handler = mOnSyncSuccess;
    if (handler != null)
    {
        handler(this, new SyncSuccessEventArgs(records));
    }
}
/// <summary>
/// Fires a Sync Failure event.
/// </summary>
/// <param name="exception">Exception object which caused the sync Failure</param>
protected void FireSyncFailureEvent(Exception exception)
{
    // Snapshot the delegate: a handler removed between the null check and the
    // invocation would otherwise cause a NullReferenceException.
    var handler = mOnSyncFailure;
    if (handler != null)
    {
        handler(this, new SyncFailureEventArgs(exception));
    }
}
#endregion
#region SynchronizeDelegates
/// <summary>
/// Delegate which is invoked when a sync conflict occurs
/// </summary>
/// <param name="dataset">The data set which resulted in conflict</param>
/// <param name="conflicts">List of Objects which have conflicts</param>
/// <returns>true if you want to retry synchronization, else false</returns>
public delegate bool SyncConflictDelegate(Dataset dataset, List conflicts);
/// <summary>
/// Delegate which is invoked when a data set is deleted
/// </summary>
/// <param name="dataset">The dataset which was deleted</param>
/// <returns>true if you want to remove local dataset, or false if you want to
/// keep it</returns>
public delegate bool DatasetDeletedDelegate(Dataset dataset);
/// <summary>
/// Delegate which is invoked when a dataset is merged due to an identity merge
/// </summary>
/// <param name="dataset">The dataset which was merged, due to an identity merge</param>
/// <param name="datasetNames">Names of the merged datasets</param>
/// <returns>true to continue the synchronization, false to cancel it</returns>
public delegate bool DatasetMergedDelegate(Dataset dataset, List datasetNames);
/// <summary>
/// This can be triggered during two phases. One is when the remote
/// changes are about to be written to local storage. The other is when
/// local changes are uploaded to remote storage and got rejected. Here
/// is an example:
/// <code>
/// playerInfo.OnSyncConflict = this.HandleSyncConflict;
///
/// private bool HandleSyncConflict(Dataset dataset, List&lt;SyncConflict&gt; conflicts)
/// {
///     List&lt;Record&gt; resolved = new List&lt;Record&gt;();
///     foreach (SyncConflict conflict in conflicts)
///     {
///         resolved.Add(conflict.ResolveWithRemoteRecord());
///     }
///     dataset.Resolve(resolved);
///     return true;
/// }
/// </code>
/// </summary>
public SyncConflictDelegate OnSyncConflict;
/// <summary>
/// This is triggered when the given dataset is deleted remotely. Return
/// true if you want to remove local dataset, or false if you want to
/// keep it.
/// </summary>
public DatasetDeletedDelegate OnDatasetDeleted;
/// <summary>
/// If two or more datasets are merged as a result of identity merge,
/// this will be triggered. A list of names of the merged datasets is passed
/// in. The merged dataset name will be appended with its old identity
/// id. One can open the merged dataset, synchronize the content,
/// reconcile with the current dataset, and remove it. This callback will
/// fire off until the merged dataset is removed.
/// </summary>
public DatasetMergedDelegate OnDatasetMerged;
/// <summary>
/// Clears all the delegates: every subscriber of the sync success / failure
/// events and every target of the conflict, deleted and merged callbacks is removed.
/// </summary>
public void ClearAllDelegates()
{
    if (mOnSyncSuccess != null)
    {
        foreach (Delegate subscriber in mOnSyncSuccess.GetInvocationList())
        {
            OnSyncSuccess -= (EventHandler)subscriber;
        }
    }

    if (mOnSyncFailure != null)
    {
        foreach (Delegate subscriber in mOnSyncFailure.GetInvocationList())
        {
            OnSyncFailure -= (EventHandler)subscriber;
        }
    }

    if (OnSyncConflict != null)
    {
        foreach (Delegate target in OnSyncConflict.GetInvocationList())
        {
            OnSyncConflict -= (SyncConflictDelegate)target;
        }
    }

    if (OnDatasetDeleted != null)
    {
        foreach (Delegate target in OnDatasetDeleted.GetInvocationList())
        {
            OnDatasetDeleted -= (DatasetDeletedDelegate)target;
        }
    }

    if (OnDatasetMerged != null)
    {
        foreach (Delegate target in OnDatasetMerged.GetInvocationList())
        {
            OnDatasetMerged -= (DatasetMergedDelegate)target;
        }
    }
}
#endregion
#region Default conflict resolution
/// <summary>
/// Default conflict resolution, used when no <see cref="SyncConflictDelegate"/> is set.
/// The local record wins when there is no remote record or when the local record
/// was modified later than the remote one; otherwise the remote record wins.
/// The resolved records are saved through <see cref="Resolve"/>.
/// </summary>
/// <param name="conflicts">The conflicts to resolve</param>
/// <returns>Always true</returns>
internal bool ResolveConflictsWithDefaultPolicy(List conflicts)
{
    List winners = new List();
    foreach (SyncConflict pending in conflicts)
    {
        bool keepLocal = pending.RemoteRecord == null
            || pending.LocalRecord.LastModifiedDate.Value.CompareTo(pending.RemoteRecord.LastModifiedDate.Value) > 0;
        if (keepLocal)
        {
            winners.Add(pending.ResolveWithLocalRecord());
            continue;
        }
        winners.Add(pending.ResolveWithRemoteRecord());
    }
    Resolve(winners);
    return true;
}
#endregion
}
/// <summary>
/// A sync success event
/// </summary>
public class SyncSuccessEventArgs : EventArgs
{
    internal SyncSuccessEventArgs(List updatedRecords)
    {
        UpdatedRecords = updatedRecords;
    }

    /// <summary>
    /// List of updated records
    /// </summary>
    public List UpdatedRecords { get; private set; }
}
/// <summary>
/// A sync failure event
/// </summary>
public class SyncFailureEventArgs : EventArgs
{
    internal SyncFailureEventArgs(Exception exception)
    {
        Exception = exception;
    }

    /// <summary>
    /// Exception which triggered the failure event
    /// </summary>
    public Exception Exception { get; private set; }
}
}