圖片 19.1 pic


    流量比較大的日誌要是直接寫入 hadoop 對 namenode 負載過大,所以入庫前合並,可以把各個節點的日誌湊並成一個文件寫入 hdfs。 根據情況定期合成,寫入到 hdfs 裏麵。


    咱們看看日誌的大小,200 g 的 dns 日誌文件,我壓縮到了 18 g,要是用 awk perl 當然也可以,但是處理速度肯定沒有分布式那樣的給力。


    圖片 19.2 pic


    hadoop streaming 原理


    mapper 和 reducer 會從標準輸入中讀取用戶數據,一行一行處理後發送給標準輸出。streaming 工具會創建 mapreduce 作業,發送給各個 tasktracker,同時監控整個作業的執行過程。


    任何語言,隻要是方便接收標準輸入輸出就可以做 mapreduce~


    再搞之前我們先簡單測試下 shell 模擬 mapreduce 的性能速度~


    圖片 19.3 pic


    看下他的結果,350 m 的文件用時 35 秒左右。


    圖片 19.4 pic


    這是 2 g 的日誌文件,居然用了 3 分鍾。 當然和我寫的腳本也有問題,我們是模擬 mapreduce 的方式,而不是調用 shell 下牛逼的 awk,gawk 處理。


    圖片 19.5 pic


    awk 的速度!果然很霸道,處理日誌的時候,我也很喜歡用 awk,隻是學習的難度有點大,不像別的 shell 組件那麽靈活簡單。


    圖片 19.6 pic


    這是官方的提供的兩個 demo ~


    map.py


    #!/usr/bin/env python"""a more advanced mapper, using python iterators and generators."""import sysdef read_input(file):    for line in file:# split the line into wordsyield line.splitdef main(separator=\''t\''):    # inputes from stdin (standard input)    data = read_input(sys.stdin)    for words in data:# write the results to stdout (standard output);# what we output here will be the input for the# reduce step, i.e. the input for reducer.py## tab-delimited; the trivial word count is 1for word in words:    print \''%s%s%d\'' % (word, separator, 1)if __name__ == "__main__":    main  </pre>


    reduce.py 的修改方式


    #!/usr/bin/env python"""a more advanced reducer, using python iterators and generators."""from itertools import groupbyfrom operator import itemgetterimport sysdef read_mapper_output(file, separator=\''t\''):    for line in file:yield line.rstrip.split(separator, 1)def main(separator=\''t\''):    # inputes from stdin (standard input)    data = read_mapper_output(sys.stdin, separator=separator)    # groupby groups multiple word-count pairs by word,    # and creates an iterator that returns consecutive keys and their group:    #   current_word - string containing a word (the key)    #   group - iterator yielding all ["<current_word>", "<count>"] items    for current_word, group in groupby(data, itemgetter(0)):try:    total_count = sum(int(count) for current_word, count in group)    print "%s%s%d" % (current_word, separator, total_count)except valueerror:    # count was not a number, so silently discard this item    passif __name__ == "__main__":    main  </pre>


    咱們再簡單點:


    #!/usr/bin/env pythonimport sysfor line in sys.stdin:    line = line.strip    words = line.split    for word in words:print \''%st%s\'' % (word, 1)  </pre>


    #!/usr/bin/env pythonfrom operator import itemgetterimport syscurrent_word = nonecurrent_count = 0word = nonefor line in sys.stdin:    line = line.strip    word, count = line.split(\''t\'', 1)    try:count = int(count)    except valueerror:continue    if current_word == word:current_count += count    else:if current_word:    print \''%st%s\'' % (current_word, current_count)current_count = countcurrent_word = wordif current_word == word:    print \''%st%s\'' % (current_word, current_count)  </pre>


    咱們就簡單模擬下數據,跑個測試


    圖片 19.7 pic


    剩下就沒啥了,在 hadoop 集群環境下,運行 hadoop 的 steaming.jar 組件,加入 mapreduce 的腳本,指定輸出就行了.  下麵的例子我用的是 shell 的成分。


    [root@101 cron]#$hadoop_home/bin/hadoop  jar $hadoop_home/contrib/streaming/hadoop-*-streaming.jar -input myinputdirs -output myoutputdir -mapper cat -reducer wc  </pre>


    詳細的參數,對於咱們來說提供性能可以把 tasks 的任務數增加下,根據情況自己測試下,也別太高了,增加負擔。


    (1)-input:輸入文件路徑


    (2)-output:輸出文件路徑


    (3)-mapper:用戶自己寫的 mapper 程序,可以是可執行文件或者腳本


    (4)-reducer:用戶自己寫的 reducer 程序,可以是可執行文件或者腳本


    (5)-file:打包文件到提交的作業中,可以是 mapper 或者 reducer 要用的輸入文件,如配置文件,字典等。


    (6)-partitioner:用戶自定義的 partitioner 程序


    (7)biner:用戶自定義的biner 程序(必須用 java 實現)


    (8)-d:作業的一些屬性(以前用的是-jonconf),具體有:


    1)mapred.map.tasks:map task 數目


    2)mapred.reduce.tasks:reduce task 數目


    3)stream.map.input.field.separator/stream.map.output.field.separator: map task 輸入/輸出數據的分隔符,默認均為 t。


    4)stream.num.map.output.key.fields:指定 map task 輸出記錄中 key 所占的域數目


    5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task 輸入/輸出數據的分隔符,默認均為 t。


    6)stream.num.reduce.output.key.fields:指定 reduce task 輸出記錄中 key 所占的域數目


    這裏是統計 dns 的日誌文件有多少行 ~


    圖片 19.8 pic


    在 mapreduce 作為參數的時候,不能用太多太複雜的 shell 語言,他不懂的~


    可以寫成 shell 文件的模式;


    #! /bin/bashwhile read line; do #  for word in $line #  do #    echo "$word 1"awk \''{print $5}\''         donedone  </pre>


    #! /bin/bashcount=0started=0word=""while read line;do  goodk=`echo $line | cut -d \'' \''  -f 1`  if [ "x" == x"$goodk" ];then     continue  fi  if [ "$word" != "$goodk" ];then    [ $started -ne 0 ] && echo -e "$wordt$count"    word=$goodk    count=1    started=1  else    count=$(( $count + 1 ))  fidone  </pre>


    有時候會出現這樣的問題,好好看看自己寫的 mapreduce 程序 ~


    13/12/14 13:26:52 info streaming.streamjob: tracking url: http://101.rui:50030/jobdetails.jsp?jobid=job_201312131904_003013/12/14 13:26:53 info streaming.streamjob:  map 0%  reduce 0%13/12/14 13:27:16 info streaming.streamjob:  map 100%  reduce 100%13/12/14 13:27:16 info streaming.streamjob: to kill this job, run:13/12/14 13:27:16 info streaming.streamjob: /usr/local/hadoop/libexec/../bin/hadoop job  -dmapred.job.tracker=localhost:9001 -kill job_201312131904_003013/12/14 13:27:16 info streaming.streamjob: tracking url: http://101.rui:50030/jobdetails.jsp?jobid=job_201312131904_003013/12/14 13:27:16 error streaming.streamjob: job not sessful. error: # of failed map tasks exceeded allowed limit. failedcount: 1.stfailedtask: task_201312131904_0030_m_00000013/12/14 13:27:16 info streaming.streamjob: killjob...streamingmand failed!  </pre>


    python 做為 mapreduce 執行成功後,結果和日誌一般是放在你指定的目錄下的,結果是在 part-00000 文件裏麵~


    圖片 19.9 pic


    下麵咱們談下,如何入庫和後台的執行


    本文出自 “峰雲,就她了。” 博客,謝絕轉載!

章節目錄

閱讀記錄

Python實戰-從菜鳥到大牛的進階之路所有內容均來自互聯網,uu小說網隻為原作者極客學院的小說進行宣傳。歡迎各位書友支持極客學院並收藏Python實戰-從菜鳥到大牛的進階之路最新章節