一网打尽dubbo的8种集群容错模式

来自:阿飞的博客(微信号:AfeiJavaer),作者:阿飞的博客

dubbo集群容错

在集群调用时,Dubbo 提供了多种容错方案,缺省为 failover ,即失败重试。可通过接口com.alibaba.dubbo.rpc.cluster.Cluster的SPI注解可知:

/**
 * Cluster. (SPI, Singleton, ThreadSafe)
 * 
 * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a>
 * <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>
 * 
 * @author william.liangf
 */

@SPI(FailoverCluster.NAME)
public interface Cluster {
    ...
}

集群模式配置

按照以下示例在服务提供方和消费方配置集群模式

<dubbo:service cluster="failover" />
<dubbo:reference cluster="failsafe" />

集群模式概览

dubbo支持的集群模式如下图所示,由于dubbo通过SPI实现微内核,集群模式也不例外,所以想扩展自己对集群容错的处理方式,非常简单;

dubbo集群容错总览

接下来通过对源码的阅读,一一分析各个集群容错模式的实现;

Failover Cluster

dubbo默认集群模式,失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟,且使集群的压力更大。可通过 retries="2" 来设置重试次数(默认为2,这个值是重试次数,所以不包括第一次调用,而是第一次调用失败后最大可重试次数)。重试次数配置示例如下:

<dubbo:service retries="2" />
<dubbo:reference retries="2" />
<dubbo:reference><dubbo:method name="findFoo" retries="2" /></dubbo:reference>

核心实现源码:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
    checkInvokers(copyinvokers, invocation);
    // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    // 如果用户设置reties为负数,那么也要调用至少1次
    if (len <= 0) {
        len = 1;
    }
    // 保存最后一次调用的异常
    RpcException le = null;
    // 保存已经调用过的Invoker
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // failover机制核心实现:如果出现调用失败,那么重试其他服务器
    for (int i = 0; i < len; i++) {
        //重试时,进行重新选择,避免重试时invoker列表已发生变化.
        //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
        if (i > 0) {
            checkWheatherDestoried();
            // 根据Invocation调用信息从Directory中获取所有可用Invoker
            copyinvokers = list(invocation);
            //重新检查一下
            checkInvokers(copyinvokers, invocation);
        }
        // 根据负载均衡机制从copyinvokers中选择一个Invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 保存每次调用的Invoker
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            // RPC调用得到Result
            Result result = invoker.invoke(invocation);
            // 重试过程中,将最后一次调用的异常信息以warn级别日志输出
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method ... ... ");
            }
            return result;
        } catch (RpcException e) {
            // 如果是业务性质的异常,不再重试,直接抛出
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            // 其他性质的异常统一封装成RpcException
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
    throw new RpcException("Failed to invoke the method ... ");
}

Failfast Cluster

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
核心实现源码:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(... ...);
    }
}

FailfastCluster实现比较简单,根据负载均衡机制选择一个Invoker后只调用1次,不管结果如何,不再进行任何重试:如果调用正常就返回Result,否则返回记录了详细异常信息的RpcException

Failsafe Cluster

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
核心实现源码:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return new RpcResult(); // ignore
    }
}

FailsafeCluster实现比较简单,根据负载均衡机制选择一个Invoker后只调用1次,不管结果如何,不再进行任何重试:如果调用正常就返回Result,否则返回一个空的RpcResult,这是和FailfastCluster的唯一区别,不会把任何异常信息返回给consumer;

Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
核心实现源码:

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                             + e.getMessage() + ", ", e);
        // failback实现的核心,如果调用失败,后台记录失败请求,并定时重发
        addFailed(invocation, this);
        // 吞掉异常,返回一个RpcResult对象
        return new RpcResult(); 
    }
}

定时重发核心实现源码:

