package com.curiousby.baoyou.cn.storm;

import java.util.UUID;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * @see com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology
 * @Type TerminalInfosAnalysisTopology.java
 * @Desc Kafka -> Storm -> Redis / HDFS analysis topology
 * @author cmcc-B100036
 * @date 2016-12-15 16:54:50
 * @version
 */
public class TerminalInfosAnalysisTopology {

    private static String topicName = "baoy-topic";
    private static String zkRoot = "/kafka";

    public static void main(String[] args) {
        BrokerHosts hosts = new ZkHosts(
                "172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, zkRoot,
                UUID.randomUUID().toString());
        spoutConfig.forceFromStart = false;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // spoutConfig.socketTimeoutMs = 60;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\r\n");
        SyncPolicy syncPolicy = new CountSyncPolicy(2);
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.HOURS);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/user/hadoop/storm/")
                .withPrefix("terminalInfo_")
                .withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://172.23.27.120:9000/")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        builder.setBolt("terminalInfosAnalysisIsValidBolt",
                new TerminalInfosAnalysisIsValidBolt(), 1)
                .shuffleGrouping("kafkaSpout");
        builder.setBolt("terminalInfosAnalysisRedisBolt",
                new TerminalInfosAnalysisRedisBolt(), 1)
                .shuffleGrouping("terminalInfosAnalysisIsValidBolt");
        builder.setBolt("terminalInfosAnalysisHdfsReportBolt",
                new TerminalInfosAnalysisHdfsReportBolt(), 1)
                .shuffleGrouping("terminalInfosAnalysisIsValidBolt");
        builder.setBolt("terminalInfo", hdfsBolt, 1)
                .fieldsGrouping("terminalInfosAnalysisHdfsReportBolt",
                        new Fields("hdfs-terminalinfo"));
        // builder.setBolt("terminalInfosAnalysisHdfsBolt",
        //         new TerminalInfosAnalysisHdfsBolt(), 1)
        //         .shuffleGrouping("terminalInfosAnalysisIsValidBolt");

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf,
                        builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            conf.setMaxSpoutPending(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("terminalInfosAnalysisTopology", conf,
                    builder.createTopology());
        }
    }
}
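Once the project is packaged (see the pom below), the topology can be submitted with Storm's standard storm jar command; main() takes the topology name from args[0], and running it with no arguments starts a LocalCluster instead. A sketch, assuming the shaded jar name configured in the pom's shade plugin:

storm jar KafkaStormJavaDemo_TerminalInfosAnalysisTopology_main_start.jar com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology terminalInfosAnalysisTopology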
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// assuming fastjson's JSONObject here, since fastjson is declared in the pom
import com.alibaba.fastjson.JSONObject;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TerminalInfosAnalysisIsValidBolt extends BaseRichBolt {

    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisIsValidBolt.class);
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisIsValidBolt execute ====");
        for (int i = 0; i < tuple.size(); i++) {
            // parse the raw JSON string coming off the Kafka spout
            JSONObject formate = TerminalInfos.formate(tuple.getString(i));
            TerminalInfos entity = new TerminalInfos();
            entity.formate(formate);
            // entity was just constructed, so no null check is needed;
            // only forward messages that pass validation
            if (entity.isValid()) {
                // anchor on the input tuple so downstream failures are replayed
                collector.emit(tuple, new Values(entity));
            }
        }
        // ack exactly once per input, even when validation fails; otherwise
        // invalid messages are never acked and keep getting replayed
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("after_isvalid"));
    }
}
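The TerminalInfos and TerminalInfoHeader entities used by the bolts are not included in this post. For orientation only, here is a minimal, purely hypothetical sketch of the shape the bolts assume; the real parsing and validation logic is unknown, and fastjson (declared in the pom) is an assumption. Note that objects emitted between bolts must be serializable.

// Hypothetical sketch -- the real classes are not shown in this post.
public class TerminalInfos implements java.io.Serializable {

    private TerminalInfoHeader terminalInfoHeader;

    // parses the raw Kafka message into JSON (fastjson assumed)
    public static com.alibaba.fastjson.JSONObject formate(String raw) {
        return com.alibaba.fastjson.JSON.parseObject(raw);
    }

    // populates this entity from the parsed JSON (details unknown)
    public void formate(com.alibaba.fastjson.JSONObject json) { /* ... */ }

    // valid when the required header fields were present (assumption)
    public boolean isValid() { return terminalInfoHeader != null; }

    public TerminalInfoHeader getTerminalInfoHeader() { return terminalInfoHeader; }
}

// Header POJO; only two of the ~22 getters used by the bolts are sketched here.
class TerminalInfoHeader implements java.io.Serializable {
    private String appId;
    private String deviceToken;
    public String getAppId() { return appId; }
    public String getDeviceToken() { return deviceToken; }
    // getDeviceMac(), getDeviceId(), getDeviceImsi(), ... omitted
}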
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class TerminalInfosAnalysisRedisBolt extends BaseRichBolt {

    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisRedisBolt.class);
    private OutputCollector collector;
    private JedisPool pool;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisRedisBolt execute ====");
        Jedis jedis = pool.getResource();
        try {
            for (int i = 0; i < tuple.size(); i++) {
                TerminalInfos entity = (TerminalInfos) tuple.getValue(i);
                TerminalInfoHeader tih = entity.getTerminalInfoHeader();
                // dedup key: one Redis entry per appId + deviceToken pair
                String key = tih.getAppId() + "-" + tih.getDeviceToken();
                String value = jedis.get(key);
                if (value == null || "".equals(value)) {
                    // first sighting of this device
                    // jedis.set(key, JSON.toJSONString(tih));
                    // insert all infos into ES
                } else {
                    // device already known: update lastUpdateTime in ES
                }
            }
            // this branch of the topology ends here, so ack the input tuple
            collector.ack(tuple);
        } finally {
            // always return the connection, or the pool leaks under load
            pool.returnResource(jedis);
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        logger.info("==== TerminalInfosAnalysisRedisBolt prepare ====");
        this.collector = collector;
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(1000);
        config.setMaxIdle(50);
        config.setMaxWait(1000L);
        config.setTestOnBorrow(false);
        this.pool = new JedisPool(config, "172.23.27.120", 6379);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output stream to declare
    }
}
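The Redis write itself is left commented out above. For completeness, a minimal sketch of that stubbed-out branch, following the commented jedis.set line (fastjson's JSON.toJSONString, which the pom provides; the expiry is purely an assumption):

// inside the (value == null || "".equals(value)) branch:
// cache the header the first time this appId-deviceToken pair is seen
jedis.set(key, com.alibaba.fastjson.JSON.toJSONString(tih));
// optionally bound the dedup window, e.g. re-report after 24 hours (assumption)
jedis.expire(key, 24 * 60 * 60);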
package com.curiousby.baoyou.cn.storm;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TerminalInfosAnalysisHdfsReportBolt extends BaseRichBolt {

    private Logger logger = LoggerFactory.getLogger(TerminalInfosAnalysisHdfsReportBolt.class);
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        logger.info("==== TerminalInfosAnalysisHdfsReportBolt execute ====");
        for (int i = 0; i < tuple.size(); i++) {
            TerminalInfos entity = (TerminalInfos) tuple.getValue(i);
            TerminalInfoHeader tih = entity.getTerminalInfoHeader();
            // flatten the header into one comma-separated record for HDFS
            StringBuffer sb = new StringBuffer();
            sb.append(tih.getAppId()).append(",");
            sb.append(tih.getDeviceMac()).append(",");
            sb.append(tih.getDeviceId()).append(",");
            sb.append(tih.getDeviceToken()).append(",");
            sb.append(tih.getDeviceImsi()).append(",");
            sb.append(tih.getDeviceModel()).append(",");
            sb.append(tih.getDeviceManufacture()).append(",");
            sb.append(tih.getChannel()).append(",");
            sb.append(tih.getAppKey()).append(",");
            sb.append(tih.getUserId()).append(",");
            sb.append(tih.getAppVersion()).append(",");
            sb.append(tih.getVersionCode()).append(",");
            sb.append(tih.getSdkType()).append(",");
            sb.append(tih.getOs()).append(",");
            sb.append(tih.getCountry()).append(",");
            sb.append(tih.getLanguage()).append(",");
            sb.append(tih.getTimezone()).append(",");
            sb.append(tih.getResolution()).append(",");
            sb.append(tih.getAccess()).append(",");
            sb.append(tih.getAccessSubtype()).append(",");
            sb.append(tih.getCarrier()).append(",");
            sb.append(tih.getCpu());
            // first field matches the fieldsGrouping key used by the HdfsBolt
            collector.emit(tuple, new Values("hdfs-terminalinfo", sb.toString()));
        }
        // ack once per input tuple, outside the field loop
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hdfs-terminalinfo", "record"));
    }
}
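Note how this record reaches HDFS: DelimitedRecordFormat joins all tuple fields with the configured field delimiter, so with "\r\n" as the field delimiter each two-field tuple ("hdfs-terminalinfo", record) should be written as the stream tag and the CSV record on separate lines, roughly like this (hypothetical values):

hdfs-terminalinfo
appId1,00:11:22:33:44:55,deviceId1,token1,...
hdfs-terminalinfo
appId2,...

If only the CSV line is wanted in the output file, restricting the format with new DelimitedRecordFormat().withFields(new Fields("record")) should drop the stream tag.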
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.curiousby.baoy.cn</groupId>
    <artifactId>KafkaStormJavaDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>SpringKafkaStormDemo</name>
    <url>http://maven.apache.org</url>

    <!-- properties constant -->
    <properties>
        <spring.version>4.2.5.RELEASE</spring.version>
        <java.version>1.7</java.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.4</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>0.9.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- json start -->
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- JSON conversion -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <!-- JSON library -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        -->

        <!-- Other Dependency -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- hdfs start -->
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.2.0</version>
        </dependency>
        -->
        <!--
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>

    <build>
        <finalName>SpringKafkaStormDemo</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <dependencies>
                    <dependency>
                        <groupId>org.codehaus.plexus</groupId>
                        <artifactId>plexus-compiler-javac</artifactId>
                        <version>2.5</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <verbose />
                        <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
                    </compilerArguments>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <finalName>${project.artifactId}_TerminalInfosAnalysisTopology_main_start</finalName>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <finalName>${project.artifactId}_main_start</finalName>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.curiousby.baoyou.cn.storm.TerminalInfosAnalysisTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
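Because the shade plugin is bound to the package phase, a plain Maven build produces the runnable fat jar used in the storm jar command above:

mvn clean package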
Support the Developer

Writing something free, driven by interest, brings joy as well as sweat. I hope you like my work, and that you will consider supporting it. If you can, chip in with a donation (the heart icon in the upper-right corner supports Alipay and PayPal); if not, a kind word is welcome too. Thank you all.

Thank you for your support; I will keep doing better!