Initialisation de la chaîne Rx

Bonjour à tous, je m'appelle Ivan, je suis développeur Android. Aujourd'hui, je souhaite partager mon expérience avec RxJava2 et vous expliquer comment se déroule l'initialisation de la chaîne. Pourquoi ai-je décidé d'en parler ? Après avoir discuté avec d'autres développeurs, j'ai réalisé que tous ceux qui utilisent cet outil ne comprennent pas son fonctionnement. Et puis j'ai décidé de comprendre comment les abonnements sont organisés dans RxJava2 et dans quel ordre tout le travail est initialisé. Je n'ai pas trouvé un seul article expliquant cela. À la lumière de cela, je suis allé dans le code source pour voir comment tout fonctionnait et j'ai esquissé pour moi-même une petite feuille de triche, qui est devenue cet article.





Dans cet article, je ne décrirai pas ce que c'est Observable



, Observer



et toutes les autres entités qui sont utilisées dans RxJava2. Si vous décidez de lire cet article, je suppose que vous connaissez déjà ces informations. Et si vous n'êtes toujours pas familiarisé avec ces concepts, alors je vous recommande de vous familiariser avec eux avant de les lire.





Voici comment commencer :





Grock * RxJava





Explorer RxJava 2 pour Android





Voyons comment fonctionne la chaîne la plus simple :





Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
      
      



En haut

Tout d'abord, je vais décrire brièvement chaque étape que nous passons dans cette chaîne (les étapes commencent de haut en bas) :





  • Un objet est créé dans l'instruction just ObservableFromArray



    .





  • Un objet est créé dans l'instruction map ObservableMap



    , qui prend dans le constructeur une référence à l'objet précédemment créé dans l'instruction just.





  • filter ObservableFilter



    , map, just.





  • Observable



    Observable



    subscribe()



    ( ObservableFilter



    filter) Observer



    , .





  • ObservableFilter.subscribe()



    ObservableFilter.subscribeActual()



    , Observer



    , filter, FilterObserver



    . Observer



    Observer



    ObservableFilter.subscribe()



    .





  • ObservableMap.subscribe()



    ObservableMap.subscribeActual()



    Observer,



    map, MapObserver



    , FilterObserver



    .





  • ObservableFromArray.subscribe()



    ObservableFromArray.subscribeActual()



    , onSubscribe()



    ObservableFromArray.subscribeActual()



    Observer



    ’.





  • onSubscribe()



    Observer



    ’ .





  • ObservableFromArray



    onNext()



    Observer



    ’.





Une représentation visuelle du diagramme ci-dessus.
.

, just()



null, fromArray(),



Observable



.





public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
   ObjectHelper.requireNonNull(item1, "item1 is null");
   ObjectHelper.requireNonNull(item2, "item2 is null");
   ObjectHelper.requireNonNull(item3, "item3 is null");
   ObjectHelper.requireNonNull(item4, "item4 is null");
   ObjectHelper.requireNonNull(item5, "item5 is null");

   return fromArray(item1, item2, item3, item4, item5);
}
      
      



fromArray()



, .





public static <T> Observable<T> fromArray(T... items) {
   ObjectHelper.requireNonNull(items, "items is null");
   if (items.length == 0) {
       return empty();
   }
   if (items.length == 1) {
       return just(items[0]);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
      
      



ObservableFromArray



, .





onAssembly()



, - Observable



, , .





public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}
      
      



onAssembly()



Observable



- , :





RxJavaPlugins.setOnObservableAssembly(o -> {
	if (o instanceof ObservableFromArray) {
    	return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
	}
	return o;
});
 
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
      
      



Le tout juste créé ObservableFromArray
ObservableFromArray

map()



. , . null, ObservableMap



.





public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
      
      



, ObservableMap



mapper, , this (source). this ObservableFromArray



. ObservableMap



AbstractObservableWithUpstream



, source.





AbstractObservableWithUpstream



, Observable



.





onAssembly()



Observable







Schéma mis à jour avec ObservableMap généré
ObservableMap

filter()



. , , ObservableFilter



this ObservableMap



( ObservableFromArray



, ) .





public final Observable<T> filter(Predicate<? super T> predicate) {
   ObjectHelper.requireNonNull(predicate, "predicate is null");
   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
      
      



Schéma mis à jour avec ObservableFilter généré
ObservableFilter

subscribe()



, . onNext()



. subscribe()



ObservableFilter



, Observable



.





public final Disposable subscribe(Consumer<? super T> onNext) {
   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
      
      



null, LambdaObserver



.





public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
       Action onComplete, Consumer<? super Disposable> onSubscribe) {
   ObjectHelper.requireNonNull(onNext, "onNext is null");
   ObjectHelper.requireNonNull(onError, "onError is null");
   ObjectHelper.requireNonNull(onComplete, "onComplete is null");
   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

   subscribe(ls);

   return ls;
}
      
      



, .





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



subscribeActual()



LambdaObserver



. subscribeActual()



ObservableFilter



. .





public void subscribeActual(Observer<? super T> observer) {
   source.subscribe(new FilterObserver<T>(observer, predicate));
}
      
      



FilterObserver



, LambdaObserver



, ObservableFilter



.





FilterObserver



BasicFuseableObserver



, onSubscribe()



. BasicFuseableObserver



, Observer



’. , 6 , FilterObserver



MapObserver



. BasicFuseableObserver.onSubscribe()



onSubscribe()



Observer



’, . :





public final void onSubscribe(Disposable d) {
   if (DisposableHelper.validate(this.upstream, d)) {
       this.upstream = d;
       if (d instanceof QueueDisposable) {
           this.qd = (QueueDisposable<T>)d;
       }
       if (beforeDownstream()) {

           downstream.onSubscribe(this);

           afterDownstream();
       }
   }
}
      
      



, ObservableFilter



FilterObserver



, source.subscribe()



. , source ObservableMap



, . ObservableMap



subscribe()



.





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



, subscribe()



subscribeActual()



, ObservableMap



. subscribeActual()



MapObserver



FilterObserver



mapper



’. 





public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}
      
      



public void subscribeActual(Observer<? super T> observer) {
   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

   observer.onSubscribe(d);

   if (d.fusionMode) {
       return;
   }
   d.run();
}
      
      



Observer



BasicFuseableObserver



, onSubscribe()



, Observer



, onSubscribe()



.





subscribeActual()



run()



, Observer



.





void run() {
   T[] a = array;
   int n = a.length;

   for (int i = 0; i < n && !isDisposed(); i++) {
       T value = a[i];
       if (value == null) {
           downstream.onError(new NullPointerException("The element at index " + i + " is null"));
           return;
       }
       downstream.onNext(value);
   }
   if (!isDisposed()) {
       downstream.onComplete();
   }
}
      
      



onNext()



Observer



’, onComplete()



onError()



, .





Représentation visuelle du processus de création et de souscription





Observable



callback’ Observer



, .





onSubscribe()



, doOnSubscribe()



.





3 :









  • Observable







  • Observer







Par conséquent, lors de l'utilisation d'opérateurs, il convient de garder à l'esprit que chaque opérateur alloue de la mémoire à plusieurs objets et vous ne devez pas ajouter d'opérateurs à la chaîne, simplement parce que c'est « possible ».





RxJava est un outil puissant, mais vous devez comprendre comment il fonctionne et pour quoi l'utiliser. Si vous avez juste besoin d'exécuter une requête réseau dans un thread d'arrière-plan puis d'exécuter le résultat sur le thread principal, alors c'est comme "tirer des moineaux avec un canon", vous pouvez vous faire prendre, mais les conséquences peuvent être graves.








All Articles