MapReduce
Map:
import sys

for line in sys.stdin:
    words = line.strip().split(' ')
    for word in words:
        # emit "word<TAB>1" for every token on the line
        print "\t".join([word.strip(), '1'])
Reduce:
import sys

# Hadoop Streaming delivers mapper output sorted by key,
# so all lines for the same word arrive consecutively.
sum, tmp = 0, None
for line in sys.stdin:
    words = line.strip().split('\t')
    if len(words) != 2:
        continue
    word, cnt = words
    if not tmp:
        tmp = word
    if tmp != word:
        # key changed: flush the count for the previous word
        print '\t'.join([tmp, str(sum)])
        tmp = word
        sum = 0
    sum += int(cnt)
# flush the final word (guard against completely empty input)
if tmp is not None:
    print '\t'.join([tmp, str(sum)])
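Before submitting to the cluster, the two scripts can be smoke-tested locally; the sort step stands in for Hadoop's shuffle (this assumes a local copy of The_Man_of_Property.txt in the working directory):

cat The_Man_of_Property.txt | python map.py | sort -k1,1 | python reduce.py | head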
Hadoop Streaming submission:
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH="/The_Man_of_Property.txt"
OUTPUT_PATH="/wordCount/mapreduce"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python reduce.py" \
    -file map.py \
    -file reduce.py
Verification:
[root@master wordCount]# hadoop fs -text /wordCount/mapreduce/part-00000 | head
(Baynes	1
(Dartie	1
(Dartie’s	1
(Down-by-the-starn)	2
(Down-by-the-starn),	1
(He	1
(I	1
(James)	1
(L500)	1
(Louisa	1
text: Unable to write to output stream.
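Note that splitting on a single space leaves punctuation attached to the tokens ("(Baynes", "(Dartie’s"). If cleaner tokens are wanted, a hypothetical variant of map.py could extract alphabetic runs instead (a sketch, not part of the job above):

import re
import sys

# illustrative choice: keep only runs of letters and apostrophes, lower-cased
for line in sys.stdin:
    for word in re.findall(r"[A-Za-z']+", line):
        print '\t'.join([word.lower(), '1'])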
Spark
PySpark code:
from __future__ import print_function

import sys
from operator import add
from pyspark import SparkContext, SparkConf

reload(sys)
sys.setdefaultencoding('utf-8')


def f(x):
    return x.strip().split(' ')


if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("spark_wordcount")
    sc = SparkContext(conf=conf)
    in_file = sc.textFile("/The_Man_of_Property.txt")

    # collect-and-print variant:
    # resultRdd = in_file.flatMap(f).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    # resultColl = resultRdd.collect()
    # for line in resultColl:
    #     print(line)

    # saveAsTextFile returns None, so the chain is not assigned to a variable
    in_file\
        .flatMap(f)\
        .map(lambda word: (word, 1))\
        .reduceByKey(lambda a, b: a + b)\
        .map(lambda line: '\t'.join([line[0], str(line[1])]))\
        .saveAsTextFile("/wordCount/spark")
    sc.stop()
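Unlike the MapReduce section, no submit script is shown here; a minimal local submission would look like the line below (the file name wordcount_spark.py is assumed, and no --master flag is needed since the script sets master to local itself):

spark-submit wordcount_spark.py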
Verification:
[root@master wordCount]# hadoop fs -text /wordCount/spark/part-00000 | head
—‘the	1
writings	1
considered,	1
shouted,	1
yellow	7
‘Humbug!’	1
four	30
aegis	1
considered;	1
text: Unable to write to output stream.
Hive
Load the source text:
create table wordcount_docs(line string);
LOAD DATA INPATH '/The_Man_of_Property.txt' OVERWRITE INTO TABLE wordcount_docs;
Method 1:
create table words(word string);
insert into table words select explode(split(line, " ")) as word from wordcount_docs;
Splitting words: explode is one of Hive's built-in table-generating functions (UDTFs); it expands a single column into multiple rows (see the sketch after this list):
- explode(ARRAY): produces one row for each element of the array
- explode(MAP): produces one row for each key-value pair in the map, with the key in one column and the value in another
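A minimal sketch of both forms, runnable in any Hive session (the literals are made up for illustration):

select explode(array('a', 'b', 'c'));   -- three rows: a, b, c
select explode(map('k1', 1, 'k2', 2));  -- two rows, with key and value as separate columns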
Verification:
hive> select * from words limit 2;
OK
Preface
“The
Time taken: 0.051 seconds, Fetched: 2 row(s)
split is Hive's string-splitting function:

split(str, regex) - Splits str around occurrences that match regex
Time taken: 0.769 seconds, Fetched: 1 row(s)

The return value is an array.
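For example, a quick sanity check on a made-up literal (Hive prints the resulting array in JSON form):

hive> select split('the man of property', ' ');
["the","man","of","property"]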
Verify the word count:

hive> select word, count(*) as cnt from words group by word order by cnt desc limit 2;
Total MapReduce CPU Time Spent: 9 seconds 470 msec
OK
the	5144
of	3407
Time taken: 73.257 seconds, Fetched: 10 row(s)
Method 2 (transform):
create table wordCount(word string,cnt int) row format delimited fields terminated by '\t';
hive> desc wordCount;
OK
word                    string
cnt                     int
Time taken: 0.078 seconds, Fetched: 2 row(s)
add file /home/master/test/wordCount/hive/map.py;
Verify the map step:
select transform(line) using "python map.py" as word,cnt from wordcount_docs limit 10;
add file /home/master/test/wordCount/hive/reduce.py;
Between map and reduce, rows must be partitioned and sorted by key so that the reducer sees all records for one word consecutively; in the query below, cluster by provides this.
Verify the reduce step:
select transform(M.word, M.cnt) using "python reduce.py" as w, c
from (
    select transform(line) using "python map.py" as word, cnt
    from wordcount_docs
    cluster by word
) M
limit 10;
cluster by sorts in ascending order by default.
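As a sketch of what cluster by expands to (standard Hive semantics: distribute by plus sort by on the same column), which is also the form to reach for if descending order were ever needed:

select transform(line) using "python map.py" as word, cnt
from wordcount_docs
distribute by word sort by word;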
hive> insert overwrite table wordCount
    > select transform(M.word, M.cnt) using "python reduce.py" as w, c
    > from
    > (select transform(line) using "python map.py" as word, cnt
    > from wordcount_docs cluster by word) M;
Verification:
hive> select * from wordCount order by cnt desc limit 3;
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 4.6 sec  HDFS Read: 187579  HDFS Write: 84  SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 600 msec
OK
the	5144
of	3407
to	2782
Time taken: 32.815 seconds, Fetched: 10 row(s)