分布式系统

流式计算之Storm(zz)

2011年11月20日 阅读(189)

zz from:http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html

Storm简介

Storm是一个分布式的、容错的实时计算系统,遵循Eclipse Public License 1.0,Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。
主要商业应用及案例:Twitter
Storm的优点
1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
2. 服务化,一个服务框架,支持热部署,即时上线或下线App.
3. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
4. 容错性。Storm会管理工作进程和节点的故障。
5. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
6. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。
7. 快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。
8. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。
Storm目前存在的问题

1. 目前的开源版本中只是单节点Nimbus,挂掉只能自动重启,可以考虑实现一个双nimbus的布局。
2. Clojure是一个在JVM平台运行的动态函数式编程语言,优势在于流程计算, Storm的部分核心内容由Clojure编写,虽然性能上提高不少但同时也提升了维护成本。

Storm架构

Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由Zookeeper来完成的。ZooKeeper用于管理集群中的不同组件,ZeroMQ是内部消息系统,JZMQ是ZeroMQMQ的Java Binding。有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群.

http://b2b-doc.alibaba-inc.com/download/attachments/59153862/cluster.jpg?version=1&modificationDate=1318936760000

 

Storm术语解释

Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为 Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络.下面进行详细介绍:

Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce Job

http://b2b-doc.alibaba-inc.com/download/attachments/59153862/topo.jpg?version=1&modificationDate=1318936822000

Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理
Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作

http://b2b-doc.alibaba-inc.com/download/attachments/59153862/Bolts.jpg?version=1&modificationDate=1318936793000

Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.

http://b2b-doc.alibaba-inc.com/download/attachments/59153862/task.jpg?version=1&modificationDate=1318936806000

Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks.

stream grouping分类

1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.
2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.
3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.
4. Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
5. Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.
6. Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

Storm如何保证消息被处理

storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。

一个tuple能根据新获取到的spout而触发创建基于此的上千个tuple

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

                                     22133,

                                     "sentence_queue",

                                     new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

        .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

        .fieldsGrouping(2, new Fields("word"));

这个topology从kestrel queue读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个tuple负责读取句子,每一个tuple分别对应计算每一个单词出现的次数,大概样子如下所示:

http://b2b-doc.alibaba-inc.com/download/attachments/59153862/wordcount.jpg?version=2&modificationDate=1318936897000

一个tuple的生命周期:

public interface ISpout extends Serializable {

    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    void close();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);

}

首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个tuple-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:

collector.emit(new Values("field1", "field2", 3) , msgId);

 

接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪这个消息的树形结构是否创建,根据messageid调用Spout里面的ack函数以确认tuple是否被完全处理。如果tuple超时就会调用spout的fail方法。由此看出同一个tuple不管是acked还是fail都是由创建他的那个spout发出的,所以即使spout在集群环境中执行了很多的task,这个tule也不会被不同的task去acked或failed.
当kestrelspout从kestrel队列中得到一个消息后会打开这个他,这意味着他并不会把此消息拿走,消息的状态会显示为pending,直到等待确认此消息已经处理完成,处于pending状态直到ack或者fail被调用,处于"Pending"的消息不会再被其他队列消费者使用.如果在这过程中spout中处理此消息的task断开连接或失去响应则此pending的消息会回到"等待处理"状态.

 

Storm的一些常用应用场景

1.流聚合
流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。

builder.setBolt(5, new MyJoiner(), parallelism)

  .fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))

  .fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))

  .fieldsGrouping(3, new Fields("joinfield1", "joinfield2")) 

2.批处理
有时候为了性能或者一些别的原因, 你可能想把一组tuple一起处理, 而不是一个个单独处理。

3.BasicBolt
1. 读一个输入tuple
2. 根据这个输入tuple发射一个或者多个tuple
3. 在execute的方法的最后ack那个输入tuple
遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IbasicBolt

4.内存内缓存+Fields grouping组合
在bolt的内存里面缓存一些东西非常常见。缓存在和fields grouping结合起来之后就更有用了。比如,你有一个bolt把短链接变成长链接(bit.ly, t.co之类的)。你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。看一下下面两段代码有什么不一样:

builder.setBolt(2, new ExpandUrl(), parallelism)

  .shuffleGrouping(1);

builder.setBolt(2, new ExpandUrl(), parallelism)

  .fieldsGrouping(1, new Fields("url"));

5.计算top N
比如你有一个bolt发射这样的tuple: "value", "count"并且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这top N的值。
这个方式对于大数据量的流显然是没有扩展性的, 因为所有的数据会被发到同一台机器。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N, 代码大概是这样的:

builder.setBolt(2, new RankObjects(), parallellism)

  .fieldsGrouping(1, new Fields("value"));

builder.setBolt(3, new MergeObjects())

  .globalGrouping(2);

这个模式之所以可以成功是因为第一个bolt的fields grouping使得这种并行算法在语义上是正确的。
用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

6.用TimeCacheMap来高效地保存一个最近被更新的对象的缓存
有时候你想在内存里面保存一些最近活跃的对象,以及那些不再活跃的对象。 TimeCacheMap 是一个非常高效的数据结构,它提供了一些callback函数使得我们在对象不再活跃的时候我们可以做一些事情.

