How to wrap a listener with RxJava 2 and handle the thread switch?




I wrap the listener with the code below:


public Observable<String> getIdToken() {
    return Observable.create(emitter -> {
        firebaseAuth.getAccessToken(false)
                .addOnSuccessListener(getTokenResult -> {
                    emitter.onNext(getTokenResult.getToken());
                })
                .addOnFailureListener(e -> {
                    if (e instanceof FirebaseAuthInvalidUserException) {
                        emitter.onError(new BaseException("", ERR_CODE_ACCOUNT_BANNED));
                    } else {
                        emitter.onError(e);
                    }
                });
    });
}

public Observable<BaseResponse> register() {
    return getIdToken()
            .flatMap(idToken -> userApi.register(idToken));
}


But I ran into a bug:


.flatMap(idToken -> userApi.register(idToken));



This line is reported as running on the main thread. I think


firebaseAuth.getAccessToken(false)



internally switches to a worker thread and performs a network request to Google Firebase. When it succeeds, its internal code switches back to the current thread (the main thread), which makes


.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())



have no effect. How can I handle this thread switch so that the flatMap runs on a worker thread? I invoke the register function like this:


Disposable disposable = UserModel.getInstance().register()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(baseResponse -> {
            Toast.makeText(this, "a", Toast.LENGTH_SHORT).show();
        }, throwable -> {
            throwable.printStackTrace();
            Log.d("MainActivity111", throwable.getMessage());
        });



Logcat shows this error:


08-28 11:18:26.634 6870-6870/? W/System.err: android.os.NetworkOnMainThreadException
at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1450)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:355)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:357)
at java.net.Socket.connect(Socket.java:616)
at okhttp3.internal.platform.AndroidPlatform.connectSocket(AndroidPlatform.java:71)
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:240)
at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:160)
at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)

08-28 11:18:26.635 6870-6870/? W/System.err: at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
08-28 11:18:26.636 6870-6870/? W/System.err: at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
at okhttp3.RealCall.execute(RealCall.java:77)
08-28 11:18:26.637 6870-6870/? W/System.err: at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:42)
at io.reactivex.Observable.subscribe(Observable.java:12036)
08-28 11:18:26.638 6870-6870/? W/System.err: at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:12036)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:165)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:67)
08-28 11:18:26.639 6870-6870/? W/System.err: at cn.candrwow.coincoin.model.UserModel.lambda$null$0(UserModel.java:37)
at cn.candrwow.coincoin.model.-$$Lambda$UserModel$TP1OIyNide1WpDi-afDDpxaJhcE.onSuccess(Unknown Source:4)
at com.google.android.gms.tasks.zzn.run(Unknown Source:27)
at android.os.Handler.handleCallback(Handler.java:790)
08-28 11:18:26.640 6870-6870/? W/System.err: at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:164)
at android.app.ActivityThread.main(ActivityThread.java:6494)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)



Update



I found that the Firebase listener's internal code forces a switch to the main thread, not to the thread that called it, so in the problem above


emitter.onNext(getTokenResult.getToken());



is forced onto the main thread, not the thread set by


.subscribeOn(Schedulers.io())



The Firebase internals are not open source, so I logged the thread names with the code below (placed in Activity.onCreate) and found that it forces the switch:


new Thread(() -> {
    Log.d("TestActivity", Thread.currentThread().getName());
    FirebaseAuth.getInstance().getAccessToken(false)
            .addOnSuccessListener(getTokenResult -> {
                Log.d("TestActivity", Thread.currentThread().getName());
            });
}).start();



Logcat shows:


08-28 16:47:47.232 12289-12384/? D/TestActivity: Thread-5
main
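
The same observation can be reproduced without Firebase. Below is a minimal plain-Java sketch, not the Firebase implementation: ListenerThreadDemo, getAccessToken, OnSuccessListener and the "fake-main" thread are hypothetical stand-ins for the real API and the Android main looper. It only illustrates that a listener-style API can report on its own fixed thread regardless of which thread called it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ListenerThreadDemo {
    // Assumption: a single daemon thread named "fake-main" plays the role of the main looper.
    static final ExecutorService MAIN = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-main");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical listener type, shaped like the success listener in the question.
    interface OnSuccessListener {
        void onSuccess(String token);
    }

    // Hypothetical listener-style API: always invokes the listener on MAIN,
    // whichever thread called this method.
    static void getAccessToken(OnSuccessListener listener) {
        MAIN.execute(() -> listener.onSuccess("dummy-token"));
    }

    // Calls the API from a thread named "caller" and returns
    // { name of the calling thread, name of the thread that ran the listener }.
    static String[] observeThreads() {
        CompletableFuture<String> callerName = new CompletableFuture<>();
        CompletableFuture<String> callbackName = new CompletableFuture<>();
        Thread caller = new Thread(() -> {
            callerName.complete(Thread.currentThread().getName());
            getAccessToken(token -> callbackName.complete(Thread.currentThread().getName()));
        }, "caller");
        caller.start();
        return new String[] { callerName.join(), callbackName.join() };
    }

    public static void main(String[] args) {
        String[] names = observeThreads();
        System.out.println("called on:   " + names[0]);
        System.out.println("callback on: " + names[1]);
    }
}
```

In this sketch the listener runs on "fake-main" even though the call was made from "caller", which matches the Thread-5 / main pair in the log above.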



Adding .observeOn(Schedulers.io()) works, and the advice from the answer works too.


public Observable<BaseResponse> register() {
    return getIdToken()
            // .observeOn(Schedulers.io())
            .flatMap(idToken -> userApi.register(idToken).subscribeOn(Schedulers.io()));
}
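
Why either fix helps can be sketched in plain Java. This is an analogy, not RxJava: FollowUpThreadDemo, getAccessToken, the "fake-main" and "fake-io" threads and the hopToIo flag are all hypothetical. Running the follow-up directly inside the callback corresponds to the plain flatMap, which starts the inner request on whatever thread delivered the token. Handing the follow-up to another executor corresponds to observeOn(Schedulers.io()) before the flatMap, or subscribeOn(Schedulers.io()) on the inner call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FollowUpThreadDemo {
    // Builds a single-threaded executor whose one daemon thread has the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // Assumptions: "fake-main" stands in for the main looper, "fake-io" for Schedulers.io().
    static final ExecutorService MAIN = named("fake-main");
    static final ExecutorService IO = named("fake-io");

    interface OnSuccessListener {
        void onSuccess(String token);
    }

    // Hypothetical listener-style API that always calls back on MAIN.
    static void getAccessToken(OnSuccessListener listener) {
        MAIN.execute(() -> listener.onSuccess("dummy-token"));
    }

    // Returns the name of the thread on which the follow-up step ran
    // (the step that would be the network request in the question).
    static String followUpThread(boolean hopToIo) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        getAccessToken(token -> {
            Runnable followUp = () -> ranOn.complete(Thread.currentThread().getName());
            if (hopToIo) {
                IO.execute(followUp); // explicit hop to the worker thread
            } else {
                followUp.run();       // stays on the callback thread
            }
        });
        return ranOn.join();
    }

    public static void main(String[] args) {
        System.out.println(followUpThread(false)); // prints "fake-main"
        System.out.println(followUpThread(true));  // prints "fake-io"
    }
}
```

The outer subscribeOn in the question only decides where the call to getAccessToken is made, so it cannot move work that starts inside the listener.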



This is my UserApi interface:


public interface UserApi {
    @FormUrlEncoded
    @POST("account/register")
    Observable<BaseResponse> register(@Field("idToken") String idToken);

    @GET("account/testGoogle")
    Observable<BaseResponse> test(@Query("idToken") String idToken);
}





How is userApi.register implemented? Try userApi.register(idToken).subscribeOn(Schedulers.io()) inside the flatMap.
– akarnokd
Aug 28 at 7:38






1 Answer


.flatMap(idToken -> userApi.register(idToken))



could be changed to


.flatMap(idToken -> userApi.register(idToken).subscribeOn(Schedulers.io()))



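Then it would work, because subscribeOn on the inner Observable moves the Retrofit call onto an I/O thread regardless of which thread delivered the token.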