// 处理重试任务的线程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2new NamedThreadFactory("failback-cluster-timer"true));

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (retryFuture == null) {
        // double-check保证线程安全
        synchronized (this) {
            if (retryFuture == null) {
                // 一个独立的线程池处理,执行周期是5s
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        // 收集统计信息
                        try {
                            // 重试失败的请求,如果重试成功,把请求从remove掉;
                            retryFailed();
                        } catch (Throwable t) { // 防御性容错
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    failed.put(invocation, router);
}

Forking Cluster

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。
核心实现源码:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    final List<Invoker<T>> selected;
    // forks数,默认为2
    final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    // 请求超时
    final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // 如果设置的forks值为负数,或者超过了可用Invoker数,那么选择所有可用Invoker,即invokers
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers;
    } else {
        selected = new ArrayList<Invoker<T>>();
        // 只选择forks值指定的Invoker数量
        for (int i = 0; i < forks; i++) {
            //在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现.
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if(!selected.contains(invoker)){//防止重复添加invoker
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List)selected);
    final AtomicInteger count = new AtomicInteger();
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        // ForkingCluster核心实现,多线程并行调用
        executor.execute(new Runnable() {
            public void run() {
                try {
                    Result result = invoker.invoke(invocation);
                    // 把结果放到BlockingQueue中
                    ref.offer(result);
                } catch(Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        // 从BlockingQueue中取结果:即并行调用最先返回的结果
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
        // 如果取得的是异常,那么将异常封装成RpcException并抛给Consumer
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0"Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        // 如果timeout指定超时时间内还没有返回结果,那么将异常封装成RpcException并抛给Consumer
        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    }
}

Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错 。通常用于通知所有提供者更新缓存等本地资源信息。

核心实现源码:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List)invokers);
    // 保存最后一个调用的异常
    RpcException exception = null;
    Result result = null;
    for (Invoker<T> invoker: invokers) {
        try {
            // 遍历所有Invoker,每个Invoker都会被调用(不管某个Invoker是否抛出异常)
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    // 如果调用过程有异常,那么抛出该异常
    if (exception != null) {
        throw exception;
    }
    return result;
}

Available Cluster

遍历所有从Directory中list出来的Invoker集合,调用第一个isAvailable()为true的Invoker,只发起一次调用,失败立即报错。
isAvailable()判断逻辑如下--Client处理连接状态,且不是READONLY:

@Override
public boolean isAvailable() {
    if (!super.isAvailable())
        return false;
    for (ExchangeClient client : clients){
        if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){
            //cannot write == not Available ?
            return true ;
        }
    }
    return false;
}


Mergeable Cluster

这个Mergeable Cluster应该是dubbo自带8种集群容错模式中最复杂的一种。

简介

在dubbo官方的用户手册中,提到了使用MergeableCluster的场景--分组聚合:

按组合并返回结果 ,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费者需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。

功能示意图如下:

merger示意图

用法

定义菜单接口方式:

public interface MenuService {
    List<Menu> getMenuList(String traceId);
}

这个接口有两个实现类:HotMenuServiceImplColdMenuServiceImpl,
前者返回结果为:[{"id":1,"name":"青椒炒肉"},{"id":2,"name":"剁椒鱼头"},{"id":3,"name":"口味虾"}],
后者返回结果为:[{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"}]

Provider暴露服务:

一个服务属于group-hot
<dubbo:service interface="com.alibaba.dubbo.demo.MenuService" ref="hotMenuServiceImpl"  group="group-hot"/>

一个服务属于group-cold
<dubbo:service interface="com.alibaba.dubbo.demo.MenuService" ref="coldMenuServiceImpl" group="group-cold"/>

笔者测试时启动了两个Provider,所以总计有四个服务,dubbo-monitor监控显示如下:

dubbo-monitor服务列表

Consumer调用服务:
<dubbo:reference id="menuService" interface="com.alibaba.dubbo.demo.MenuService" retries="0" timeout="1800000" merger="list" group="*" cluster="mergeable"/>

几个重要的配置说明:

  • mergermerger="list"指定merge方式,可以自定义,也可以指定com.alibaba.dubbo.rpc.cluster.Merger文件中申明的方式;

  • groupgroup="*"表示调用接口com.alibaba.dubbo.demo.MenuService所有的分组服务,由于只有group-hot和group-cold两个分组,这里也可以配置为group="group-hot,group-cold";

  • clustercluster="mergeable"即指定集群容错模式为MergeableCluster模式,也就是本文分析的模式;

  • timeouttimeout="1800000"即超时时间,之所以设置这么大,是为了debug源码过程中不会发生超时,此配置不适用于生产环境;

com.alibaba.dubbo.rpc.cluster.Merger文件内容如下:

list=com.alibaba.dubbo.rpc.cluster.merger.ListMerger
set=com.alibaba.dubbo.rpc.cluster.merger.SetMerger
map=com.alibaba.dubbo.rpc.cluster.merger.MapMerger
byte=com.alibaba.dubbo.rpc.cluster.merger.ByteArrayMerger
... ...
boolean=com.alibaba.dubbo.rpc.cluster.merger.BooleanArrayMerger

这里需要指出的一点,dubbo官方在2017年11月份对这个文件有过修改,修改记录请戳:合并结果问题,应该是笔误,建议修复:https://github.com/apache/incubator-dubbo/issues/922,修改内容如下图所示,所以老版本和新版本的merger="list"效果不一样:

Merger bug fix
  • 运行结果

[{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"},{"id":1,"name":"青椒炒肉"},{"id":2,"name":"剁椒鱼头"},{"id":3,"name":"口味虾"}]

从运行结果可以看出,合并了HotMenuServiceImplColdMenuServiceImpl两个不同group服务的结果;

源码分析

核心源码在MergeableClusterInvoker.java中,源码如下所示:

public Result invoke(final Invocation invocation) throws RpcException {
    // 拿到可用的Invoker集合
    List<Invoker<T>> invokers = directory.list(invocation);

    // 得到配置的merger参数值
    String merger = getUrl().getMethodParameter( invocation.getMethodName(), Constants.MERGER_KEY );
    // 如果方法不需要Merge,退化为只调一个group即可--选择第一个有效的Invoker调用并返回结果
    if ( ConfigUtils.isEmpty(merger) ) {
        for(final Invoker<T> invoker : invokers ) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        // 如果没有任意Invoker满足isAvailable(), 那么尝试调用第一个Invoker(多尝试一下, 多一次机会)
        return invokers.iterator().next().invoke(invocation);
    }
    // 得到方法的返回类型
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes() ).getReturnType();
    } catch ( NoSuchMethodException e ) {
        returnType = null;
    }

    // 由于我们调用的服务, 有两个不同的group, 且没有申明version, 所以这个map的key有两个值
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    forfinal Invoker<T> invoker : invokers ) {
        // 线程池方法异步调用
        Future<Result> future = executor.submit( new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        } );
        // serviceKey非常重要--serviceKey的值为: groupName/serviceInterface:version,
        // 如果version没有申明, serviceKey的值为: groupName/serviceInterface
        // 如果group没有申明, serviceKey的值为: serviceInterface:version
        // 如果version和group都没有申明, serviceKey的值为: serviceInterface
        results.put( invoker.getUrl().getServiceKey(), future );
    }

    Object result = null;

    // 保存异步执行结果集合
    List<Result> resultList = new ArrayList<Result>( results.size() );

    int timeout = getUrl().getMethodParameter( invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT );
    for ( Map.Entry<String, Future<Result>> entry : results.entrySet() ) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            // 如果异步执行有异常(包括超时), 那么输出error级别的日志, 不影响最终的结果(只是部分数据缺失)
            if (r.hasException()) {
                log.error(" Invoke ... failed: ...");
            } else {
                resultList.add(r);
            }
        } catch ( Exception e ) {
            throw new RpcException("Failed to invoke service ...");
        }
    }

    if (resultList.size() == 0) {
        // 如果没有结果, 那么new一个result为null的RpcResult返回即可
        return new RpcResult((Object)null);
    } else if (resultList.size() == 1) {
        // 如果只有一个结果, 那么直接返回即可
        return resultList.iterator().next();
    }
    // 如果返回类型为void, 那么new一个result为null的RpcResult返回即可
    if (returnType == void.class) {
        return new RpcResult((Object)null);
    }
    // 如果merger的值是以.开头, 例如merger=".addAll", 这段逻辑就是调用结果类型的原生方法, 例如服务的返回结果是List<Menu>,即list类型,那么merger=".addAll"就是调用List集合的.addAll()。
    if ( merger.startsWith(".") ) {
        merger = merger.substring(1);
        Method method;
        try {
            // 首先得到调用方法,如果方法不存在,则抛出异常
            method = returnType.getMethod( merger, returnType );
        } catch ( NoSuchMethodException e ) {
            throw new RpcException(... ...);
        }
        if ( method != null ) {
            if ( !Modifier.isPublic( method.getModifiers() ) ) {
                method.setAccessible( true );
            }
            // 先取得第一个结果
            result = resultList.remove( 0 ).getValue();
            try {
                // 如果merger=".addAll"指定的方法返回类型不为void,且和dubbo服务接口方法返回类型是相同类型,以测试代码为例,.addAll()返回类型是boolean,而dubbo服务接口方法返回类型是List,所以这里的if条件分支为false
                if ( method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom( result.getClass() ) ) {
                    // 遍历剩余的结果集
                    for ( Result r : resultList ) {
                        // 根据配置的merger值,例如merger=".addAll",依次对剩余结果集调用addAll()方法
                        result = method.invoke( result, r.getValue() );
                    }
                } else {
                    // 遍历剩余的结果集
                    for ( Result r : resultList ) {
                        // 根据配置的merger值,例如merger=".addAll",依次对剩余结果集调用addAll()方法
                        method.invoke( result, r.getValue() );
                    }
                }
            } catch ( Exception e ) {
                throw new RpcException("Can not merge result: ...");
            }
        } else {
            throw new RpcException(... ...);
        }
    } else {
        // 如果merger申明为不以.开头, 例如merger="list"
        Merger resultMerger;
        // true和default都是默认值(大小写不敏感)
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            // 如果merger配置的是com.alibaba.dubbo.rpc.cluster.Merger文件中申明的值, 例如merger="list", 或者merger="map"
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for(Result r : resultList) {
                rets.add(r.getValue());
            }
            // 根据不用的merger实现, 合并结果
            result = resultMerger.merge(
                    rets.toArray((Object[])Array.newInstance(returnType, 0)));
        } else {
            // 如果申明一些未知的merger, 那么抛出异常
            throw new RpcException( "There is no merger to merge result." );
        }
    }
    return new RpcResult( result );
}