7.分布式RPC:CoordinatedBolt和KeyedFairBolt
用storm做分布式RPC应用的时候有两种比较常见的模式:它们被封装在CoordinatedBolt和KeyedFairBolt里面. CoordinatedBolt包装你的bolt,并且确定什么时候你的bolt已经接收到所有的tuple,它主要使用Direct Stream来做这个.
KeyedFairBolt同样包装你的bolt并且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。

 

流式计算之Storm_wordcount例子

一.构建maven开发环境
为了开发storm topology, 你需要把storm相关的jar包添加到classpath里面去:要么手动添加所有相关的jar包, 要么使用maven来管理所有的依赖。storm的jar包发布在Clojars(一个maven库), 如果你使用maven的话,把下面的配置添加在你项目的pom.xml里面。
<repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
</repository>
<dependency>
     <groupId>storm</groupId>
     <artifactId>storm</artifactId>
     <version>0.5.3</version>
     <scope>test</scope>
</dependency>
二.代码范例
1.Topology 入口点 RollingTopWords ————类似于hadoop的Job定义
本地模式(嵌入Local):
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
 
public class RollingTopWords {
    public static void main(String[] args) throws Exception {
 
        final int TOP_N = 3;
 
        TopologyBuilder builder = new TopologyBuilder();
 
        builder.setSpout(1, new TestWordSpout(), 5);
 
        builder.setBolt(2, new RollingCountObjects(60, 10), 4)
                 .fieldsGrouping(1, new Fields("word"));
        builder.setBolt(3, new RankObjects(TOP_N), 4)
                 .fieldsGrouping(2, new Fields("obj"));
        builder.setBolt(4, new MergeObjects(TOP_N))
                 .globalGrouping(3);
        Config conf = new Config();
        conf.setDebug(true);
 
        LocalCluster cluster = new LocalCluster(); // 本地模式启动集群
        cluster.submitTopology("rolling-demo", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
 
部署模式:
package storm.starter;
import storm.starter.bolt.MergeObjects;
import storm.starter.bolt.RankObjects;
import storm.starter.bolt.RollingCountObjects;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class RollingTopWords {
    public static void main(String[] args) throws Exception {
 
        final int TOP_N = 3;
 
        TopologyBuilder builder = new TopologyBuilder();
 
        builder.setSpout(1, new TestWordSpout(), 5);
 
        builder.setBolt(2, new RollingCountObjects(60, 10), 4).fieldsGrouping(
                1, new Fields("word"));
        builder.setBolt(3, new RankObjects(TOP_N), 4).fieldsGrouping(2,
                new Fields("obj"));
        builder.setBolt(4, new MergeObjects(TOP_N)).globalGrouping(3);
 
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);
 
        StormSubmitter.submitTopology("demo", conf,
                builder.createTopology());
        Thread.sleep(10000);
    }
}
 
2. 直接使用内置的TestWordSpout(随机产生一个word)
TestWordSpout
package backtype.storm.testing;
 
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;
 
 
public class TestWordSpout implements IRichSpout {
    public static Logger LOG = Logger.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;
 
    public TestWordSpout() {
        this(true);
    }
 
    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
 
    public boolean isDistributed() {
        return _isDistributed;
    }
 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
 
    public void close() {
 
    }
 
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
 
    public void ack(Object msgId) {
 
    }
 
    public void fail(Object msgId) {
 
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
 
3.各环节处理Bolt
RollingCountObjects 滚动计数word,并通过定时触发时间,清空计数列表
package storm.starter.bolt;
 
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
 
@SuppressWarnings("serial")
public class RollingCountObjects implements IRichBolt {
 
    private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
    private int _numBuckets;
    private transient Thread cleaner;
    private OutputCollector _collector;
    private int _trackMinutes;
 
    public RollingCountObjects(int numBuckets, int trackMinutes) {
        _numBuckets = numBuckets;
        _trackMinutes = trackMinutes;
    }
 
    public long totalObjects (Object obj) {
        long[] curr = _objectCounts.get(obj);
        long total = 0;
        for (long l: curr) {
            total+=l;
        }
        return total;
    }
 
    public int currentBucket (int buckets) {
        return (currentSecond()  / secondsPerBucket(buckets)) % buckets;
    }
 
    public int currentSecond() {
        return (int) (System.currentTimeMillis() / 1000);
    }
 
    public int secondsPerBucket(int buckets) {
        return (_trackMinutes * 60 / buckets);
    }
 
    public long millisPerBucket(int buckets) {
        return (long) secondsPerBucket(buckets) * 1000;
    }
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        cleaner = new Thread(new Runnable() {
            @SuppressWarnings("unchecked")
            public void run() {
                Integer lastBucket = currentBucket(_numBuckets);
 
 
                while(true) {
                  int currBucket = currentBucket(_numBuckets);
                  if(currBucket!=lastBucket) {
                      int bucketToWipe = (currBucket + 1) % _numBuckets;
                      synchronized(_objectCounts) {
                          Set objs = new HashSet(_objectCounts.keySet());
                          for (Object obj: objs) {
                            long[] counts = _objectCounts.get(obj);
                            long currBucketVal = counts[bucketToWipe];
                            counts[bucketToWipe] = 0; //  *这行代码很关键*
                            long total = totalObjects(obj);
                            if(currBucketVal!=0) {
                                _collector.emit(new Values(obj, total));
                            }
                            if(total==0) {
                                _objectCounts.remove(obj);
                            }
                          }
                      }
                      lastBucket = currBucket;
                  }
                  long delta = millisPerBucket(_numBuckets) – (System.currentTimeMillis() % millisPerBucket(_numBuckets));
                  Utils.sleep(delta);
                }
            }
        });
        cleaner.start();
    }
 
    public void execute(Tuple tuple) {
 
        Object obj = tuple.getValue(0);
        int bucket = currentBucket(_numBuckets);
        synchronized(_objectCounts) {
            long[] curr = _objectCounts.get(obj);
            if(curr==null) {
                curr = new long[_numBuckets];
                _objectCounts.put(obj, curr);
            }
            curr[bucket]++;
            _collector.emit(new Values(obj, totalObjects(obj)));
            _collector.ack(tuple);
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("obj", "count"));
    }
 
}
 
RankObjects
package storm.starter.bolt;
 
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
 
 
@SuppressWarnings("serial")
public class RankObjects implements IBasicBolt {
 
    @SuppressWarnings("rawtypes")
    List<List> _rankings = new ArrayList<List>();
 
    int _count;
    Long _lastTime = null;
 
    public RankObjects(int n) {
        _count = n;
    }
 
 
    @SuppressWarnings("rawtypes")
    private int _compare(List one, List two) {
 
        long valueOne = (Long) one.get(1);
        long valueTwo = (Long) two.get(1);
 
        long delta = valueTwo – valueOne;
        if(delta > 0) {
            return 1;
        } else if (delta < 0) {
            return -1;
        } else {
            return 0;
        }
 
    } //end compare
 
    private Integer _find(Object tag) {
        for(int i = 0; i < _rankings.size(); ++i) {
 
            Object cur = _rankings.get(i).get(0);
            if (cur.equals(tag)) {
                return i;
            }
 
        }
 
        return null;
 
    }
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
 
    }
 
    @SuppressWarnings("rawtypes")
    public void execute(Tuple tuple, BasicOutputCollector collector) {
 
        Object tag = tuple.getValue(0);
 
 
        Integer existingIndex = _find(tag);
        if (null != existingIndex) {
            _rankings.set(existingIndex, tuple.getValues());
        } else {
 
            _rankings.add(tuple.getValues());
 
 
        }
 
 
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
 
 
        if (_rankings.size() > _count) {
            _rankings.remove(_count);
        }
 
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            collector.emit(new Values(JSONValue.toJSONString(_rankings)));
            _lastTime = currentTime;
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    }
 
}
MergeObjects 对排序结果进行归并
package storm.starter.bolt;
 
import org.apache.log4j.Logger;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.json.simple.JSONValue;
 
 
@SuppressWarnings("serial")
public class MergeObjects implements IBasicBolt {
    public static Logger LOG = Logger.getLogger(MergeObjects.class);
 
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private List<List> _rankings = new ArrayList();
    int _count = 10;
    Long _lastTime;
 
    public MergeObjects(int n) {
        _count = n;
    }
 
 
    @SuppressWarnings("rawtypes")
    private int _compare(List one, List two) {
 
        long valueOne = (Long) one.get(1);
        long valueTwo = (Long) two.get(1);
 
        long delta = valueTwo – valueOne;
        if(delta > 0) {
            return 1;
        } else if (delta < 0) {
            return -1;
        } else {
            return 0;
        }
 
    } //end compare
 
    private Integer _find(Object tag) {
        for(int i = 0; i < _rankings.size(); ++i) {
 
            Object cur = _rankings.get(i).get(0);
            if (cur.equals(tag)) {
                return i;
            }
 
        }
 
        return null;
 
    }
 
 
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
 
    }
 
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void execute(Tuple tuple, BasicOutputCollector collector) {
 
 
        List<List> merging = (List) JSONValue.parse(tuple.getString(0));
        for(List pair : merging) {
 
            Integer existingIndex = _find(pair.get(0));
            if (null != existingIndex) {
                _rankings.set(existingIndex, pair);
            } else {
 
                _rankings.add(pair);
 
            }
 
            Collections.sort(_rankings, new Comparator<List>() {
                public int compare(List o1, List o2) {
                    return _compare(o1, o2);
                }
            });
 
 
            if (_rankings.size() > _count) {
                _rankings.subList(_count, _rankings.size()).clear();
            }
 
        }
 
        long currentTime = System.currentTimeMillis();
        if(_lastTime==null || currentTime >= _lastTime + 2000) {
            String fullRankings = JSONValue.toJSONString(_rankings);
            collector.emit(new Values(fullRankings));
            LOG.info("Rankings: " + fullRankings);
            _lastTime = currentTime;
        }
    }
 
    public void cleanup() {
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("list"));
    } 

You Might Also Like