Ønce again I prepared a programming contest on GeeCØN 2016 for my company. This time the assignment required designing and optionally implementing a system given the following requirements:
一个系统每秒发送大约一千个事件。 每事件具有至少两个属性
clientId-我们预计一位客户每秒最多可发生几次事件UUID-全球唯一消耗一个事件大约需要10毫秒。 设计此类流的使用者
允许实时处理事件与一个客户相关的事件应按顺序进行处理 即您不能并行处理同一客户的事件clientId如果重复UUID在10秒钟内出现 将其放下。 假设10秒钟后不会出现重复这些要求中没有几个重要的细节
1000个事件/秒和10毫秒消耗一个事件。 显然 我们至少需要10个并发使用者才能实时消费。活动具有自然的汇总ID clientId 。 在一秒钟内 我们可以为给定的客户端预期一些事件 并且不允许我们同时或无序处理它们。我们必须以某种方式忽略重复的消息 很可能是通过记住最近10秒钟内的所有唯一ID。 这大约一万UUID暂时保留。在本文中 我将指导您完成一些正确的解决方案 并进行一些尝试。 您还将学习如何使用少量精确定位的指标来解决问题。
Naive sequential processing让我们通过迭代解决这个问题。 首先 我们必须对API进行一些假设。 想象一下
interface EventStream { void consume(EventConsumer consumer); FunctionalInterfaceinterface EventConsumer { Event consume(Event event); Valueclass Event { private final Instant created Instant.now(); private final int clientId; private final UUID uuid;
典型的基于推送的API 类似于JMS。 重要说明是事件Consumer正在阻止 这意味着它不会提供新的事件直到上一个被事件Consumer。 这只是我所做的一个假设 并没有彻底改变需求。 这也是JMS中消息侦听器的工作方式。 天真的实现只附加了一个侦听器 该侦听器需要大约10毫秒才能完成
class ClientProjection implements EventConsumer { Override public Event consume(Event event) { Sleeper.randSleep(10, 1); return event;
当然 在现实生活中 该用户会在数据库中存储一些内容 进行远程调用等。我在睡眠时间分配中添加了一些随机性 以使手动测试更加实际
class Sleeper { private static final Random RANDOM new Random(); static void randSleep(double mean, double stdDev) { final double micros 1_000 * (mean RANDOM.nextGaussian() * stdDev); try { TimeUnit.MICROSECONDS.sleep((long) micros); } catch (InterruptedException e) { throw new RuntimeException(e);//...EventStream es new EventStream(); //some real implementation herees.consume(new ClientProjection());
It compiles and runs but in order to figure out that the requirements aren t met we must plug in few metrics. The most important metric is the latency of message consumption, measured as a time between message creation and start of processing. We ll use Dropwizard Metrics for that:
class ClientProjection implements EventConsumer { private final ProjectionMetrics metrics; ClientProjection(ProjectionMetrics metrics) { this.metrics metrics; Override public Event consume(Event event) { metrics.latency(Duration.between(event.getCreated(), Instant.now())); Sleeper.randSleep(10, 1); return event;
的投影指标类被提取为单独的职责
import com.codahale.metrics.Histogram;import com.codahale.metrics.MetricRegistry;import com.codahale.metrics.Slf4jReporter;import lombok.extern.slf4j.Slf4j;import java.time.Duration;import java.util.concurrent.TimeUnit; Slf4jclass ProjectionMetrics { private final Histogram latencyHist; ProjectionMetrics(MetricRegistry metricRegistry) { final Slf4jReporter reporter Slf4jReporter.forRegistry(metricRegistry) .outputTo(log) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); reporter.start(1, TimeUnit.SECONDS); latencyHist metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, latency void latency(Duration duration) { latencyHist.update(duration.toMillis());
现在 当您运行朴素的解决方案时 您会迅速发现中值延迟以及99.9 的百分数无限增长
type HISTOGRAM, [...] count 84, min 0, max 795, mean 404.88540608274104, [...] median 414.0, p75 602.0, p95 753.0, p98 783.0, p99 795.0, p999 795.0type HISTOGRAM, [...] count 182, min 0, max 1688, mean 861.1706371990878, [...] median 869.0, p75 1285.0, p95 1614.0, p98 1659.0, p99 1678.0, p999 1688.0[...30 seconds later...]type HISTOGRAM, [...] count 2947, min 14, max 26945, mean 15308.138585757424, [...] median 16150.0, p75 21915.0, p95 25978.0, p98 26556.0, p99 26670.0, p999 26945.0
30秒后 我们的应用程序平均会延迟15秒处理事件。 不是完全即时的。 显然 缺少并发是任何原因。 我们的客户投影事件使用者大约需要10毫秒才能完成 因此每秒可以处理多达100个事件 而我们还需要一个数量级。 我们必须扩展客户投影不知何故。 而且我们甚至都没有触及其他要求
Naive thread pool最明显的解决方案是调用EventConsumer来自多个线程。 最简单的方法是利用执行器服务
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable { private final EventConsumer downstream; private final ExecutorService executorService; NaivePool(int size, EventConsumer downstream) { this.executorService Executors.newFixedThreadPool(size); this.downstream downstream; Override public Event consume(Event event) { executorService.submit(() - downstream.consume(event)); return event; Override public void close() throws IOException { executorService.shutdown();
We use a decorator pattern here. The original ClientProjection, implementing EventConsumer was correct. However we wrap it with another implementation of EventConsumer that adds concurrency. This will allows us to compose complex behaviors without changing ClientProjection itself. Such design promotes:
loose coupling: various EventConsumer don t know about each other and can be combined freelysingle responsibility: each does one job and delegates to the next component open/closed principle: we can change the behavior of the system without modifying existing implementations.打开/关闭原理通常通过注入策略和模板方法模式来实现。 在这里 它甚至更简单。 整体接线如下
MetricRegistry metricRegistry new MetricRegistry();ProjectionMetrics metrics new ProjectionMetrics(metricRegistry);ClientProjection clientProjection new ClientProjection(metrics);NaivePool naivePool new NaivePool(10, clientProjection);EventStream es new EventStream();es.consume(naivePool);
我们精心设计的指标表明情况确实好得多
type HISToOGRAM, count 838, min 1, max 422, mean 38.80768197277468, [...] median 37.0, p75 45.0, p95 51.0, p98 52.0, p99 52.0, p999 422.0type HISTOGRAM, count 1814, min 1, max 281, mean 47.82642776789085, [...] median 51.0, p75 57.0, p95 61.0, p98 62.0, p99 63.0, p999 65.0[...30 seconds later...]type HISTOGRAM, count 30564, min 5, max 3838, mean 364.2904915942238, [...] median 352.0, p75 496.0, p95 568.0, p98 574.0, p99 1251.0, p999 3531.0
然而 我们仍然看到延迟的规模越来越小 在30秒后 延迟达到了364毫秒。 它一直在增长 因此问题是系统的。 我们……需要……更多……指标。 注意朴素池 您很快就会知道为什么会这样幼稚 正好有10个线程可供使用。 这应该足以处理数千个事件 每个事件需要10毫秒来处理。 实际上 我们需要一点额外的处理能力 以避免垃圾收集后或负载高峰时出现问题。 为了证明线程池实际上是我们的瓶颈 最好监视其内部队列。 这需要一些工作
class NaivePool implements EventConsumer, Closeable { private final EventConsumer downstream; private final ExecutorService executorService; NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) { LinkedBlockingQueue Runnable queue new LinkedBlockingQueue (); String name MetricRegistry.name(ProjectionMetrics.class, queue Gauge Integer gauge queue::size; metricRegistry.register(name, gauge); this.executorService new ThreadPoolExecutor( size, size, 0L, TimeUnit.MILLISECONDS, queue); this.downstream downstream; Override public Event consume(Event event) { executorService.submit(() - downstream.consume(event)); return event; Override public void close() throws IOException { executorService.shutdown();
The idea here is to create ŤhreadPoolExecutor manually in order to provide custom LinkedBlockingQueue instance. We can later use that queue to monitor its length (see: ExecutorService - 10 tips and tricks).
Gauge will periodically invoke queue::size and report it to wherever you need it. Metrics confirm that thread pool size was indeed a problem:
type GAUGE, name [...].queue, value 35type GAUGE, name [...].queue, value 52[...30 seconds later...]type GAUGE, name [...].queue, value 601
容纳待处理任务的队列的大小不断增加 这会损害延迟。 将线程池大小从10增加到20最终会报告出不错的结果 并且没有停顿。 但是 我们仍然没有解决重复项 也没有针对相同的事件进行并发修改clientId。
Obscure locking让我们从避免同时处理同一事件开始clientId。 如果两个事件很快接come而至 则两者都与同一个事件相关clientId 朴素池将同时选择它们并开始同时处理它们。 首先 我们至少会通过锁每个clientId
Slf4jclass FailOnConcurrentModification implements EventConsumer { private final ConcurrentMap Integer, Lock clientLocks new ConcurrentHashMap (); private final EventConsumer downstream; FailOnConcurrentModification(EventConsumer downstream) { this.downstream downstream; Override public Event consume(Event event) { Lock lock findClientLock(event); if (lock.tryLock()) { try { downstream.consume(event); } finally { lock.unlock(); } else { log.error( Client {} already being modified by another thread , event.getClientId()); return event; private Lock findClientLock(Event event) { return clientLocks.computeIfAbsent( event.getClientId(), clientId - new ReentrantLock());
这肯定是朝错误的方向前进。 复杂程度不堪重负 但是运行此代码至少表明存在问题。 事件处理管道如下所示 一个装饰器包装了另一个装饰器
ClientProjection clientProjection new ClientProjection(new ProjectionMetrics(metricRegistry));FailOnConcurrentModification failOnConcurrentModification new FailOnConcurrentModification(clientProjection);NaivePool naivePool new NaivePool(10, failOnConcurrentModification, metricRegistry);EventStream es new EventStream();es.consume(naivePool);
偶尔会弹出错误消息 告诉我们其他一些线程已经在处理同一事件。clientId。 对于每个clientId我们关联一个锁我们检查一下以确定当前是否有另一个线程不在处理该客户端。 尽管丑陋 但实际上我们已经接近残酷的解决方案。 而不是失败的时候锁无法获得 因为另一个线程已经在处理某些事件 让我们稍等一下 希望锁将被释放
Slf4jclass WaitOnConcurrentModification implements EventConsumer { private final ConcurrentMap Integer, Lock clientLocks new ConcurrentHashMap (); private final EventConsumer downstream; private final Timer lockWait; WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) { this.downstream downstream; lockWait metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, lockWait Override public Event consume(Event event) { try { final Lock lock findClientLock(event); final Timer.Context time lockWait.time(); try { final boolean locked lock.tryLock(1, TimeUnit.SECONDS); time.stop(); if(locked) { downstream.consume(event); } finally { lock.unlock(); } catch (InterruptedException e) { log.warn( Interrupted , e); return event; private Lock findClientLock(Event event) { return clientLocks.computeIfAbsent( event.getClientId(), clientId - new ReentrantLock());
这个想法非常相似。 但是不要失败try锁 等待最多1秒 希望锁给定的客户端将被释放。 如果两个事件很快相继发生 一个事件将获得锁然后继续 而另一个会阻止等待开锁 即将发生。
不仅这些代码确实令人费解 而且还可能以许多微妙的方式被破坏。 例如 如果两个事件相同clientId几乎是在同一时间来到的 但显然是第一个 这两个事件都将要求锁同时我们也无法保证哪个事件会导致不公平锁首先 可能会消耗混乱的事件。 一定会有更好的办法...
Dedicated threads让我们退后一步 深吸一口气。 您如何确保事情不会同时发生 好吧 只需使用一个线程 事实上 这是我们一开始所做的 但是吞吐量并不令人满意。 但是我们不关心不同的并发clientIds 我们只需要确保事件具有相同的clientId总是由同一线程处理
也许从创建地图clientId至线 comes至your mind? 好吧 这将过于简单化。 我们将创建数千个线程 每个线程在大多数情况下都按照要求空闲 对于给定的条件 每秒只有很少的事件clientId 。 一个不错的折衷方案是固定大小的线程池 每个线程负责一个众所周知的线程子集。clientIds。 这样两种不同clientId可能以相同的线程结尾但相同clientId将始终由同一线程处理。 如果两个事件相同clientId appear, they will both be routed至the same thread, thus avoiding concurrent processing. The implementation is embarrassingly simple:
class SmartPool implements EventConsumer, Closeable { private final List ExecutorService threadPools; private final EventConsumer downstream; SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) { this.downstream downstream; List ExecutorService list IntStream .range(0, size) .mapToObj(i - Executors.newSingleThreadExecutor()) .collect(Collectors.toList()); this.threadPools new CopyOnWriteArrayList (list); Override public void close() throws IOException { threadPools.forEach(ExecutorService::shutdown); Override public Event consume(Event event) { final int threadIdx event.getClientId() % threadPools.size(); final ExecutorService executor threadPools.get(threadIdx); executor.submit(() - downstream.consume(event)); return event;
关键部分就在最后
int threadIdx event.getClientId() % threadPools.size();ExecutorService executor threadPools.get(threadIdx);
这个简单的算法将始终使用相同的单线程执行器服务对于相同的clientId。 例如 当池大小为20 客户7 27 47 etc. will use the same thread. But this is OK as long as one clientId always uses the same thread. At this point no locking is necessary and sequential invocation is guaranteed because events对于相同的client are always executed by the same thread. Side note: one thread per clientId would not scale but one actor per clientId 例如在Akka中 是一个很好的想法 可以简化很多工作。
为了更加安全 我在每个线程池中插入了平均队列大小的指标 这使实现更长
class SmartPool implements EventConsumer, Closeable { private final List LinkedBlockingQueue Runnable queues; private final List ExecutorService threadPools; private final EventConsumer downstream; SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) { this.downstream downstream; this.queues IntStream .range(0, size) .mapToObj(i - new LinkedBlockingQueue Runnable ()) .collect(Collectors.toList()); List ThreadPoolExecutor list queues .stream() .map(q - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)) .collect(Collectors.toList()); this.threadPools new CopyOnWriteArrayList (list); metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, queue ), (Gauge Double ) this::averageQueueLength); private double averageQueueLength() { double totalLength queues .stream() .mapToDouble(LinkedBlockingQueue::size) .sum(); return totalLength / queues.size(); //...
如果您偏执狂 您甚至可以为每个队列创建一个指标。
Deduplication and idempotency在分布式环境中 当生产者拥有至少一次保证。 这种行为背后的原因不在本文讨论范围之内 但是我们必须学习如何解决该问题。 一种方法是附加全局唯一标识符 UUID 到每封邮件 并确保在用户端不会重复处理具有相同标识符的邮件。 每事件有这样UUID。 我们要求下最直接的解决方案是简单地存储所有可见的UUID并确认收到的到达UUID以前从未见过。 使用ConcurrentHashMap UUID, UUID 没有并发哈希集在JDK中 按原样会导致内存泄漏 因为随着时间的推移 我们将不断积累越来越多的ID。 这就是为什么我们仅在最近10秒内查找重复项。 从技术上讲 您可以ConcurrentHashMap UUID, Instant 从UUID在遇到问题时加盖时间戳记。 通过使用后台线程 我们可以删除10秒钟以上的元素。 但是如果您是快乐的Guava用户 Cache UUID, UUID 使用声明式驱逐策略将达到目的
import com.codahale.metrics.Gauge;import com.codahale.metrics.Meter;import com.codahale.metrics.MetricRegistry;import com.google.common.cache.Cache;import com.google.common.cache.CacheBuilder;import java.util.UUID;import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer { private final EventConsumer downstream; private Cache UUID, UUID seenUuids CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) .build(); IgnoreDuplicates(EventConsumer downstream) { this.downstream downstream; Override public Event consume(Event event) { final UUID uuid event.getUuid(); if (seenUuids.asMap().putIfAbsent(uuid, uuid) null) { return downstream.consume(event); } else { return event;
为了保证生产安全 我至少可以想到两个指标可能会变得有用 缓存大小和发现的重复项数量。 让我们也插入以下指标
class IgnoreDuplicates implements EventConsumer { private final EventConsumer downstream; private final Meter duplicates; private Cache UUID, UUID seenUuids CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) .build(); IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) { this.downstream downstream; duplicates metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, duplicates metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, cacheSize ), (Gauge Long ) seenUuids::size); Override public Event consume(Event event) { final UUID uuid event.getUuid(); if (seenUuids.asMap().putIfAbsent(uuid, uuid) null) { return downstream.consume(event); } else { duplicates.mark(); return event;
最终 我们拥有了构建解决方案的所有要素。 这个想法是从EventConsumer实例彼此包裹
首先我们申请忽略重复项拒绝重复然后我们打电话SmartPool总是给定的clientId到同一线程并在该线程中执行下一阶段最后客户投影被调用以执行真正的业务逻辑。您可以选择放置FailOnConcurrentModification介于SmartPool和客户投影为了增加安全性 设计上不应同时进行修改
ClientProjection clientProjection new ClientProjection(new ProjectionMetrics(metricRegistry));FailOnConcurrentModification concurrentModification new FailOnConcurrentModification(clientProjection);SmartPool smartPool new SmartPool(12, concurrentModification, metricRegistry);IgnoreDuplicates withoutDuplicates new IgnoreDuplicates(smartPool, metricRegistry);EventStream es new EventStream();es.consume(withoutDuplicates);
我们花了很多工作才能提出相对简单且结构合理的 希望您同意 解决方案。 最后 解决并发问题的最佳方法是...避免并发 并在一个线程中运行受竞争条件约束的代码。 这也是Akka actor 每个actor处理单个消息 和RxJava 由订户 。
到目前为止 我们提出的是线程池和共享缓存的组合。 这次我们将使用RxJava实现解决方案。 首先 我从未透露过事件流实现 仅提供API
interface EventStream { void consume(EventConsumer consumer);
实际上 对于手动测试 我构建了一个简单的RxJava流 其行为与系统的要求类似
Slf4jclass EventStream { void consume(EventConsumer consumer) { observe() .subscribe( consumer::consume, e - log.error( Error emitting event , e) Observable Event observe() { return Observable .interval(1, TimeUnit.MILLISECONDS) .delay(x - Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)) .map(x - new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())) .flatMap(this::occasionallyDuplicate, 100) .observeOn(Schedulers.io()); private Observable Event occasionallyDuplicate(Event x) { final Observable Event event Observable.just(x); if (Math.random() 0.01) { return event; final Observable Event duplicated event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS); return event.concatWith(duplicated);
了解此模拟器的工作原理不是必不可少的 但很有趣。 首先 我们产生稳定的长值 0 1个 2... 每毫秒 每秒数千个事件 使用间隔 操作员。 然后 我们将每个事件延迟两次之间的随机时间0和1个_000微秒延迟 operator. This way events will appears in less predictable moments in time a bit more realistic situation. Finally we map (using ekhem 地图 操作员 每个长随机值事件与clientId介于1个_000和1个_1个00 包含在内 。
最后一点很有趣。 我们想模拟偶尔的重复。 为此 我们映射了每个事件 使用flatMap 在99 的情况下 。 但是 在1 的情况下 我们两次返回此事件 第二次发生在10毫秒至5秒后。 在实践中 事件的重复实例将在其他数百个事件之后出现 这使流的行为逼真。
有两种方法可以与事件流-通过进行回调消耗 并通过观察 。 我们可以利用Observable Event 快速建立功能与第1部分但要简单得多。
Missing backpressure利用RxJava的第一个幼稚方法很快就失败了
EventStream es new EventStream();EventConsumer clientProjection new ClientProjection( new ProjectionMetrics( new MetricRegistry()));es.observe() .subscribe( clientProjection::consume, e - log.error( Fatal error , e)
(ClientProjection, ProjectionMetrics et. al. come from part 1). We get MissingBackpressureException almost instantaneously and that was expected. Remember how our first solution was lagging by handling events with more and more latency?
RxJava tries to avoid that, as well as avoiding overflow of queues.
MissingBackpressureException is thrown because consumer (ClientProjection) is incapable of handling events in real time. This is fail-fast behavior. The quickest solution is to move consumption to a separate thread pool, just like before, but using RxJava s facilities:
EventStream es new EventStream();EventConsumer clientProjection new FailOnConcurrentModification( new ClientProjection( new ProjectionMetrics( new MetricRegistry())));es.observe() .flatMap(e - clientProjection.consume(e, Schedulers.io())) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c - log.info( Processed {} events/s , c), e - log.error( Fatal error , e)
EventConsumer接口具有帮助程序方法 该方法可以异步消耗所提供的事件排程器
FunctionalInterfaceinterface EventConsumer { Event consume(Event event); default Observable Event consume(Event event, Scheduler scheduler) { return Observable .fromCallable(() - this.consume(event)) .subscribeOn(scheduler);
通过使用使用事件flatMap 在一个单独的Scheduler.io 每个消耗都是异步调用的。 这次事件几乎是实时处理的 但是存在更大的问题。 我装饰客户投影与FailOnConcurrentModification因为某种原因。 事件彼此独立地消耗 因此可能发生两个事件同时发生clientId are processed concurrently. Not good. Luckily in RxJava solving this problem is much easier than与plain threads:
es.observe() .groupBy(Event::getClientId) .flatMap(byClient - byClient .observeOn(Schedulers.io()) .map(clientProjection::consume)) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c - log.info( Processed {} events/s , c), e - log.error( Fatal error , e)
有点改变了。 首先 我们将事件分组clientId。 这分裂单可观察的流进溪流。 每个子流命名客户代表与同一事件相关的所有事件clientId。 现在 如果我们映射到此子流 则可以确保事件与同一子流相关clientId永远不会同时处理。 外部流很懒 因此我们必须订阅它。 与其单独订阅每个事件 我们不每秒收集事件并进行计数。 这样 我们将收到一个类型的事件整数每秒代表每秒消耗的事件数。
Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state现在我们必须删除重复的UUIDs。 丢弃重复项的最简单但非常愚蠢的方法是利用全局状态。 我们可以通过在外部可用的缓存中查找重复项来简单地过滤掉重复项过滤 操作员
final Cache UUID, UUID seenUuids CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) .build();es.observe() .filter(e - seenUuids.getIfPresent(e.getUuid()) null) .doOnNext(e - seenUuids.put(e.getUuid(), e.getUuid())) .subscribe( clientProjection::consume, e - log.error( Fatal error , e)
如果要监视此机制的使用 只需添加指标
Meter duplicates metricRegistry.meter( duplicates es.observe() .filter(e - { if (seenUuids.getIfPresent(e.getUuid()) ! null) { duplicates.mark(); return false; } else { return true;
从操作员内部访问全局状态 尤其是可变状态非常危险 并且破坏了RxJava的唯一目的-简化并发。 显然我们使用线程安全快取从Guava中获取 但是在许多情况下 很容易错过从多个线程访问共享的全局可变状态的地方。 如果您发现自己在运算符链之外对某些变量进行了变异 请非常小心。
Custom distinct() operator in RxJava 1.xRxJava 1.x有一个不同 可以完成这项工作的运算子
es.observe() .distinct(Event::getUuid) .groupBy(Event::getClientId)
不幸不同 存储所有密钥 UUIDs 内部不断增长哈希集。 但是我们只关心最近10秒钟内的重复 通过复制粘贴实现独立运营商我创建Distinct事件利用Guava缓存的操作符仅存储最后10秒钟价值的UUID。 我故意硬编码事件使用此运算符 而不是使其变得更通用 使代码更易于理解
class DistinctEvent implements Observable.Operator Event, Event { private final Duration duration; DistinctEvent(Duration duration) { this.duration duration; Override public Subscriber ? super Event call(Subscriber ? super Event child) { return new Subscriber Event (child) { final Map UUID, Boolean keyMemory CacheBuilder.newBuilder() .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS) . UUID, Boolean build().asMap(); Override public void onNext(Event event) { if (keyMemory.put(event.getUuid(), true) null) { child.onNext(event); } else { request(1); Override public void onError(Throwable e) { child.onError(e); Override public void onCompleted() { child.onCompleted();
用法非常简单 整个实现 加上自定义运算符 如下
es.observe() .lift(new DistinctEvent(Duration.ofSeconds(10))) .groupBy(Event::getClientId) .flatMap(byClient - byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) .window(1, TimeUnit.SECONDS) .flatMap(Observable::count) .subscribe( c - log.info( Processed {} events/s , c), e - log.error( Fatal error , e)
实际上 如果您跳过每秒的日志记录 它甚至可以更短
es.observe() .lift(new DistinctEvent(Duration.ofSeconds(10))) .groupBy(Event::getClientId) .flatMap(byClient - byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) .subscribe( e - {}, e - log.error( Fatal error , e)
该解决方案比以前基于线程池和装饰器的解决方案要短得多。 唯一尴尬的部分是自定义运算符 可在存储太多历史记录时避免内存泄漏UUIDs。 幸运的是RxJava 2得以解救
RxJava 2.x and more powerful built-in distinct()我当时这个无法通过更强大的实现将PR提交给RxJava不同 操作员。 但是在我检查之前2.x分支 那里是 不同 允许提供自定义采集相对于硬编码哈希集. Believe it or not, dependency inversion is not only about Spring framework or Java EE. When a library allows you to provide custom implementation of its internal data structure, 这个 is also DI. First I create a helper method that can build Set UUID 受支持Map UUID, Boolean 受支持Cache UUID, Boolean 。 我们一定喜欢代表团
private Set UUID recentUuids() { return Collections.newSetFromMap( CacheBuilder.newBuilder() .expireAfterWrite(10, TimeUnit.SECONDS) . UUID, Boolean build() .asMap()
有了这种方法 我们可以使用以下表达式实现整个任务
es.observe() .distinct(Event::getUuid, this::recentUuids) .groupBy(Event::getClientId) .flatMap(byClient - byClient .observeOn(Schedulers.io()) .map(clientProjection::consume) .subscribe( e - {}, e - log.error( Fatal error , e)
优雅 简单 清晰 它看起来像是一个问题
观察事件流仅考虑不同的UUID客户分组活动为每个客户消费 顺序希望您喜欢所有这些解决方案 并发现它们对您的日常工作很有用。
from: https://dev.to//tnurkiewicz/small-scale-stream-processing-kata-thread-pools
本文链接: http://bekata.immuno-online.com/view-785318.html