Custom Hive UDAF
Published: 2019-06-08


package com.rtmap.udfs;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by doge on 2015/11/17.
 */
public class StayTime extends UDAF {

    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final Integer TRANSITION_TIME = 10;// 10s
    private static final Integer HALF_HOUR = 60 * 30;
    private static final String FIELD_DELIMITER = "_";
    private static final String DELIMITER_DUMMY = "-";
    private static final String ROW_DELIMITER = "\t";
    private static final String NO_STAY_TIME = "0";

    /**
     * UDAF worker
     */
    public static class StayTimeUDAFEvaluator implements UDAFEvaluator {

        private ArrayList<String> source;

        public StayTimeUDAFEvaluator() {
            super();
            init();
        }

        public void init() {
            source = new ArrayList<>();
        }

        /**
         * iterate every row
         *
         * @param objects
         * @return
         */
        /*public boolean iterate(String floor, String store, String timestamp) { */
        public boolean iterate(Object[] objects) {
            if (objects == null || objects.length == 0) {
                return false;
            }
            String floor = objects[0].toString().trim();
            String store = objects[1].toString().trim().replace(FIELD_DELIMITER, DELIMITER_DUMMY);
            String timestamp = objects[2].toString();
            try {
                timestamp = SIMPLE_DATE_FORMAT.format(SIMPLE_DATE_FORMAT.parse(timestamp));
            } catch (ParseException e) {
                e.printStackTrace();
            }
            String tuple = floor + FIELD_DELIMITER + store + FIELD_DELIMITER + timestamp;
            source.add(tuple);
            return true;
        }

        public ArrayList<String> terminatePartial() {
            return source;
        }

        public boolean merge(ArrayList<String> portion) {
            if (portion == null) {
                return false;
            }
            source.addAll(portion);
            return true;
        }

        public String terminate() {
            ArrayList<String[]> buffer = new ArrayList<>();
            for (String string : source) {
                String[] tuple = string.split(FIELD_DELIMITER);
                buffer.add(tuple);
            }
            if (buffer.size() == 1) {
                // occur one time
                String[] single = buffer.get(0);
                String floor = single[0];
                String store = single[1];
                String bubble = single[2];
                return floor + FIELD_DELIMITER + store + FIELD_DELIMITER + NO_STAY_TIME + FIELD_DELIMITER + bubble + FIELD_DELIMITER + bubble;// ?
            }
            Collections.sort(buffer, new Comparator<String[]>() {
                @Override
                public int compare(String[] o1, String[] o2) {
                    return o1[2].compareTo(o2[2]);
                }
            });
            String regionMark = null;
            String regionTime = null;
            String lastTime = null;
            List<String> results = new ArrayList<>();
            for (int i = 0, j = buffer.size(); i < j; i++) {
                String[] strings = buffer.get(i);
                String key = strings[0] + FIELD_DELIMITER + strings[1];// floor + name
                String timestamp = strings[2];
                if (i == 0 && regionMark == null && regionTime == null) {
                    // first time
                    regionMark = key;// region mark
                    regionTime = timestamp;
                    lastTime = timestamp;// last time
                    continue;
                }
                int normalDiff = dateDiff(timestamp, regionTime);
                String suffix = FIELD_DELIMITER + regionTime + FIELD_DELIMITER + timestamp;
                String stayTime = normalDiff + suffix;
                if (i == j - 1) {
                    // last one
                    if (key.equals(regionMark)) {
                        if (normalDiff <= HALF_HOUR) {
                            results.add(regionMark + FIELD_DELIMITER + stayTime);
                        } else {
                            stayTime = dateDiff(lastTime, regionTime) + StringUtils.EMPTY;
                            stayTime += FIELD_DELIMITER + regionTime + FIELD_DELIMITER + lastTime;
                            results.add(regionMark + FIELD_DELIMITER + stayTime);
                            String lastStayTime = NO_STAY_TIME + FIELD_DELIMITER + timestamp + FIELD_DELIMITER + timestamp;
                            results.add(key + FIELD_DELIMITER + lastStayTime);
                        }
                    } else {
                        if (normalDiff <= TRANSITION_TIME) {
                            results.add(regionMark + FIELD_DELIMITER + stayTime);
                        } else if (normalDiff > TRANSITION_TIME) {
                            if (normalDiff > HALF_HOUR) {
                                stayTime = dateDiff(lastTime, regionTime) + StringUtils.EMPTY;
                                stayTime += FIELD_DELIMITER + regionTime + FIELD_DELIMITER + lastTime;
                            }
                            results.add(regionMark + FIELD_DELIMITER + stayTime);
                            String lastStayTime = NO_STAY_TIME + FIELD_DELIMITER + timestamp + FIELD_DELIMITER + timestamp;
                            results.add(key + FIELD_DELIMITER + lastStayTime);
                        }
                    }
                    break;
                }
                if (key.equals(regionMark)) {
                    if (normalDiff > HALF_HOUR) {
                        stayTime = dateDiff(lastTime, regionTime) + StringUtils.EMPTY;
                        stayTime += FIELD_DELIMITER + regionTime + FIELD_DELIMITER + lastTime;
                        results.add(regionMark + FIELD_DELIMITER + stayTime);
                        regionTime = timestamp;
                    }
                    lastTime = timestamp;
                    continue;
                }
                if (normalDiff <= TRANSITION_TIME) {
                    // middle times, key not equals region mark
                    // less than 10 seconds, do nothing
                } else if (normalDiff > TRANSITION_TIME) {
                    if (normalDiff > HALF_HOUR) {
                        // if (normalDiff > HALF_HOUR && !lastTime.equals(regionTime)) {
                        stayTime = dateDiff(lastTime, regionTime) + StringUtils.EMPTY;
                        stayTime += FIELD_DELIMITER + regionTime + FIELD_DELIMITER + lastTime;
                    }
                    results.add(regionMark + FIELD_DELIMITER + stayTime);
                    regionMark = key;
                    regionTime = timestamp;
                }
                lastTime = timestamp;// last time
            }
            String result = StringUtils.EMPTY;
            for (String string : results) {
                if (result.length() > 0) {
                    result += ROW_DELIMITER;
                }
                result += string;
            }
            return result.length() == 0 ? null : result;
        }
    }

    /**
     * calc the seconds between two timestamp
     *
     * @param left
     * @param right
     * @return
     */
    private static Integer dateDiff(String left, String right) {
        try {
            Date bigger = SIMPLE_DATE_FORMAT.parse(left);
            Date smaller = SIMPLE_DATE_FORMAT.parse(right);
            long transition = (bigger.getTime() - smaller.getTime()) / 1000;// return seconds
            return Integer.valueOf(transition + StringUtils.EMPTY);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return 0;
    }
}
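The listing relies on the call order Hive uses for the old `UDAFEvaluator` interface: `init()`, then `iterate()` per input row, `terminatePartial()` to hand over a partial result, `merge()` to combine partial results, and `terminate()` for the final value. The sketch below walks through that order by hand with no Hive dependency. `LifecycleSketch`, its nested `Evaluator`, `secondsBetween`, and `run` are hypothetical names introduced for illustration, and the aggregation is deliberately simplified to "first to last timestamp", not the region logic of `StayTime`. It also assumes `java.time` in place of `SimpleDateFormat`, which ignores time zones and daylight-saving shifts.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in that mimics the evaluator lifecycle without Hive. */
class LifecycleSketch {

    // Same pattern as SIMPLE_DATE_FORMAT in the listing; DateTimeFormatter is thread-safe.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Whole seconds from 'earlier' to 'later', the same quantity dateDiff returns. */
    static long secondsBetween(String later, String earlier) {
        LocalDateTime start = LocalDateTime.parse(earlier, FMT);
        LocalDateTime end = LocalDateTime.parse(later, FMT);
        return Duration.between(start, end).getSeconds();
    }

    /** Simplified evaluator: it buffers raw timestamps only. */
    static class Evaluator {
        private List<String> source = new ArrayList<>();

        void init() {
            source = new ArrayList<>();
        }

        boolean iterate(String timestamp) {
            if (timestamp == null) {
                return false;
            }
            source.add(timestamp);
            return true;
        }

        List<String> terminatePartial() {
            return source;
        }

        boolean merge(List<String> portion) {
            if (portion == null) {
                return false;
            }
            source.addAll(portion);
            return true;
        }

        String terminate() {
            if (source.isEmpty()) {
                return null;
            }
            // Fixed-width, zero-padded timestamps sort chronologically as plain strings,
            // which is what the comparator in the listing depends on as well.
            List<String> sorted = new ArrayList<>(source);
            Collections.sort(sorted);
            String first = sorted.get(0);
            String last = sorted.get(sorted.size() - 1);
            return secondsBetween(last, first) + "_" + first + "_" + last;
        }
    }

    /** Plays the role of Hive: two partial aggregations, then one final merge. */
    static String run() {
        Evaluator partialA = new Evaluator();
        partialA.init();
        partialA.iterate("2015-11-17 10:00:40");
        partialA.iterate("2015-11-17 10:00:00");

        Evaluator partialB = new Evaluator();
        partialB.init();
        partialB.iterate("2015-11-17 10:05:00");

        Evaluator finalStage = new Evaluator();
        finalStage.init();
        finalStage.merge(partialA.terminatePartial());
        finalStage.merge(partialB.terminatePartial());
        return finalStage.terminate();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 300_2015-11-17 10:00:00_2015-11-17 10:05:00
    }
}
```

Because rows can reach `merge()` in any order, the final stage has to sort before it compares neighbouring timestamps, which is why `terminate()` in the listing sorts `buffer` first.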

 

Reposted from: https://www.cnblogs.com/mengyao/archive/2013/03/13/5667456.html
