Realm access from incorrect thread

Background

I am using Realm within my app. When data is loaded it then undergoes intense processing therefore the processing occurs on a background thread.

The coding pattern in use is the Unit of Work pattern and Realm only exists within a repository under a DataManager. The idea here is that each repository can have a different database/file storage solution.

What I have tried

Below is an example of some similar code to what I have in my FooRespository class.

The idea here is that an instance of Realm is obtained, used to query the realm for objects of interest, return them and close the realm instance. Note that this is synchronous and at the end copies the objects from Realm to an unmanaged state.

public Observable<List<Foo>> getFoosById(List<String> fooIds) {

    Realm realm = Realm.getInstance(fooRealmConfiguration);

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);

    for(String id : fooIds) {

        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }

    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}

This code is later used in conjunction with the heavy processing code via RxJava:

dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

After reading the docs, my belief is that the above should work, as it is NOT asynchronous and therefore does not need a looper thread. I obtain the instance of realm within the same thread therefore it is not being passed between threads and neither are the objects.

The problem

When the above is executed I get

Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

This doesn't seem right. The only thing I can think of is that the pool of Realm instances is getting me an existing instance created from another process using the main thread.


Kay so

return findFoosByIdQuery
        .findAll()
        .asObservable()

This happens on UI thread, because that's where you're calling it from initially

.subscribeOn(Schedulers.io())

Aaaaand then you're tinkering with them on Schedulers.io() .

Nope, that's not the same thread!

As much as I dislike the approach of copying from a zero-copy database, your current approach is riddled with issues due to misuse of realmResults.asObservable() , so here's a spoiler for what your code should be:

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
   return Observable.defer(() -> {
       try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
           RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
           for(String id : fooIds) {
               findFoosByIdQuery.equalTo(FooFields.ID, id);
               findFoosByIdQuery.or(); // please guarantee this works?
           }
           RealmResults<Foo> results = findFoosByIdQuery.findAll();
           return Observable.just(realm.copyFromRealm(results));
       }
   }).subscribeOn(Schedulers.io());
}

Note that you are creating the instance outside of all your RxJava processing pipeline. Thus on the main thread (or whichever thread you are on, when calling getFoosById() .

Just because the method returns an Observable doesn't mean that it runs on another thread. Only the processing pipeline of the Observable created by the last statement of your getFoosById() method runs on the correct thread (the filter() , the flatMap() and all the processing done by the caller).

You thus have to ensure that the call of getFoosById() is already done on the thread used by Schedulers.io() .

One way to achieve this is by using Observable.defer() :

Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);
链接地址: http://www.djcxy.com/p/93406.html

上一篇: 如何使用Plotly在Python中将饼图绘制为具有自定义大小的子图

下一篇: Realm从不正确的线程访问