Rxjava: Call onNext from Main thread dont repect subscribeOn when flatMap or switchMap

Created on 5 Jan 2016  路  6Comments  路  Source: ReactiveX/RxJava

Im doing a wrapper for the facebook sdk and trying to do it with rx java but Im getting an issue with the actual thread for the observables created in my wrapper. I want to get the user object after the user log in with fb.

public class LoginPresenter extends BasePresenter<LoginView> {

public void startFacebookLogin() {
    if (!mNetworkChecker.isNetworkConnectionAvailable()) {
      mView.showNoConnectionError();
      return;
    }

    Subscription subscription = mFacebookRepository.startLoginFlow()
        .flatMap(loginResult -> mFacebookRepository.getUser())
        .compose(Transformer.applyIoSchedulers())
        .subscribe(user -> mView.startSignUpView(user), this::manageErrorFlow);

    mCompositeSubscription.add(subscription);
  }
}
public class FacebookRepository {

  private WeakReference<Activity> mActivityWeakReference;
  @Nullable private CallbackManager callbackManager;

  public FacebookRepository(String facebookPublicKey, Activity activity) {
    if (!FacebookSdk.isInitialized()) {
      FacebookSdk.sdkInitialize(com.barista_v.wapa.api.Api.getContext());
    }

    if (com.barista_v.wapa.api.Api.getLogLevel() > com.barista_v.wapa.api.Api.LOG_LEVEL_BASIC) {
      FacebookSdk.addLoggingBehavior(LoggingBehavior.REQUESTS);
    }

    mActivityWeakReference = new WeakReference<>(activity);

    FacebookSdk.setApplicationId(facebookPublicKey);
  }

  public Observable<LoginResult> startLoginFlow() {
    return Observable.create(subscriber -> {
      Activity activity = mActivityWeakReference.get();
      if (activity != null) {
        LoginManager loginManager = LoginManager.getInstance();
        callbackManager = CallbackManager.Factory.create();

        loginManager.logInWithReadPermissions(activity, getPermissions());
        loginManager.registerCallback(callbackManager, new FacebookCallback<LoginResult>() {
          @Override
          public void onSuccess(LoginResult loginResult) {
            subscriber.onNext(loginResult);
            subscriber.onCompleted();
          }

          @Override
          public void onCancel() {
            subscriber.onError(
                new FacebookLoginCancelException("Login canceled.", "Facebook Error", null));
          }

          @Override
          public void onError(FacebookException exception) {
            subscriber.onError(new FacebookLoginException(exception));
          }
        });
      } else {
        Timber.w("The activity reference is null for FacebookRepository.startLoginFlow()");
        subscriber.onError(
            new FacebookLoginException("Something happened internally.", "Facebook Error", null));
      }
    });
  }

  public Observable<User> getUser() {
    return Observable.defer(() -> {
      AccessToken accessToken = AccessToken.getCurrentAccessToken();
      String userId = accessToken.getUserId();
      Bundle params = new Bundle();
      params.putString("fields", "name,email");
      String graphPath = "/" + userId;
      GraphRequest graphRequest = new GraphRequest(accessToken, graphPath, params, HttpMethod.GET);

      JSONObject json;
      try {
        GraphResponse result = graphRequest.executeAndWait();
        json = result.getJSONObject();
      } catch (FacebookException e) {
        return Observable.error(e);
      }

      String name, email;

      try {
        name = json.getString("name");
      } catch (JSONException | JsonSyntaxException | NullPointerException e) {
        return Observable.error(e);
      }

      try {
        email = json.getString("email");
      } catch (JSONException | JsonSyntaxException | NullPointerException e) {
        com.barista_v.wapa.api.error_handling.ApiException exception =
            new UserInfoMissingException(
                "Email access permission is not granted but it is required.",
                "Invalid Facebook Permissions", e);
        return Observable.error(exception);
      }

      String photoUrl =
          String.format("https://graph.facebook.com/%s/picture?type=large", userId);

      return Observable.just(new User(name, email, photoUrl));
    });
  }

  public void onActivityResult(int requestCode, int resultCode, Intent data) {
    if (callbackManager != null) {
      callbackManager.onActivityResult(requestCode, resultCode, data);
    }
  }
}

public class Transformer {
  public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }
}

The problem is that the code that execute the mFacebookRepository.getUser() is in the main thread.
Should not be the IO thread?

I create the observable and call onNext when FacebookCallback call onSuccess function and it is called in the main thread, so subscriber.onNext is called on the main thread.

Is this a bug or something Im doing wrong?

Question

Most helpful comment

Subscription subscription = mFacebookRepository.startLoginFlow()
        .flatMap(loginResult -> mFacebookRepository.getUser().subscribeOn(Schedulers.io())
        .compose(Transformer.applyIoSchedulers())
        .subscribe(user -> mView.startSignUpView(user), this::manageErrorFlow);

All 6 comments

Hi. Where does the loginManager's callback run? Is it possible the facebook api forces it onto the main thread because most consumers want it there? Regardless, if you are uncertain where things execute, apply more of the subscribeOn, for example, right after getUser() to make sure it gets executed off the main thread.

@akarnokd the LoginManager is from fb, yeah I think that too, they force it into the main thread. What do you mean by apply more of the subscribeOn?

Subscription subscription = mFacebookRepository.startLoginFlow()
        .flatMap(loginResult -> mFacebookRepository.getUser().subscribeOn(Schedulers.io())
        .compose(Transformer.applyIoSchedulers())
        .subscribe(user -> mView.startSignUpView(user), this::manageErrorFlow);

@akarnokd that worked thanks :+1: but is not a little confusing? all observables should not use the same compose transform?

No, streams can go around on pretty much any scheduler or even external threads. The only thing that can be done is apply subscribeOn and observeOn at questionable points to make sure the stream runs on the required threads.

:+1:

Was this page helpful?
0 / 5 - 0 ratings

Related issues

dsvoronin picture dsvoronin  路  4Comments

francorolando picture francorolando  路  3Comments

dlew picture dlew  路  4Comments

aballano picture aballano  路  3Comments

philleonard picture philleonard  路  3Comments