频道栏目
首页 > 资讯 > 其他 > 正文

实现RxBus代替EventBus

17-08-29        来源:[db:作者]  
收藏   我要投稿

EventBus是什么

EventBus是为Android优化的发布/订阅事件总线

这里写图片描述

简化组件之间的通信 分离事件发送者和接收者 对活动,片段和后台线程表现良好 避免复杂和容易出错的依赖关系和生命周期问题 使您的代码更简单 运行速度是快的 很小(约50k的jar) 在实践中证明了具有100,000,000+个安装的应用程序 具有传送线程,用户优先级等高级功能

简单的说,EventBus中有围绕着事件为中心的两种角色一条总线,事件发布者与事件订阅者以及事件总线。事件发布者将事件发送到事件总线,事件总线将具体的事件分发到订阅者,订阅者进行处理

使用方式
1.定义事件

public static class MessageEvent { /* Additional fields if needed */ }

2.准备订阅者

@Subscribe(threadMode = ThreadMode.MAIN)  
public void onMessageEvent(MessageEvent event) {/* Do something */};

Register and unregister your subscriber. For example on Android, activities and fragments should usually register according to their life cycle:

 @Override
 public void onStart() {
     super.onStart();
     EventBus.getDefault().register(this);
 }

 @Override
 public void onStop() {
     super.onStop();
     EventBus.getDefault().unregister(this);
 }

3.发布事件

 EventBus.getDefault().post(new MessageEvent());

RxJava实现RxBus

接下来,我们来用RxJava写一个RxBus,来实现EventBus的发布 / 订阅的事件总线功能。

最优单例

首先,咱们先写个最优解的RxBus单例实现。

public class RxBus {

    private void RxBus(){}

    public static RxBus getInstance(){
        return rxBusInstance;
    }

    static class RxBusInner{
        static RxBus rxBusInstance = new RxBus();
    }
}

为什么说这是最优解呢?以下列举单例模式需要注意或解决的问题

线程安全或者说并发问题

懒汉式获取方式(只在需要时候实例化)

代码重排优化导致的问题

为什么上述的单例实现能解决这些问题?首先线程安全方面,我们可以看到RxBus的单例对象放在了内部类RxBusInner中,而类的加载由ClassLoader完成,由于ClassLoader的机制,一个ClassLoader同一个类,只加载一次,那么不管多少线程,得到的也是同一个类,保证了并发下是该方式是可用的。再看懒汉式,ClassLoader在加载单例类RxBusInner时不会初始化rxBusInstance。只有在第一次调用RxBusInner的getInstance()方法。

常规的单例如下,采用双重检查锁定(DCL)

    public static SingletonLazy getInstance() {
        if (mInstance == null) {//第一次检查
            synchronized (SingletonLazy.class) {//加锁
                if (mInstance == null) {//第二次次检查
                    mInstance = new SingletonLazy();//new 一个对象
                }
            }
        }        
        return mInstance;
    }

最后看代码重拍问题

假设有两个线程ThreadA和ThreadB:
ThreadA首先执行到line1,这时mInstance为null,ThreadA将接着执行new SingletonLazy();在这个过程中如果mInstance已经分配了内存地址,但是还没有完成初始化工作(问题就出在这儿,稍后分析),如果ThreadB执行了line1,因为mInstance已经指向了某一内存,所以将跳过new SingletonLazy()直接得到mInstance,但是此时mInstance还没有完成初始化,那么问题就出现了。造成这个问题的原因就是new SingletonLazy()这个操作不是原子操作。至少可以分解成以下上个原子操作:
1. 分配内存空间
2. 初始化对象
3. 将对象指向分配好的地址空间(执行完之后就不再是null了)
其中第2,3步在一些编译器中为了优化单线程中的执行性能是可以重排的。重排之后就是这样的:
1. 分配内存空间
2. 将对象指向分配好的地址空间(执行完之后就不再是null了)
3. 初始化对象
重排之后就有可能出现上边分析的情况:

这里写图片描述

