using System;
using System.Threading;
using UniRx.InternalUtil;

namespace UniRx
{
    /// <summary>
    /// Factory methods that wrap plain delegates (optionally together with caller supplied state)
    /// into <see cref="IObserver{T}"/> instances.
    /// </summary>
    public static class Observer
    {
        /// <summary>
        /// Builds the observer used by the <c>Subscribe</c> extension methods.
        /// </summary>
        /// <param name="onNext">Delegate invoked for each value.</param>
        /// <param name="onError">Delegate invoked for an error notification.</param>
        /// <param name="onCompleted">Delegate invoked for a completion notification.</param>
        /// <returns>
        /// A <c>Subscribe_&lt;T&gt;</c> when <paramref name="onNext"/> equals <c>Stubs&lt;T&gt;.Ignore</c>,
        /// otherwise a <c>Subscribe&lt;T&gt;</c>.
        /// </returns>
        internal static IObserver<T> CreateSubscribeObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // The comparison against the shared Ignore stub is needed to avoid an iOS AOT problem.
            if (onNext == Stubs<T>.Ignore)
            {
                return new Subscribe_<T>(onError, onCompleted);
            }

            return new Subscribe<T>(onNext, onError, onCompleted);
        }

        /// <summary>
        /// Builds an observer whose delegates additionally receive one state value.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>(
            TState state,
            Action<T, TState> onNext,
            Action<Exception, TState> onError,
            Action<TState> onCompleted)
        {
            return new Subscribe<T, TState>(state, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Builds an observer whose delegates additionally receive two state values.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>(
            TState1 state1,
            TState2 state2,
            Action<T, TState1, TState2> onNext,
            Action<Exception, TState1, TState2> onError,
            Action<TState1, TState2> onCompleted)
        {
            return new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Builds an observer whose delegates additionally receive three state values.
        /// </summary>
        internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>(
            TState1 state1,
            TState2 state2,
            TState3 state3,
            Action<T, TState1, TState2, TState3> onNext,
            Action<Exception, TState1, TState2, TState3> onError,
            Action<TState1, TState2, TState3> onCompleted)
        {
            return new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/>; errors go to <c>Stubs.Throw</c>
        /// and completion goes to <c>Stubs.Nop</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext)
        {
            return Create<T>(onNext, UniRx.Stubs.Throw, UniRx.Stubs.Nop);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/> and <paramref name="onError"/>;
        /// completion goes to <c>Stubs.Nop</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
        {
            return Create<T>(onNext, onError, UniRx.Stubs.Nop);
        }

        /// <summary>
        /// Creates an observer from <paramref name="onNext"/> and <paramref name="onCompleted"/>;
        /// errors go to <c>Stubs.Throw</c>.
        /// </summary>
        public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
        {
            return Create<T>(onNext, UniRx.Stubs.Throw, onCompleted);
        }

        /// <summary>
        /// Creates an observer from the three supplied delegates.
        /// </summary>
        /// <returns>
        /// An observer with an empty <c>OnNext</c> when <paramref name="onNext"/> equals
        /// <c>Stubs&lt;T&gt;.Ignore</c>, otherwise an observer that forwards to all three delegates.
        /// </returns>
        public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // The comparison against the shared Ignore stub is needed to avoid an iOS AOT problem.
            if (onNext == Stubs<T>.Ignore)
            {
                return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
            }

            return new AnonymousObserver<T>(onNext, onError, onCompleted);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in an <c>AutoDetachObserver&lt;T&gt;</c> constructed with
        /// <paramref name="disposable"/>.
        /// </summary>
        public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable)
        {
            return new AutoDetachObserver<T>(observer, disposable);
        }

        /// <summary>
        /// Delegate backed observer. Values are forwarded while no terminal notification has been
        /// seen; only the first terminal notification (error or completion) is forwarded.
        /// </summary>
        private sealed class AnonymousObserver<T> : IObserver<T>
        {
            readonly Action<T> onNext;
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
            {
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped != 0)
                {
                    return;
                }

                onNext(value);
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted();
            }
        }

        /// <summary>
        /// Variant of <c>AnonymousObserver&lt;T&gt;</c> whose <c>OnNext</c> does nothing.
        /// </summary>
        private sealed class EmptyOnNextAnonymousObserver<T> : IObserver<T>
        {
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
            {
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                // Intentionally empty.
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted();
            }
        }

        // same as AnonymousObserver...
        private sealed class Subscribe<T> : IObserver<T>
        {
            readonly Action<T> onNext;
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
            {
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped != 0)
                {
                    return;
                }

                onNext(value);
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted();
            }
        }

        // same as EmptyOnNextAnonymousObserver...
        private sealed class Subscribe_<T> : IObserver<T>
        {
            readonly Action<Exception> onError;
            readonly Action onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public Subscribe_(Action<Exception> onError, Action onCompleted)
            {
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                // Intentionally empty.
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted();
            }
        }

        // with state
        private sealed class Subscribe<T, TState> : IObserver<T>
        {
            readonly TState state;
            readonly Action<T, TState> onNext;
            readonly Action<Exception, TState> onError;
            readonly Action<TState> onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public Subscribe(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
            {
                this.state = state;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped != 0)
                {
                    return;
                }

                onNext(value, state);
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error, state);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted(state);
            }
        }

        /// <summary>
        /// Delegate backed observer that passes two state values to every delegate.
        /// </summary>
        private sealed class Subscribe<T, TState1, TState2> : IObserver<T>
        {
            readonly TState1 state1;
            readonly TState2 state2;
            readonly Action<T, TState1, TState2> onNext;
            readonly Action<Exception, TState1, TState2> onError;
            readonly Action<TState1, TState2> onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public Subscribe(
                TState1 state1,
                TState2 state2,
                Action<T, TState1, TState2> onNext,
                Action<Exception, TState1, TState2> onError,
                Action<TState1, TState2> onCompleted)
            {
                this.state1 = state1;
                this.state2 = state2;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped != 0)
                {
                    return;
                }

                onNext(value, state1, state2);
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error, state1, state2);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted(state1, state2);
            }
        }

        /// <summary>
        /// Delegate backed observer that passes three state values to every delegate.
        /// </summary>
        private sealed class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
        {
            readonly TState1 state1;
            readonly TState2 state2;
            readonly TState3 state3;
            readonly Action<T, TState1, TState2, TState3> onNext;
            readonly Action<Exception, TState1, TState2, TState3> onError;
            readonly Action<TState1, TState2, TState3> onCompleted;

            // 0 until the first OnError/OnCompleted call; incremented by every terminal call.
            int isStopped;

            public Subscribe(
                TState1 state1,
                TState2 state2,
                TState3 state3,
                Action<T, TState1, TState2, TState3> onNext,
                Action<Exception, TState1, TState2, TState3> onError,
                Action<TState1, TState2, TState3> onCompleted)
            {
                this.state1 = state1;
                this.state2 = state2;
                this.state3 = state3;
                this.onNext = onNext;
                this.onError = onError;
                this.onCompleted = onCompleted;
            }

            public void OnNext(T value)
            {
                if (isStopped != 0)
                {
                    return;
                }

                onNext(value, state1, state2, state3);
            }

            public void OnError(Exception error)
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onError(error, state1, state2, state3);
            }

            public void OnCompleted()
            {
                if (Interlocked.Increment(ref isStopped) != 1)
                {
                    return;
                }

                onCompleted(state1, state2, state3);
            }
        }

        /// <summary>
        /// Forwards notifications to the wrapped observer and calls <c>Dispose()</c> after a terminal
        /// notification, or when forwarding a value throws.
        /// </summary>
        private sealed class AutoDetachObserver<T> : UniRx.Operators.OperatorObserverBase<T, T>
        {
            public AutoDetachObserver(IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            public override void OnNext(T value)
            {
                try
                {
                    observer.OnNext(value);
                }
                catch
                {
                    // Dispose before letting the exception continue to the caller.
                    Dispose();
                    throw;
                }
            }

            public override void OnError(Exception error)
            {
                try
                {
                    observer.OnError(error);
                }
                finally
                {
                    Dispose();
                }
            }

            public override void OnCompleted()
            {
                try
                {
                    observer.OnCompleted();
                }
                finally
                {
                    Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Extension methods for <see cref="IObserver{T}"/>.
    /// </summary>
    public static partial class ObserverExtensions
    {
        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver&lt;T&gt;</c> using a newly
        /// created gate object.
        /// </summary>
        public static IObserver<T> Synchronize<T>(this IObserver<T> observer)
        {
            var gate = new object();
            return new UniRx.Operators.SynchronizedObserver<T>(observer, gate);
        }

        /// <summary>
        /// Wraps <paramref name="observer"/> in a <c>SynchronizedObserver&lt;T&gt;</c> using the
        /// supplied <paramref name="gate"/>.
        /// </summary>
        public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate)
        {
            return new UniRx.Operators.SynchronizedObserver<T>(observer, gate);
        }
    }

    /// <summary>
    /// Delegate based <c>Subscribe</c> overloads for <see cref="IObservable{T}"/>.
    /// Overloads without an error delegate use the matching <c>Stubs.Throw</c>; overloads without
    /// a completion delegate use the matching no-op stub.
    /// </summary>
    public static partial class ObservableExtensions
    {
        /// <summary>
        /// Subscribes to <paramref name="source"/> with <c>ThrowObserver&lt;T&gt;.Instance</c>.
        /// </summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source)
        {
            return source.Subscribe(UniRx.InternalUtil.ThrowObserver<T>.Instance);
        }

        /// <summary>
        /// Subscribes with a value delegate only; built exactly like
        /// <c>Subscribe(source, onNext)</c>.
        /// </summary>
        public static IDisposable Subscribe2<T>(this IObservable<T> source, Action<T> onNext)
        {
            var observer = Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with a value delegate only.
        /// </summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
        {
            var observer = Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and error delegates.
        /// </summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
        {
            var observer = Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and completion delegates.
        /// </summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
        {
            var observer = Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value, error and completion delegates.
        /// </summary>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            var observer = Observer.CreateSubscribeObserver(onNext, onError, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with a value delegate that also receives <paramref name="state"/>.
        /// </summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext)
        {
            var observer = Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and error delegates that also receive <paramref name="state"/>.
        /// </summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError)
        {
            var observer = Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and completion delegates that also receive <paramref name="state"/>.
        /// </summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value, error and completion delegates that also receive <paramref name="state"/>.
        /// </summary>
        public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with a value delegate that also receives two state values.
        /// </summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext)
        {
            var observer = Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, Stubs<TState1, TState2>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and error delegates that also receive two state values.
        /// </summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext,
            Action<Exception, TState1, TState2> onError)
        {
            var observer = Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, Stubs<TState1, TState2>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and completion delegates that also receive two state values.
        /// </summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext,
            Action<TState1, TState2> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value, error and completion delegates that also receive two state values.
        /// </summary>
        public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext,
            Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with a value delegate that also receives three state values.
        /// </summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3,
            Action<T, TState1, TState2, TState3> onNext)
        {
            var observer = Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, Stubs<TState1, TState2, TState3>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and error delegates that also receive three state values.
        /// </summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3,
            Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError)
        {
            var observer = Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, Stubs<TState1, TState2, TState3>.Ignore);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value and completion delegates that also receive three state values.
        /// </summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3,
            Action<T, TState1, TState2, TState3> onNext, Action<TState1, TState2, TState3> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, onCompleted);
            return source.Subscribe(observer);
        }

        /// <summary>
        /// Subscribes with value, error and completion delegates that also receive three state values.
        /// </summary>
        public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3,
            Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
        {
            var observer = Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted);
            return source.Subscribe(observer);
        }
    }

    /// <summary>
    /// Shared delegate instances used as defaults by the subscribe helpers.
    /// </summary>
    internal static class Stubs
    {
        /// <summary>Action that does nothing.</summary>
        public static readonly Action Nop = () => { };

        /// <summary>Action that calls <c>Throw()</c> on the exception it receives.</summary>
        public static readonly Action<Exception> Throw = error => { error.Throw(); };

        // marker for CatchIgnore and Catch avoid iOS AOT problem.
        public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
        {
            return Observable.Empty<TSource>();
        }
    }

    /// <summary>
    /// Shared single-argument delegate instances.
    /// </summary>
    internal static class Stubs<T>
    {
        /// <summary>Action that discards its argument.</summary>
        public static readonly Action<T> Ignore = (T value) => { };

        /// <summary>Function that returns its argument unchanged.</summary>
        public static readonly Func<T, T> Identity = (T value) => value;

        /// <summary>Action that calls <c>Throw()</c> on the exception and ignores the state.</summary>
        public static readonly Action<Exception, T> Throw = (error, state) => { error.Throw(); };
    }

    /// <summary>
    /// Shared two-argument delegate instances.
    /// </summary>
    internal static class Stubs<T1, T2>
    {
        /// <summary>Action that discards both arguments.</summary>
        public static readonly Action<T1, T2> Ignore = (first, second) => { };

        /// <summary>Action that calls <c>Throw()</c> on the exception and ignores the state values.</summary>
        public static readonly Action<Exception, T1, T2> Throw = (error, state1, state2) => { error.Throw(); };
    }

    /// <summary>
    /// Shared three-argument delegate instances.
    /// </summary>
    internal static class Stubs<T1, T2, T3>
    {
        /// <summary>Action that discards all three arguments.</summary>
        public static readonly Action<T1, T2, T3> Ignore = (first, second, third) => { };

        /// <summary>Action that calls <c>Throw()</c> on the exception and ignores the state values.</summary>
        public static readonly Action<Exception, T1, T2, T3> Throw = (error, state1, state2, state3) => { error.Throw(); };
    }
}