Android
Multithreading mit RxJava
Posted on .RxJava läuft standardmäßig synchron ab. Um Operationen mit RxJava asynchron durchzuführen gibt es Scheduler
.
Das häufiges Beispiel für den Einsatz von Schedulern
sieht so oder so ähnlich aus:
webService.login()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(session -> {
// display result
});
Der Operator .subscribeOn
sorgt dafür das der Code des Observables
auf einem Arbeiter-Thread läuft und die Subscription
auf dem Haupt-Thread, um das Ergebnis im User-Interface darzustellen.
Wo werden die Operatoren ausgeführt?
Die meisten Texte und Artikel, die den Einsatz von Schedulern erklären, besonders wenn es um Android geht, erklären nicht die genau, was die beiden Operatoren wirklich tun. Zum Beispiel folgender Code:
webService.login()
.subscribeOn(Schedulers.newThread())
.map(session -> {return session.getUserId()})
.flatMap(id -> webService.loadProfile(id))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(profile -> {});
Auf welchem Thread laufen die Operatoren, dem Haupt-Thread oder dem neuen Thread? Vor allem, wo läuft der zweite Netzwerk-Aufruf webService.loadProfile(id)
? Dafür muss man sich ansehen, was die beiden Scheduler-Operatoren genau mit dem Stream machen und wie sie sich unterscheiden.
Die Scheduler-Operatoren
.subscribeOn()
bestimmt, mit welchem ‚Scheduler‘ das Observable
und damit der Stream seine Arbeit beginnt. Wenn der Operator .subscribeOn()
mehrmals in einem Stream vorkommt, hat nur sein letzter Aufruf eine Wirkung, weil er ganz bis zum Ursprungs-Observable
zurück reicht.
webService.login() // on scheduler_2
.subscribeOn(scheduler_1)
.subscribeOn(scheduler_2)
.subscribe(profile -> {}); // on scheduler_2
Hingegen ändert .observeOn()
den benutzten Scheduler für alle nachfolgenden Operationen in einem Stream. Kommt der Operator mehrmals in einem Stream vor, wo wird mehrmals der Scheduler beim Durchlauf des Streams gewechselt.
webService.login() // on scheduler_3
.map(session -> {return session.getUserId()}) // on scheduler_3
.observeOn(scheduler_1)
.flatMap(id -> webService.loadProfile(id)) // on scheduler_1
.observeOn(scheduler_2)
.subscribeOn(scheduler_3)
.subscribe(profile -> {}); // on scheduler_2