虽然,JDK1.5之后volatile关键字可以禁止代码重排,但是会影响性能。所以,上述通过ClassLoader加载的方式去解决这个问题

RxJava中的Subject

Subject有两种用途

做为observable向其他的observable发送事件 做为observer接收其他的observable发送的事件

PublishSubject

该Subject不会改变事件的发送顺序。如果在已经发送了一部分事件之后注册的observer,是不会收到之前发送的事件的

PublishSubject publish = PublishSubject.create();
        publish.subscribe(new PublishObserver("first"));
        publish.onNext("1");
        publish.onNext("2");
        publish.subscribe(new PublishObserver("seconde"));
        publish.onNext("3");
        publish.onCompleted();

BehaviorSubject

该类有创建时需要一个默认参数,该默认参数会在subject未发送过其他的事件时,向注册的observer发送

        //将事件发送到observer,如果先前已经漏掉的事件,除了最近的一个事件以外,
        //其他相关事件不会重新发送到后注册的observer上。所以需要带默认值,
        //第一次被observer注册时,observable中没有内容的时候,就会将默认值发给observer
        BehaviorSubject behavior = BehaviorSubject.create("创建beahavior时候带的消息");
        behavior.subscribe(new SubjectObserver("first"));
        behavior.onNext("1");
        behavior.onNext("2");
        behavior.subscribe(new SubjectObserver("seconde"));
        behavior.onNext("3");
        behavior.onCompleted();

ReplaySubject

将事件发送到observer,无论什么时候注册observer,无论何时通过该observable发射的所有事件,均会发送给新的observer

ReplaySubject replay = ReplaySubject.create();
        replay.subscribe(new SubjectObserver("first"));
        replay.onNext("1");
        replay.onNext("2");
        replay.subscribe(new SubjectObserver("seconde"));
        replay.onNext("3");
        replay.onCompleted();

注意:Subject它负责在非Rx-Apis之间充当桥梁。但是也存在问题。他没有能力去处理onComplete 或 onError事件如果接收onError / onComplete ,那么它将不再可用,即onNext无效。当然,这其实就是RxJava设计初衷。

比如下面代码,观察者将不会收到事件:

        bus.onComplete();
        bus.onNext(o);

开始打造RxBus

我们要实现:事件总线、事件发布者以及事件订阅者。OK,其实Subject就可以同时作为这三个角色?怎么说,首先Subject既可以作为被观察者发送事件,也可以作为观察者接收事件,而RxJava内部的响应式的支持也天然的实现了事件总线的功能

public class RxBus {

    // Subject as event bus
    private Subject bus;

    private RxBus(){
        // to serialized method making them thread-safe
        bus = PublishSubject.create().toSerialized();
    }

    // post a event, is to execute onNext
    public void post(Object o){
        bus.onNext(o);
    }

    // observe, is to register a subscribe
    public  Observable toObservable(Class tClass) {
        // 
        return bus.ofType(tClass);
    }

    // has observers
    public boolean hasObservers() {
        return bus.hasObservers();
    }

    public static RxBus getInstance(){
        return rxBusInstance;
    }

    static class RxBusInner{
        static RxBus rxBusInstance = new RxBus();
    }
}

bus = PublishSubject.create().toSerialized();生成一个Subject对象,这里我们采用的是PublishSubject,当然可以根据具体需求去选择,你可以去完善这个RxBus,去支持更多方式的事件传递规则,比如类似粘性广播。toSerialized是为了实现方法的序列化,保证线程安全

post方法提供给事件发布者使用,发布事件,而事件则以传入的对象Object作为标识与区分。这里有个问题:所有的事件必须是不同的实体类型,略坑~

toObservable方法提供给事件订阅者,订阅事件,通过ofType来区分事件类型,达到准事件的确分发

若某个事件发送崩溃,会导致后续事件无法正常的生效,略坑~

RxBus填坑

上面的初步实现,我已经验证过了,是OK的,具体的代码和截图就不贴了。两个问题:

