
Importing and Exporting MySQL Data to HDFS via the Sqoop Java API, and Some Bugs


I first tried the Sqoop 2 client API, but it kept throwing errors for reasons I never pinned down. After a long, fruitless search for a fix I fell back to Sqoop 1.4.6, which has a small bug of its own (described later). Notes below.

Sqoop 2 demo: HDFS on a remote cluster, MySQL local. This one did not work for me, possibly an environment problem.

package com.kay.transfer;
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Collection;
import java.util.UUID;

/**
 * Created by kay on 2017/12/12.
 */

public class SqoopTest {

    public static void sqoopTransfer() {
        // Initialize the client against the Sqoop 2 server
        String url = "http://192.168.1.200:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        // List the connectors registered on the server
        Collection<MConnector> connectors = client.getConnectors();
        for (MConnector m : connectors) {
            System.out.println(m.getLinkConfig());
        }

        // Create the source link (generic JDBC connector)
        MLink fromLink = client.createLink("generic-jdbc-connector");
        fromLink.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10));
        fromLink.setCreationUser("arcgis1009");

        // Fill in the link configuration values
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.1.28:3306/mydb");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("lk123456");
       // fromLinkConfig.getStringInput("dialect.identifierEnclose").setValue("`");

        // Save the populated link object
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the JDBC link");
        }

        // Create the destination link (HDFS connector)
        MLink toLink = client.createLink("hdfs-connector");
        toLink.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10));
        toLink.setCreationUser("arcgis1009");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://192.168.1.200:9000/");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the HDFS link");
        }

        // Create a job from the source link to the destination link
        long fromLinkId = fromLink.getPersistenceId();
        System.out.println("fromLinkId: " + fromLinkId);
        long toLinkId = toLink.getPersistenceId();
        System.out.println("toLinkId: " + toLinkId);

        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("kay-job" + UUID.randomUUID());
        job.setCreationUser("arcgis1009");

        // Configure the "from" side of the job (source table)
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("mydb");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("user");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");


        // Configure the "to" side of the job (HDFS output)
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/user/tmp"+ UUID.randomUUID());
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
       // toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);

        // Driver config: for MapReduce this is the number of mappers
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        // Save the populated job object
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create the job.");
        }


        // Start the job
        long jobId = job.getPersistenceId();
        System.out.println("jobId: " + jobId);


        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Refresh the submission, otherwise the loop keeps polling a stale
            // status object (the exact accessor varies across Sqoop 2 client versions)
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Job finished...");
        System.out.println("Hadoop job ID: " + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
            System.out.println("Counters:");
            for(CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for(Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if (submission.getExceptionInfo() != null) {
            System.out.println("Job failed, exception info: " + submission.getExceptionInfo());
        }
        System.out.println("MySQL-to-HDFS transfer via Sqoop completed");
    }

    public static void main(String[] args) throws Exception {
        sqoopTransfer();
    }
}
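A side note on debugging the errors above: with the Sqoop 2 client, one common cause of opaque failures is passing a config input name (the "linkConfig.*" / "fromJobConfig.*" keys) that the connector version on the server does not actually expose. Below is a minimal sketch of a helper that dumps every link-config input name the server advertises; it can be dropped into the SqoopTest class above (which already imports org.apache.sqoop.model.*). The accessors are from the Sqoop 2 model API and may differ slightly between 1.99.x releases, so treat this as an illustration rather than a guaranteed recipe.

    // Dump every link-config input name each connector exposes, so the
    // hard-coded "linkConfig.*" keys above can be checked against the server.
    static void dumpLinkConfigInputs(SqoopClient client) {
        for (MConnector connector : client.getConnectors()) {
            System.out.println(connector.getUniqueName());
            for (MConfig config : connector.getLinkConfig().getConfigs()) {
                for (MInput<?> input : config.getInputs()) {
                    System.out.println("  " + input.getName());
                }
            }
        }
    }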

Sqoop 1.4.6 demo: remote HDFS, local MySQL, tested successfully.
Note the "--bindir","./src/main/resources" arguments: they set the directory where the ORM class files generated from the database table are written. Without them the program failed with a "user class not found" error; it turned out the generated class had landed in the project root, and a Stack Overflow answer suggested adding this option. One small problem remains: the very first run still reports "user class not found", because the class has not been generated yet. On the second run the directory already contains user.class, user.java, and user.jar, so it succeeds. In other words, the mapping files must exist before the import runs. I don't know how other people handle this, i.e. how to generate and load the class dynamically; one possible approach is sketched after the demo.

package com.kay.transfer;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
/**
 * Created by kay on 2017/12/12.
 */
public class Test {
    private static int importDataFromMysql() throws Exception {
        String[] args = new String[] {
                "--bindir", "./src/main/resources",
                "--connect", "jdbc:mysql://localhost:3306/mydb",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "root",
                "--table", "user",
                "-m", "1",
                "--target-dir", "java_import_user9"
        };

        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");

        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.200:9000"); // HDFS namenode address (fs.defaultFS is the newer key)
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);

        @SuppressWarnings("deprecation")
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        return Sqoop.runSqoop(sqoop, expandArguments);
    }

    public static void main(String[] args) throws Exception {
        importDataFromMysql();
    }
}
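As for the open question about generating and loading the mapping class dynamically: Sqoop 1.4.x exposes its codegen tool through the same SqoopTool API, and the import tool accepts --jar-file/--class-name to reuse an already-built class instead of compiling one during the import. The sketch below is one possible answer, not what I did in the original demo: it runs codegen first, then points the import at the generated jar, which should avoid the first-run "user class not found" failure. The two-step split, the runTool helper, and the target directory name are my own illustration.

package com.kay.transfer;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;

public class CodegenFirstTest {
    private static final String BINDIR = "./src/main/resources";

    // Run any named Sqoop tool ("codegen", "import", ...) with the given args.
    private static int runTool(String toolName, String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.200:9000"); // HDFS namenode address
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        @SuppressWarnings("deprecation")
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) SqoopTool.getTool(toolName), loadPlugins);
        return Sqoop.runSqoop(sqoop, args);
    }

    public static void main(String[] args) {
        // Step 1: generate user.java / user.class / user.jar into BINDIR.
        int rc = runTool("codegen", new String[] {
                "--bindir", BINDIR,
                "--outdir", BINDIR,
                "--connect", "jdbc:mysql://localhost:3306/mydb",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "root",
                "--table", "user"
        });
        if (rc != 0) {
            System.err.println("codegen failed with exit code " + rc);
            return;
        }
        // Step 2: reuse the generated jar so the import never has to find
        // (or compile) the user class on its own.
        runTool("import", new String[] {
                "--connect", "jdbc:mysql://localhost:3306/mydb",
                "--driver", "com.mysql.jdbc.Driver",
                "--username", "root",
                "--password", "root",
                "--table", "user",
                "--jar-file", BINDIR + "/user.jar",
                "--class-name", "user",
                "-m", "1",
                "--target-dir", "java_import_user_codegen" // illustrative output dir
        });
    }
}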