这一段代码还是很有借鉴意义的,比如支付宝获取支付方式(支付方式有多种,例如余额,红包,优惠券等),假设每种支付方式需要通过实时调用远程服务获取可用性,就可以模拟这种方式进行调用(笔者一位朋友在阿里笔试的时候碰到了这道题目),美滋滋^_^

注意

在条件分支if ( merger.startsWith(".") ) {}中,有一段逻辑:method = returnType.getMethod( merger, returnType );,即从dubbo服务接口方法返回类型即java.util.List中查找merger配置的方法,例如.addAll,我们先看一下debug过程各变量的值:

debug过程中变量的值

dubbo源码中method = returnType.getMethod( merger, returnType );的方法匹配核心逻辑需要满足几个条件:

  1. 方法名一样,即m.getName() == internedName。配置的是merger=".addAll",而List中也有addAll方法,这个条件符合;

  2. 寻找的方法参数类型和dubbo服务接口方法的返回类型完全一致(不能是继承关系),即arrayContentsEq(parameterTypes, m.getParameterTypes())。List中.addAll()方法参数类型是Collection(boolean addAll(Collection c);),而dubbo服务接口方法的返回类型是List类型,虽然List继承自Collection,但是并不等于,即arrayContentsEq()返回的还是false;

由上面的分析可知,如果要merger=".addAll"能够正常工作,那么只需要将dubbo服务的返回类型改成Collection即可,例如:
Collection<Menu> getMenuList(String traceId);