所有的事件必须是不同的实体类型,因为我们过滤事件是用的ofType 若某个事件处理崩溃,会导致后续事件无法正常的生效,这个是真的坑

filter代替ofType

ofType操作符只是通过类型去判断过滤事件,而filter能够自行定义过滤规则。那么,我们怎么通过自己的规则去实现事件过滤

规范事件

我们定义一个RxBusEvent类来规范事件,所有事件必须由RxBusEvent进行包装。其中T作为事件标识,你可以用任何类型来规范,比如Integer、String、Boolean甚至是Class等类型。但是需要注意的是,若你使用的类型没有重写hashCode与equals这两个方法,那么你必须正确重写这两个方法,以保证能够正确的完成比较事件标识

public class RxBusEvent{

    private T eventTag;

    private Object eventContent;

    public RxBusEvent() {
    }

    public RxBusEvent(T eventTag, Object eventContent) {
        this.eventTag = eventTag;
        this.eventContent = eventContent;
    }

    public T getEventTag() {
        return eventTag;
    }

    public void setEventTag(T eventTag) {
        this.eventTag = eventTag;
    }

    public Object getEventContent() {
        return eventContent;
    }

    public void setEventContent(Object eventContent) {
        this.eventContent = eventContent;
    }
}

修改RxBus

首先,我们要把事件类型由Object全部替换为RxBusEvent。接着,定义一个Predicate用于事件过滤。最后,将ofType操作符更换为filter操作符即可

public class RxBus {

    // Subject as event bus
    private Subject bus;
    private BusPredicate predicate;

    private RxBus(){
        PublishSubject publishSubject = PublishSubject.create();
        // to serialized method making them thread-safe
        bus = publishSubject.toSerialized();
        predicate = new BusPredicate();
    }

    // post a event, is to execute onNext
    public void post(RxBusEvent o){
        bus.onNext(o);
    }

    // observe, is to register a subscribe
    public Observable toObservable(RxBusEvent event) {
        return bus.filter(predicate.setEvent(event));
    }

    // has observers
    public boolean hasObservers() {
        return bus.hasObservers();
    }

    public static RxBus getInstance(){
        return rxBusInstance;
    }

    static class RxBusInner{
        static RxBus rxBusInstance = new RxBus();
    }

    private class BusPredicate implements Predicate {

        RxBusEvent event;

        public BusPredicate setEvent(RxBusEvent event){
            this.event = event;
            return this;
        }

        @Override
        public boolean test(RxBusEvent event) throws Exception {
            // If the event is consistent with the type of event required by the subscriber
            if(event.getEventTag().equals(this.event.getEventTag()))
                return true;
            return false;
        }
    }
}

RxRelay

RxRelay是JakeWharton大神写的一个依赖库,Relays 是既是Observable也是Consumer的RxJava 类型,它同样能够很容易在non-Rx api和 Rx api之间搭起桥梁,而不必要担心触发终止状态(onComplete 或 onError),咱们主要用它解决在在触发 onError / onComplete 后终止订阅关系的问题。所以基于 RxRelay 就可以不必担心订阅关系的破坏。以下是官方描述:

Relays are RxJava types which are both an Observable and a Consumer.
Basically: A Subject except without the ability to call onComplete or onError.

Subjects are useful to bridge the gap between non-Rx APIs. However, they are stateful in a damaging way: when they receive an onComplete or onError they no longer become usable for moving data. This is the observable contract and sometimes it is the desired behavior. Most times it is not.

Relays are simply Subjects without the aforementioned property. They allow you to bridge non-Rx APIs into Rx easily, and without the worry of accidentally triggering a terminal state.

As more of your code moves to reactive, the need for Subjects and Relays should diminish. In the transitional period, or for quickly adapting a non-Rx API, Relays provide the convenience of Subjects without the worry of the statefulness of terminal event behavior.

引入RxRelay

compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'

Subject替换为Relay

public class RxBus {

    // Relay as event bus
    private Relay bus;
    private BusPredicate predicate;

