V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
binbinyouliiii
V2EX  ›  Java

请教一下 Webflux Reactor 编程

  •  
  •   binbinyouliiii · 2020-05-18 11:44:04 +08:00 · 2736 次点击
    这是一个创建于 1694 天前的主题,其中的信息可能已经有所发展或是发生改变。
    public People getPeople(String name) {
        People people = getCache(name);
        if (people != null) {
            return people;
        }
        //Mono<People> people = httpService.monoRequest("http://www.xxx.com/path", People.class);
        people = httpService.syncRequest("http://www.xxx.com/path/info?name=" + name, People.class);
        setCache(msisdn, operatorInfo);
        return people;
    }
    
    private People getCache(String name) {
        String str = redisTemplate.opsForValue().get(name);
        if (str == null) {
            return null;
        }
        return deserialize(str);
    }
    
    private void setCache(String name, People people) {
        String str = serialize(people);
        redisTemplate.opsForValue().set(name, str);
    }
    

    以上是一个非常简单的带有缓存的获取信息的方法,我尝试使用 Spring Webflux 来做,但是写起来发现举步维艰,尤其对于 null 值的流式条件判断根本想不出有什么办法去写


    以下是我自己的理解写的,看起来非常不优雅,而且没有 null 判断,请问哪位大佬可以给个优雅的样例吗?网上的例子少得可怜

    public Mono<People> getPeople(String name) {
        Mono<People> peopleMono = httpService
                .monoRequest("http://www.xxx.com/path/info?name=" + name, People.class);
        Mono<Boolean> setCacheMono = peopleMono.flatMap(people -> setCache(name, people));
        return peopleMono
                .and(setCacheMono)
                .then(peopleMono);
    }
    
    private Mono<People> getnCache(String name) {
        return reactiveRedisTemplate.opsForValue()
                .get(name)
                .map(this::deserialize);
    }
    
    private Mono<Boolean> setCache(String name, People people) {
        return reactiveRedisTemplate.opsForValue()
                .set(name, serialize(people));
    }
    
    10 条回复    2020-05-19 20:10:43 +08:00
    jaylee4869
        1
    jaylee4869  
       2020-05-18 11:51:56 +08:00
    Optional
    TtTtTtT
        2
    TtTtTtT  
       2020-05-18 12:08:28 +08:00
    其实是一样的。。

    People people = getCache(name);
    if (people != null) {
    return people;
    }

    =>

    getCahe(name).switchIfEmpty(<after logic>)
    TtTtTtT
        3
    TtTtTtT  
       2020-05-18 12:24:13 +08:00   ❤️ 3
    换个姿势解释一下,传统代码是这样子的:
    statement1;
    statement2;
    statement3;

    那么对于 Reactor,其根本目标就是在于让 statement1 和 statement2 之间可以执行一些额外的代码,来保证代码是可调度的,进而起到所谓的 IO 优化。

    那么首先,就需要改造成:

    (statement1 as Mono).then(statement2 as Mono).then(statement3 as Mono)

    这样就能在每个 Mono 之间插入一些调度的代码。

    另一方面,为了兼容其他的语法,比如 return,比如 null,Mono 就提供了大量的成员方法来简化这个 then,让它写起来更爽。
    比如 null 对应了 Mono.empty(),return 对应了 Mono<T>里的 T 。

    以上,你需要考虑每个逻辑对应的是 Mono 的哪个方法。
    比如 if(x != null) { return x;} <after statement>对应的就是 switchIfEmpty(<after statement> as Mono);
    比如 if(x != null) { return x;} else { return y; } 对应的就是 defaultIfEmpty(y)。
    azcvcza
        4
    azcvcza  
       2020-05-18 12:34:24 +08:00
    如果可以的话,康康 js 的 promise 可能会有点帮助
    hantsy
        5
    hantsy  
       2020-05-18 12:53:34 +08:00   ❤️ 1
    Mono 中 Empty 时用 SwitchIfEmpty 或者 defaultIfEmpty 。

    https://github.com/hantsy/spring-reactive-sample

    ReactiveStreams 实现太多了,Reactor,Rxjava2/3, SmallRye Munity, Microprofile Reactive message Operators,Helidon 也包含了一个实现。
    hantsy
        6
    hantsy  
       2020-05-18 12:55:06 +08:00
    @azcvcza js 世界的 Rxjs 才是用的 ReactiveStreams 标准。Promise 算是老一代流式处理,没有订阅模式,通知机制。
    binbinyouliiii
        7
    binbinyouliiii  
    OP
       2020-05-18 14:24:55 +08:00
    @TtTtTtT #3 看了以下 switchIfEmpty 看起来像是为了保证不出错,碰到 empty 的时候以补偿的形式继续执行下去的,走到 setCache 的时候,因为无法判断是否是从缓存拿出来的,所以只能一股脑继续执行 setCahe 了。
    azcvcza
        8
    azcvcza  
       2020-05-18 15:41:05 +08:00
    @hantsy keyi
    TtTtTtT
        9
    TtTtTtT  
       2020-05-18 17:14:41 +08:00   ❤️ 1
    @binbinyouliiii 不不不,switchIfEmpty 应该这么用:
    getFromCache(key).switchIfEmpty(
    getFromDb(key).flatMap(value -> setCache(key,value))
    )
    如果设置缓存是纯异步的,也可以:
    getFromCache(key).switchIfEmpty(
    getFromDb(key).doOnSuccess(value -> setCache(key,value).subscribe())
    )
    jinzhongyuan
        10
    jinzhongyuan  
       2020-05-19 20:10:43 +08:00
    活捉大佬
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5510 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 03:41 · PVG 11:41 · LAX 19:41 · JFK 22:41
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.