Quartz + Oozie + Sqoop batch data import

What this accomplishes: Quartz schedules the task that submits an Oozie workflow, and Oozie drives Sqoop to import MySQL data into Hive.

The key is workflow.xml: it first creates the Hive table, then imports the MySQL table into HDFS, which completes the import. Note that the paths must match: Sqoop's target directory and the Hive table's LOCATION must point to the same HDFS directory.
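A hypothetical helper (not part of the original code) that makes this path contract explicit: both Sqoop's --target-dir and the Hive external table's LOCATION can be derived from a single constant. The base directory is the one used in the workflow.xml and script.hql below.

    public class ImportPaths {
        // Shared base directory (from the workflow.xml / script.hql in this article).
        static final String BASE_DIR = "/hive/warehouse/zcyj_db.db/thm/tmp_thm_13/";

        // Used both as Sqoop's --target-dir and as the Hive table LOCATION.
        static String importDir(String htable) {
            return BASE_DIR + htable;
        }
    }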

package com.dpi.data_model.core.hadoopUtil;

import com.dpi.data_model.core.entity.mybeans.SchedulingParameters;
import com.dpi.data_model.core.entity.mybeans.TaskInfo;
import com.dpi.data_model.core.job.OozieJob;
import com.dpi.data_model.core.util.DateTimeUtil;
import org.apache.commons.httpclient.util.DateUtil;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Created by xutao on 2017/8/2.
 */
public class QuartzUtil {

    private static Scheduler scheduler;

    static {
        try {
            scheduler = StdSchedulerFactory.getDefaultScheduler();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    public static void quartzTest(SchedulingParameters parameters) throws InterruptedException {
        try {
            // Start the scheduler obtained in the static block.
            scheduler.start();
            // operate == 1 means "modify an existing task", 2 means "add a new task";
            // for a modification, drop the old job first so it can be re-registered.
            if (parameters.getOperate() == 1) {
                scheduler.deleteJob(new JobKey(parameters.getTaskName(), "oozieGroup"));
            }
            // Create the job and define its details; OozieJob.execute() submits the workflows.
            JobDetail myjob = JobBuilder.newJob(OozieJob.class)
                    .withIdentity(parameters.getTaskName(), "oozieGroup").build();
            myjob.getJobDataMap().put("parameters", parameters);
            // Fire every 24 hours, up to 1000 repeats.
            SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
                    .withIntervalInHours(24).withRepeatCount(1000);
            // Define the trigger.
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity("simpleTrigger", "simpleTriggerGroup")
                    .withSchedule(scheduleBuilder)
                    .startAt(parameters.getStartTime())
                    .endAt(parameters.getEndTime())
                    .build();
            // Register the job and trigger with the scheduler.
            scheduler.scheduleJob(myjob, trigger);
            // For this local test only: give the first firing time to run, then shut down.
            Thread.sleep(1000 * 30);
            scheduler.shutdown();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SchedulingParameters parameters = new SchedulingParameters();
        parameters.setTaskName("123");
        parameters.setStartTime(new Date());
        parameters.setEndTime(new Date());
        parameters.setDb_user("root");
        parameters.setPwd("dpibigdata");
        parameters.setUrl("jdbc:mysql://192.168.1.233:3306/dpimodel?useUnicode=true&characterEncoding=UTF-8");

        List<TaskInfo> taskInfoList = new ArrayList<TaskInfo>();
        TaskInfo taskInfo = new TaskInfo();
        taskInfo.setColumns("id string,mode_name string,technology_name string,mode_describe string,mode_arrangement string,status string,c_user_id string,c_user string,c_time string,u_time string");
        taskInfo.setWare_name("tbl_mode");
        taskInfo.setHive_tb_name("h_tbl_mode");
        taskInfoList.add(taskInfo);
        parameters.setTaskInfoList(taskInfoList);

        QuartzUtil.quartzTest(parameters);
        System.out.println("==================");
    }
}
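The trigger above repeats every 24 hours from startTime. If the job should instead fire at a fixed wall-clock time each day, Quartz's cron schedule is a drop-in replacement; a minimal sketch (the 02:00 firing time and the helper class are illustrative, not from the original code):

    import org.quartz.CronScheduleBuilder;
    import org.quartz.Trigger;
    import org.quartz.TriggerBuilder;

    public class DailyCronTrigger {
        // Fires at 02:00 every day (fields: sec min hour day-of-month month day-of-week).
        static Trigger at2am(String taskName) {
            return TriggerBuilder.newTrigger()
                    .withIdentity(taskName, "oozieGroup")
                    .withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
                    .build();
        }
    }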

package com.dpi.data_model.core.job;

import com.dpi.data_model.core.entity.mybeans.SchedulingParameters;
import com.dpi.data_model.core.entity.mybeans.TaskInfo;
import com.dpi.data_model.core.hadoopUtil.WorkflowClient;
import org.apache.oozie.client.OozieClientException;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * Created by xutao on 2017/8/2.
 */
public class OozieJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // Run the scheduled task.
        WorkflowClient client = new WorkflowClient();
        String jobId = null;
        try {
            JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
            SchedulingParameters parameters = (SchedulingParameters) dataMap.get("parameters");
            // For each table: 1. run the table clean-up/creation step  2. run the data import step.
            for (int i = 0; i < parameters.getTaskInfoList().size(); i++) {
                TaskInfo taskInfo = (TaskInfo) parameters.getTaskInfoList().get(i);
                jobId = client.startHISJob(parameters, taskInfo);
                client.jobWait(jobId, client);
            }
        } catch (OozieClientException e) {
            e.printStackTrace();
        }
    }
}

package com.dpi.data_model.core.hadoopUtil;

import com.dpi.data_model.core.entity.mybeans.SchedulingParameters;
import com.dpi.data_model.core.entity.mybeans.TaskInfo;
import com.dpi.oozie.etl.JDBCUtil;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;
import java.util.Properties;

public class WorkflowClient {

    private static final Logger logger = Logger.getLogger(WorkflowClient.class);

    // User the Oozie workflows run as (an existing Oozie/Hadoop user).
    private static String USER_NAME = "hadoop";
    private static String OOZIE_URL = "http://192.168.1.237:11000/oozie/";
    private static String JOB_TRACKER = "rm1";
    private static String NAME_NODE = "hdfs://192.168.1.235:9000/";
    private static String QUEUE_NAME = "default";
    private static String OOZIE_LIBPATH = "hdfs://192.168.1.235:9000/share/lib_oozie/share/lib/hive,hdfs://192.168.1.235:9000/share/lib_oozie/share/lib/sqoop";
    private static String jdbcUrl = "jdbc:mysql://192.168.1.235:3306/dbview?useUnicode=true&characterEncoding=UTF-8";
    private static String jdbcUser = "hive";
    private static String jdbcPassword = "hive";
    private static String DRIVER = "com.mysql.jdbc.Driver";

    private static Properties conf = null;
    private static OozieClient wc = null;
    private static JDBCUtil jdbc = null;
    private static WorkflowClient client = null;
    private static String ENV = "TEST";

    public WorkflowClient() {
        wc = new OozieClient(OOZIE_URL);
        conf = wc.createConfiguration();
        conf.setProperty(OozieClient.USER_NAME, USER_NAME);
        // Set the workflow parameters shared by every job.
        conf.setProperty(OozieClient.CHANGE_VALUE_CONCURRENCY, "2");
        conf.setProperty("nameNode", NAME_NODE);
        conf.setProperty("jobTracker", JOB_TRACKER);
        conf.setProperty("queueName", QUEUE_NAME);
        conf.setProperty("oozie.libpath", OOZIE_LIBPATH);
        conf.setProperty("env", ENV);
    }

    // Poll every 30 seconds until the workflow reaches a terminal state.
    public void jobWait(String jobId, WorkflowClient client) {
        Status status = null;
        try {
            status = client.getJobStatus(jobId);
            if (status == Status.RUNNING) {
                while (true) {
                    try {
                        Thread.sleep(30 * 1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    status = client.getJobStatus(jobId);
                    if (status == Status.SUCCEEDED
                            || status == Status.FAILED
                            || status == Status.KILLED) {
                        break;
                    }
                }
                logger.info("Workflow job finished with status: " + status);
            } else {
                logger.info("Problem starting workflow job (status: " + status + ").");
            }
        } catch (OozieClientException e) {
            e.printStackTrace();
        }
    }

    // Submit one MySQL-to-Hive import workflow and return its Oozie job id.
    public String startHISJob(SchedulingParameters parameters, TaskInfo taskInfo) throws OozieClientException {
        conf.setProperty(OozieClient.APP_PATH, "hdfs://192.168.1.235:9000/user/workflows/oozie-table04/workflow.xml");
        conf.setProperty("datareportRoot", "user/workflows");
        conf.setProperty("jdbcUrl", parameters.getUrl());
        conf.setProperty("jdbcUser", parameters.getDb_user());
        conf.setProperty("jdbcPassword", parameters.getPwd());
        conf.setProperty("table", taskInfo.getWare_name());
        conf.setProperty("htable", taskInfo.getHive_tb_name());
        // conf.setProperty("basis_one", taskInfo.getBasis_one());
        // conf.setProperty("basis_two", taskInfo.getBasis_two());
        // conf.setProperty("basis_three", taskInfo.getBasis_three());
        // conf.setProperty("exportPath", "/hive/warehouse/dpimodel/thm/" + taskInfo.getWare_name());
        conf.setProperty("columns", taskInfo.getColumns());
        return wc.run(conf);
    }

    // Query the current status of a workflow job.
    public Status getJobStatus(String jobID) throws OozieClientException {
        WorkflowJob job = wc.getJobInfo(jobID);
        return job.getStatus();
    }

    public static void main(String[] args) throws OozieClientException,
            InterruptedException {
        // Create the client.
        client = new WorkflowClient();
        // String jobId = client.startHISJob(parameters, taskInfo);
        Thread.sleep(30 * 1000);
        // client.jobWait(jobId, client);
    }
}
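For reference, a minimal usage sketch of WorkflowClient outside Quartz, reusing the sample values from QuartzUtil.main above: submit one table's workflow and block until Oozie reports a terminal state. The demo class itself is not part of the original code.

    import com.dpi.data_model.core.entity.mybeans.SchedulingParameters;
    import com.dpi.data_model.core.entity.mybeans.TaskInfo;

    public class WorkflowClientDemo {
        public static void main(String[] args) throws Exception {
            WorkflowClient client = new WorkflowClient();

            SchedulingParameters parameters = new SchedulingParameters();
            parameters.setUrl("jdbc:mysql://192.168.1.233:3306/dpimodel?useUnicode=true&characterEncoding=UTF-8");
            parameters.setDb_user("root");
            parameters.setPwd("dpibigdata");

            TaskInfo taskInfo = new TaskInfo();
            taskInfo.setWare_name("tbl_mode");      // MySQL source table
            taskInfo.setHive_tb_name("h_tbl_mode"); // Hive target table
            taskInfo.setColumns("id string,mode_name string,technology_name string");

            String jobId = client.startHISJob(parameters, taskInfo);
            client.jobWait(jobId, client);
            System.out.println("final status: " + client.getJobStatus(jobId));
        }
    }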

workflow.xml

<workflow-app name="xutaotest" xmlns="uri:oozie:workflow:0.4">
    <!-- Element tags were lost in extraction; skeleton follows the standard Oozie
         hive/sqoop action schema, node names are illustrative, values are original. -->
    <start to="create-hive-table"/>
    <!-- Step 1: (re)create the Hive external table over the target directory. -->
    <action name="create-hive-table">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <job-xml>${nameNode}/user/oozie/conf/hive-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>script.hql</script>
            <param>htable=${htable}</param>
            <param>columns=${columns}</param>
        </hive>
        <ok to="sqoop-import"/>
        <error to="fail"/>
    </action>
    <!-- Step 2: import the MySQL table into the same directory the Hive table reads from. -->
    <action name="sqoop-import">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect ${jdbcUrl} --username ${jdbcUser} --password ${jdbcPassword} --table ${table} --append --target-dir /hive/warehouse/zcyj_db.db/thm/tmp_thm_13/${htable} -m 1</command>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>xutaotest workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
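If the workflow ends in the kill node, the message built from ${wf:errorMessage(wf:lastErrorNode())} can also be recovered programmatically; a sketch using the standard Oozie client API (the helper class is illustrative, not from the original):

    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.WorkflowAction;
    import org.apache.oozie.client.WorkflowJob;

    public class FailureInspector {
        // Print each action's final status and, if present, its error message.
        public static void dumpActions(OozieClient wc, String jobId) throws Exception {
            WorkflowJob job = wc.getJobInfo(jobId);
            for (WorkflowAction action : job.getActions()) {
                String error = action.getErrorMessage();
                System.out.println(action.getName() + " -> " + action.getStatus()
                        + (error == null ? "" : " : " + error));
            }
        }
    }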

script.hql

use default;
drop table IF EXISTS ${htable};
create external table ${htable} (
    ${columns}
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '`'
LOCATION '/hive/warehouse/zcyj_db.db/thm/tmp_thm_13/${htable}';
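To verify the import, one option is a row count over Hive JDBC; a sketch assuming a HiveServer2 endpoint (host, port, and credentials below are illustrative, not from the original). One caveat: Sqoop emits comma-delimited text by default, so the backtick field delimiter in script.hql assumes the import was configured to match.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ImportCheck {
        public static void main(String[] args) throws Exception {
            // Standard HiveServer2 JDBC driver; the endpoint is an assumption.
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:hive2://192.168.1.235:10000/default", "hadoop", "");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM h_tbl_mode")) {
                if (rs.next()) {
                    System.out.println("h_tbl_mode rows: " + rs.getLong(1));
                }
            }
        }
    }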