    private RxBus(){
        // to serialized method making them thread-safe
        PublishRelay publishSubject = PublishRelay.create();
        bus = publishSubject.toSerialized();
        predicate = new BusPredicate();
    }

    // post a event, is to execute accept
    public void post(RxBusEvent o){
        bus.accept(o);
    }

    // observe, is to register a subscribe
    public Observable toObservable(RxBusEvent event) {
        return bus.filter(predicate.setEvent(event));
    }

    // has observers
    public boolean hasObservers() {
        return bus.hasObservers();
    }

    public static RxBus getInstance(){
        return rxBusInstance;
    }

    static class RxBusInner{
        static RxBus rxBusInstance = new RxBus();
    }

    private class BusPredicate implements Predicate {

        RxBusEvent event;

        public BusPredicate setEvent(RxBusEvent event){
            this.event = event;
            return this;
        }

        @Override
        public boolean test(RxBusEvent event) throws Exception {
            // If the event is consistent with the type of event required by the subscriber
            if(event.getEventTag().equals(this.event.getEventTag()))
                return true;
            return false;
        }
    }
}

测试

测试代码:使用Interger作为事件标记

        RxBus.getInstance().toObservable(new RxBusEvent(1,"hehe")).subscribe(new Consumer() {
            @Override
            public void accept(RxBusEvent event) throws Exception {
                // do thing
            }
        });

        RxBus.getInstance().post(new RxBusEvent(1,"xxx"));
        RxBus.getInstance().post(new RxBusEvent(2,"xxx"));

测试代码:使用自定义对象作为事件标记,重写equals和hashCode方法

public class CustomerTag {

    private String username;
    private String password;

    public CustomerTag(String username, String password) {
        this.username = username;
        this.password = password;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        CustomerTag that = (CustomerTag) o;

        if (username != null ? !username.equals(that.username) : that.username != null)
            return false;
        return password != null ? password.equals(that.password) : that.password == null;

    }

    @Override
    public int hashCode() {
        int result = username != null ? username.hashCode() : 0;
        result = 31 * result + (password != null ? password.hashCode() : 0);
        return result;
    }
}
        RxBus.getInstance().toObservable(new RxBusEvent(new CustomerTag("lee","xxx"),"hehe")).subscribe(new Consumer() {
            @Override
            public void accept(RxBusEvent event) throws Exception {
                // do thing
                System.out.println("MainActivity.accept");
            }
        });

        RxBus.getInstance().post(new RxBusEvent(new CustomerTag("lee","xxx"),"xxx"));
        RxBus.getInstance().post(new RxBusEvent(new CustomerTag("lee","yyy"),"xxx"));

经过测试,功能全部正常实现,这里就不细贴运行结果了。大家可以自己试试~

内存泄漏

RxJava作为一种响应式编程框架,可谓是家喻户晓,其简洁的编码风格、易用易读的链式方法调用、强大的异步支持等使得RxJava被广泛使用,它通过线程调度器更容易控制和切换线程,但是如果该工作线程还没执行结束就退出Activity或者Fragment的话,就会导致Activity或者Fragment无法释放引起内存泄漏

最简单的处理

private CompositeDisposable compositeDisposable;
......
compositeDisposable = new CompositeDisposable();
......
Disposable subscribe = RxBus.getInstance().toObservable(new RxBusEvent(new CustomerTag("lee", "xxx"), "hehe")).subscribe(new Consumer() {
            @Override
            public void accept(RxBusEvent event) throws Exception {
                // do thing
                System.out.println("MainActivity.accept");
            }
        });
        compositeDisposable.add(subscribe);
......

    @Override
    protected void onStop() {
        super.onStop();
        compositeDisposable.dispose();
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        compositeDisposable.dispose();
    }

使用Rxlifecycle

Rxlifecycle 是trello开发的用于解决RxJava引起的内存泄漏的开源框架。

github地址:https://github.com/trello/RxLifecycle

引入依赖

