博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Storm技术实战之2 -- BasicDRPCTopology
阅读量:6203 次
发布时间:2019-06-21

本文共 5313 字,大约阅读时间需要 17 分钟。

欢迎转载,转载请注明出处,徽沪一郎.

本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容?

BasicDRPCTopology

main函数

DRPC 分布式远程调用(这个说法有意思,远程调用本来就是分布的,何须再加个D, <头文字D>看多了, :)

 

public static void main(String[] args) throws Exception {    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");    builder.addBolt(new ExclaimBolt(), 3);    Config conf = new Config();    if (args == null || args.length == 0) {      LocalDRPC drpc = new LocalDRPC();      LocalCluster cluster = new LocalCluster();      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));      for (String word : new String[]{ "hello", "goodbye" }) {        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));      }      cluster.shutdown();      drpc.shutdown();    }    else {      conf.setNumWorkers(3);      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());    }  }

 

问题: 上面的代码中只是添加了一个bolt,并没有设定Spout. 我们知道一个topology中最起码得有一个Spout,那么这里的Spout又隐身于何处呢?

关键的地方就在builder.createLocalTopology, 调用关系如下

  • LinearDRPCTopologyBuilder::createLocalTopology
    •   LinearDRPCTopologyBuilder::createTopology()
      •   LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))

       

原来DRPCTopology中使用的Spout是DRPCSpout.

LinearDRPCTopology::createTopology

既然代码已经读到此处,何不再进一步看看createTopology的实现.

简要说明一下该段代码的处理逻辑:

  1. 设置DRPCSpout
  2. 以bolt为入参,创建CoordinatedBolt
  3. 添加JoinResult Bolt
  4. 添加ReturnResult Bolt: ReturnResultBolt连接到DRPCServer,并返回结果
private StormTopology createTopology(DRPCSpout spout) {        final String SPOUT_ID = "spout";        final String PREPARE_ID = "prepare-request";                TopologyBuilder builder = new TopologyBuilder();        builder.setSpout(SPOUT_ID, spout);        builder.setBolt(PREPARE_ID, new PrepareRequest())                .noneGrouping(SPOUT_ID);        int i=0;        for(; i<_components.size();i++) {            Component component = _components.get(i);                        Map
source = new HashMap
(); if (i==1) { source.put(boltId(i-1), SourceArgs.single()); } else if (i>=2) { source.put(boltId(i-1), SourceArgs.all()); } IdStreamSpec idSpec = null; if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) { idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); } BoltDeclarer declarer = builder.setBolt( boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism); for(Map conf: component.componentConfs) { declarer.addConfigurations(conf); } if(idSpec!=null) { declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); } if(i==0 && component.declarations.isEmpty()) { declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); } else { String prevId; if(i==0) { prevId = PREPARE_ID; } else { prevId = boltId(i-1); } for(InputDeclaration declaration: component.declarations) { declaration.declare(prevId, declarer); } } if(i>0) { declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); } } IRichBolt lastBolt = _components.get(_components.size()-1).bolt; OutputFieldsGetter getter = new OutputFieldsGetter(); lastBolt.declareOutputFields(getter); Map
streams = getter.getFieldsDeclaration(); if(streams.size()!=1) { throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology"); } String outputStream = streams.keySet().iterator().next(); List
fields = streams.get(outputStream).get_output_fields(); if(fields.size()!=2) { throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); } builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)) .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))) .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); i++; builder.setBolt(boltId(i), new ReturnResults()) .noneGrouping(boltId(i-1)); return builder.createTopology(); }

 

Bolt

处理逻辑: 在接收到的每一个单词后面添加'!'.

public static class ExclaimBolt extends BaseBasicBolt {    @Override    public void execute(Tuple tuple, BasicOutputCollector collector) {      String input = tuple.getString(1);      collector.emit(new Values(tuple.getValue(0), input + "!"));    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("id", "result"));    }  }

运行

java -cp $(lein classpath) storm.starter.BasicDRPCTopology

 

 

转载于:https://www.cnblogs.com/hseagle/p/3511185.html

你可能感兴趣的文章
2. Rust的三板斧 安全,迅速,并发
查看>>
不同网段数据转发过程
查看>>
Linux教程-使用truss、strace或ltrace诊断软件
查看>>
java-第十三章-类的无参方法(一)-根据三角形的三条边长,判断是直角,锐角还是钝角三角形...
查看>>
浅入浅出Android(000):Hello Android
查看>>
MYSQL 数据库导入导出命令
查看>>
利用组策略部署软件全攻略之一
查看>>
PHP学习之旅(一)
查看>>
linux系统内核参数说明
查看>>
check_postgres脚本集
查看>>
网站编辑要失业了!
查看>>
magento清空产品和分类数据
查看>>
我理解的--java适配器模式
查看>>
Apache ActiveMQ 官方文档中文版
查看>>
AOP面向切面编程
查看>>
linux-shell面试题 之二
查看>>
基于Spring Boot的数据缓存
查看>>
JVM - 类加载机制(一)
查看>>
Spring单元测试
查看>>
select,iocp,epoll,kqueue及各种I/O复用机制
查看>>