Flume not processing keywords from Twitter source with flume-ng and Hadoop 2.5 (CDH 5.3)


I am trying to process Twitter keywords through a memory channel into HDFS, but flume-ng shows no further progress on the console after the "HDFS started" status.

Here are the contents of my /etc/flume-ng/conf/flume-env.sh file.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Environment variables can be set here.

export JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""

Here are the contents of my Twitter configuration file.

TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://uat.cloudera:8020/user/root/flume/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

I am running the command below on the CentOS console.

flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/twitter.conf -n TwitterAgent -Dflume.root.logger=INFO,console

When I run the command, here is the output.

Info: Sourcing environment configuration script /etc/flume-ng/conf/flume-env.sh
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12.jar from classpath
+ exec /usr/java/jdk1.7.0_67-cloudera/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp r/lib/flume-ng/../search/lib/xmlbeans-2.3.0.jar:/usr/lib/flume-ng/../search/lib/xmlenc-0.52.jar:/usr/lib/flume-ng/../search/lib/xmpcore-5.1.2.jar:/usr/lib/flume-ng/../search/lib/xz-1.0.jar:/usr/lib/flume-ng/../search/lib/zookeeper.jar' -Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native org.apache.flume.node.Application -f /etc/flume-ng/conf/farrukh.conf -n TwitterAgent
2015-09-24 12:05:38,876 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2015-09-24 12:05:38,885 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/etc/flume-ng/conf/farrukh.conf
2015-09-24 12:05:38,896 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,896 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: HDFS Agent: TwitterAgent
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,897 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,898 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:HDFS
2015-09-24 12:05:38,911 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:508)] Agent configuration for 'TwitterAgent' has no sources.
2015-09-24 12:05:38,919 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [TwitterAgent]
2015-09-24 12:05:38,920 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2015-09-24 12:05:38,939 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel MemChannel type memory
2015-09-24 12:05:38,957 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel MemChannel
2015-09-24 12:05:38,963 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: HDFS, type: hdfs
2015-09-24 12:05:40,019 (conf-file-poller-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:559)] Hadoop Security enabled: false
2015-09-24 12:05:40,022 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel MemChannel connected to [HDFS]
2015-09-24 12:05:40,031 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3c1cefaa counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
2015-09-24 12:05:40,040 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel MemChannel
2015-09-24 12:05:40,218 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: MemChannel: registered new MBean.
2015-09-24 12:05:40,218 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: MemChannel started
2015-09-24 12:05:40,219 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink HDFS
2015-09-24 12:05:40,221 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: HDFS: registered new MBean.
2015-09-24 12:05:40,221 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: HDFS started

Here are the details of my environment.

JDK

java version "1.7.0_67"
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

OS

CentOS release 6.4 (Final)
LSB_VERSION=base-4.0-amd64:base-4.0-noarch:core-4.0-amd64:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-noarch
cat: /etc/lsb-release.d: Is a directory
cpe:/o:centos:linux:6:GA

flume-ng

Flume 1.5.0-cdh5.3.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: cc2139f386f7fccc9a6e105e2026228af58c6e9f
Compiled by jenkins on Tue Dec 16 20:25:18 PST 2014
From source with checksum 0b02653a07c9e96af03ce2189b8d51c3

Hadoop

Hadoop 2.5.0-cdh5.3.0
Subversion http://github.com/cloudera/hadoop -r f19097cda2536da1df41ff6713556c8f7284174d
Compiled by jenkins on 2014-12-17T03:05Z
Compiled with protoc 2.5.0
From source with checksum 9c4267e6915cf5bbd4c6e08be54d54e0
This command was run using /usr/lib/hadoop/hadoop-common-2.5.0-cdh5.3.0.jar

Here is the output of the HDFS report command.

Configured Capacity: 20506943488 (19.10 GB)
Present Capacity: 20506943488 (19.10 GB)
DFS Remaining: 20057721155 (18.68 GB)
DFS Used: 449222333 (428.41 MB)
DFS Used%: 2.19%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (1):

Name: 127.0.0.1:50010 (uat.cloudera)
Hostname: uat.cloudera
Rack: /default
Decommission Status : Normal
Configured Capacity: 20506943488 (19.10 GB)
DFS Used: 449222333 (428.41 MB)
Non DFS Used: 0 (0 B)
DFS Remaining: 20057721155 (18.68 GB)
DFS Used%: 2.19%
DFS Remaining%: 97.81%
Configured Cache Capacity: 4294967296 (4 GB)
Cache Used: 0 (0 B)
Cache Remaining: 4294967296 (4 GB)
Cache Used%: 0.00%
Cache Remaining%: 100.00%
Xceivers: 6
Last contact: Thu Sep 25 12:09:42 PDT 2015

You are missing the ".sources" property of the agent. How can flume-ng work without knowing its source? You are missing the following line.

TwitterAgent.sources = Twitter

See the source, channel, and sink relationship diagram (roughly sketched below, since the original image is not included here).
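A rough text sketch of that relationship, using this agent's component names (my own illustration, not the original diagram), is:

[Twitter source] --> events --> [MemChannel (memory channel)] --> events --> [HDFS sink]

The source puts events into the channel it is bound to, and the sink drains events from the channel it is bound to.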

For more detail, see the following link: https://flume.apache.org/FlumeUserGuide.html

Always remember that there are three things in a Flume configuration file (sources, channels, sinks). The first three lines set these three properties.

TwitterAgent.sources = Twitter
TwitterAgent.sinks = HDFS
TwitterAgent.channels = MemChannel

The rest of the configuration file sets the detailed properties of these three main components (sources, channels, sinks).
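As a minimal, generic sketch (placeholder names, not tied to this exact setup), every agent configuration follows the same pattern: declare the component names first, then configure each component under the name you declared and wire it to a channel:

# 1) Declare the components of the agent
agent.sources = src1
agent.channels = ch1
agent.sinks = snk1

# 2) Configure each declared component and wire it to a channel
agent.sources.src1.type = <source type>
agent.sources.src1.channels = ch1
agent.channels.ch1.type = memory
agent.sinks.snk1.type = <sink type>
agent.sinks.snk1.channel = ch1

Note that a source uses the plural "channels" property while a sink uses the singular "channel". If the declaration line for a component is missing, Flume drops that component during validation, which is exactly what the "has no sources" warning in your log shows.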

Check the corrected configuration file contents below.

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = xxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxx

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://uat.cloudera:8020/user/root/flume/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10
TwitterAgent.channels.MemChannel.transactionCapacity = 10

Besides setting the sources property, I have also changed the properties below, so that you can see results in the HDFS temp files quickly.

TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10
TwitterAgent.channels.MemChannel.capacity = 10
TwitterAgent.channels.MemChannel.transactionCapacity = 10

Copy these contents and save them in a configuration file, e.g. sample.conf, in the /etc/flume-ng/conf/ folder, and then use the command below.

flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/sample.conf -n TwitterAgent -Dflume.root.logger=INFO,console

After the "HDFS started" status, it should show processing messages like this.

2015-09-25 13:44:18,045 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.twitter.TwitterSource.start(TwitterSource.java:139)] Twitter source Twitter started.
2015-09-25 13:44:18,045 (Twitter Stream consumer-1[initializing]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.
2015-09-25 13:44:19,931 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Connection established.
2015-09-25 13:44:19,931 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Receiving status stream.
2015-09-25 13:44:20,283 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:58)] Serializer = TEXT, UseRawLocalFileSystem = false
2015-09-25 13:44:20,557 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp
2015-09-25 13:44:22,435 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 100 docs
2015-09-25 13:44:25,383 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 200 docs
2015-09-25 13:44:28,178 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 300 docs
2015-09-25 13:44:30,505 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:413)] Closing hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp
2015-09-25 13:44:30,506 (hdfs-HDFS-call-runner-2) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-09-25 13:44:30,526 (hdfs-HDFS-call-runner-3) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:673)] Renaming hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284.tmp to hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860284
2015-09-25 13:44:30,607 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp
2015-09-25 13:44:31,157 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 400 docs
2015-09-25 13:44:33,330 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 500 docs
2015-09-25 13:44:36,131 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 600 docs
2015-09-25 13:44:38,298 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 700 docs
2015-09-25 13:44:40,465 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 800 docs
2015-09-25 13:44:41,158 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:413)] Closing hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp
2015-09-25 13:44:41,158 (hdfs-HDFS-call-runner-6) [INFO - org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:339)] Close tries incremented
2015-09-25 13:44:41,166 (hdfs-HDFS-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:673)] Renaming hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285.tmp to hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860285
2015-09-25 13:44:41,230 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:261)] Creating hdfs://uat.cloudera:8020/user/root/flume/FlumeData.1443213860286.tmp
2015-09-25 13:44:43,238 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 900 docs
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:178)] Processed 1,000 docs
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:300)] Total docs indexed: 1,000, total skipped docs: 0
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:302)]     35 docs/second
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:304)] Run took 28 seconds and processed:
2015-09-25 13:44:46,118 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:306)]     0.009 MB/sec sent to index
2015-09-25 13:44:46,119 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:308)]     0.255 MB text sent to index
2015-09-25 13:44:46,119 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:310)] There were 0 exceptions ignored:
^C2015-09-25 13:44:46,666 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2015-09-25 13:44:46,673 (agent-shutdown-hook) [INFO - org.apache.flume.source.twitter.TwitterSource.stop(TwitterSource.java:150)] Twitter source Twitter stopping...
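Once files start rolling, you can also confirm the data on HDFS itself. A quick check, assuming the hdfs.path from the configuration above (FlumeData is the HDFS sink's default file prefix):

hdfs dfs -ls /user/root/flume/
hdfs dfs -cat /user/root/flume/FlumeData.* | head

Files still ending in .tmp are open for writing; once a roll completes they are renamed, as the log above shows.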

Let me know if the issue is still not resolved.