//Rxlifecycle
   compile 'com.trello:rxlifecycle:0.3.1'
   compile 'com.trello:rxlifecycle-components:0.3.1'

   //Rxjava
   compile 'io.reactivex:rxjava:1.0.16'

使用方式

1.手动设置取消订阅的时机,例子1、例子3
2.绑定生命周期,自动取消订阅,例子2

public class MainActivity extends RxAppCompatActivity {

//Note:Activity需要继承RxAppCompatActivity,fragment需要继承RxFragment,等等
//可以使用的组件在components包下面

private static final String TAG = "RxLifecycle";

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    Log.d(TAG, "onCreate()");
    setContentView(R.layout.activity_main);

    // Specifically bind this until onPause()

    //Note:例子1:
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onCreate()");
                }
            })
            //Note:手动设置在activity onPause的时候取消订阅
            .compose(this.bindUntilEvent(ActivityEvent.PAUSE))
            .subscribe(new Action1() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
                }
            });
}

@Override
protected void onStart() {
    super.onStart();
    Log.d(TAG, "onStart()");

    //Note:例子2:
    // Using automatic unsubscription, this should determine that the correct time to
    // unsubscribe is onStop (the opposite of onStart).
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onStart()");
                }
            })
            //Note:bindToLifecycle的自动取消订阅示例,因为是在onStart的时候调用,所以在onStop的时候自动取消订阅
            .compose(this.bindToLifecycle())
            .subscribe(new Action1() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
                }
            });
}

@Override
protected void onResume() {
    super.onResume();
    Log.d(TAG, "onResume()");

    //Note:例子3:
    // `this.` is necessary if you're compiling on JDK7 or below.
    // If you're using JDK8+, then you can safely remove it.
    Observable.interval(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "Unsubscribing subscription from onResume()");
                }
            })
            //Note:手动设置在activity onDestroy的时候取消订阅
            .compose(this.bindUntilEvent(ActivityEvent.DESTROY))
            .subscribe(new Action1() {
                @Override
                public void call(Long num) {
                    Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
                }
            });
}
...

内部实现

RxLifecycle 的内部实现原理,刚好与咱们今天的主要内容之一Subject相关。既能够监听Activity生命周期事件并对外发射,又能够接收每一个生命周期事件并作出判断。很明显,就是Subject!还有一个核心就是compose操作符,后续有空单独写篇博客

public abstract class RxAppCompatActivity extends AppCompatActivity implements LifecycleProvider {

    private final BehaviorSubject lifecycleSubject = BehaviorSubject.create();

    @Override
    @NonNull
    @CheckResult
    public final Observable lifecycle() {
        return lifecycleSubject.hide();
    }

    @Override
    @NonNull
    @CheckResult
    public final  LifecycleTransformer bindUntilEvent(@NonNull ActivityEvent event) {
        return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
    }

    @Override
    @NonNull
    @CheckResult
    public final  LifecycleTransformer bindToLifecycle() {
        return RxLifecycleAndroid.bindActivity(lifecycleSubject);
    }

    @Override
    @CallSuper
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifecycleSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    @CallSuper
    protected void onStart() {
        super.onStart();
        lifecycleSubject.onNext(ActivityEvent.START);
    }

    @Override
    @CallSuper
    protected void onResume() {
        super.onResume();
        lifecycleSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    @CallSuper
    protected void onPause() {
        lifecycleSubject.onNext(ActivityEvent.PAUSE);
        super.onPause();
    }

    @Override
    @CallSuper
    protected void onStop() {
        lifecycleSubject.onNext(ActivityEvent.STOP);
        super.onStop();
    }

    @Override
    @CallSuper
    protected void onDestroy() {
        lifecycleSubject.onNext(ActivityEvent.DESTROY);
        super.onDestroy();
    }
}

RxLifecycle 其他部分源码,大家有兴趣可以自己看看。

总结

经过小半天的努力,总结一句话:All the findings in the code are pleasant surprises.

相关TAG标签
上一篇:Project——编制进度计划、保存基准
下一篇:linux常用命令(2)
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站