自定义merger实现

如果com.alibaba.dubbo.rpc.cluster.Merger文件集中方法无法满足需求,需要自定义实现,那么还是和dubbo其他扩展实现一样,依赖SPI。只需要一下几步实现即可:

  1. step1

在consumer侧的resources/META-INF/dubbo目录下,创建名为com.alibaba.dubbo.rpc.cluster.Merger的文件,且内容为:
afei=com.afei.consumer.merger.AfeiMerger

  1. step2

实现AfeiMerger,参考dubbo源码中若干Merger.java的实现类即可,例如:

public class AfeiMerger implements Merger<List<?>> {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private static final int TOP_COUNT = 3;

    /**
     * 只随机取合并后的三个结果
     */

    public List<Object> merge(List<?>... items) {
        List<Object> result = new ArrayList<Object>();
        for (List<?> item : items) {
            if (item != null) {
                result.addAll(item);
            }
        }

        List<Integer> randList = new ArrayList<Integer>();
        for (int i=0; i<result.size(); i++) {
            randList.add(i);
        }
        log.info("before shuffle: "+randList);
        Collections.shuffle(randList);
        log.info("after shuffle: "+randList);

        int resultSize = TOP_COUNT > result.size()?result.size() : TOP_COUNT;
        List<Object> finalResult = new ArrayList<Object>(resultSize);
        for(int i=0; i<resultSize; i++){
            finalResult.add(result.get(randList.get(i)));
        }
        return finalResult;
    }

}

  1. step3

最后一步非常简单,<dubbo:reference/>中配置merger="afei"即可,这个merger的值,对应step1文件内容中的key;
<dubbo:reference id="menuService" interface="com.alibaba.dubbo.demo.MenuService" retries="0" timeout="1800000" merger="afei" group="group-hot,group-cold" cluster="mergeable"/>

  1. step4

just run it。多运行几次,可以看到不到的结果,即达到了随机取3个结果的目的:

[{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"},{"id":2,"name":"剁椒鱼头"}]
[{"id":101,"name":"凉拌黄瓜"},{"id":3,"name":"口味虾"},{"id":2,"name":"剁椒鱼头"}]
[{"id":2,"name":"剁椒鱼头"},{"id":102,"name":"凉拌木耳"},{"id":3,"name":"口味虾"}]

推荐↓↓↓
Java编程
上一篇:Kafka如何做到1秒处理1500万条消息? 下一篇:阿里监控诊断工具 Arthas 源码原理分析