将 Informix 数据引入到 Spark 中,第 1 部分

收集数据

结合其他数据源来利用数据

简要背景

如今,有关 Spark 的文献有很多,似乎我需要多花一点时间来介绍这款叫做 Informix®的老产品。Informix 于 1980 年作为关系数据库系统 (RDS) 应运而生,后来很快成为了 UNIX 系统上的一个参考关系数据库管理系统 (RDBMS)。IBM 分两期(2001 年和 2005 年)收购了 Informix 公司,并向这个令人印象深刻的数据管理产品组合添加了同名的数据库。幸运的是,有人会说,得益于 Informix 高度忠诚且积极的用户群,Informix 在 IBM 产品组合中仍然非常活跃,给该产品带来了诸如 XML 和 JSON 支持、NoSQL 集成之类的大量创新,使其成为了第一个企业混合数据库。Informix 是一个面向事务性应用程序的优秀数据库,而且在沃尔玛、思科、家得宝、DHL 等公司中依然很活跃。您每次在全球最大的连锁超市或最喜爱的橙色主题家装商店购买商品时,交易都会记录在每个地方的 Informix 数据库中。该数据会定期返回到阿肯色州和乔治亚州进行整合。

尽管最近几年添加了内存中处理支持,但借助 Informix Warehouse Accelerator(或 IWA),世界正朝异构支持和面向数据湖的架构发展。这是 Apache Spark 的一个绝佳发展机会,您可能想知道“我如何将我的 Informix 数据库中的数据卸载到 Spark 中?”

在本教程中,您将学习如何从 Informix 收集数据。在本系列的第 2 部分中,您将学习如何添加其他数据源并分析该数据。

要跟随本教程,需要安装以下程序:

  • Spark 2.1.1
  • Informix 12.10.FC8
  • Java™ 1.8.0_60-b27
  • Informix JDBC 驱动程序 4.10.8.1
  • MacOS Sierra 10.12.5

备注:所有代码都在 GitHub 中。

将 customer 添加到我的数据帧中

图 1. 将 customer 添加到数据帧中

点击查看大图

在第一小节,您将连接到著名的样本数据库 stores_demo 的 customer 表。

语法非常简单,但有一点需要注意:请记得在连接中使用DELIMIDENT=Y,以确保正确构建了 SQL 查询。进一步了解 DELIMIDENT。

以下代码摘自 BasicCustomerLoader.java。

首先,连接到本地模式(或您的集群)来创建一个 Spark 会话。

  SparkSession spark = SparkSession
      .builder()
      .appName("Stores Customer")
      .master("local")
      .getOrCreate();

从该表中读取数据并存储在数据帧中。

  Dataset<Row> df = spark
    .read()
    .format("jdbc")
    .option(
      "url",
      "jdbc:informix-sqli://[::1]:33378/stores_demo:IFXHOST=lo_informix1210;DELIMIDENT=Y")
    .option("dbtable", "customer")
    .option("user", "informix")
    .option("password", "in4mix")
    .load();
  df.printSchema();
  df.show(5);

您将获取模式。

 |-- customer_num: long (nullable = false)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)

然后获取数据(仅显示了前 5 行):

+------------+---------------+---------------+--------------------+-------...
|customer_num|          fname|          lname|             company|       ...
+------------+---------------+---------------+--------------------+-------...
|         101|Ludwig         |Pauli          |All Sports Supplies |213 Ers…
|         102|Carole         |Sadler         |Sports Spot         |785 Gea…
|         103|Philip         |Currie         |Phil's Sports       |654 Pop…
|         104|Anthony        |Higgins        |Play Ball!          |East Sh…
|         105|Raymond        |Vector         |Los Altos Sports    |1899 La…
+------------+---------------+---------------+--------------------+-------...

将整个数据库转储到 Spark 中

图 2. 将整个数据库转储到 Spark 中

点击查看大图

JDBC Metadata API 列出了所有表,然后您逐个将它们载入 Spark 中。仅加载表和视图,拒绝加载系统表、同义词、别名等。

但是,导入数据时,Spark 无法识别 Informix 中存在的一些不透明的数据类型。为了避免这种情况,需要为 Informix 创建一种特定的方言。在数据传入(和传出)和设置一些参数时,JDBC 方言对 Spark 很有帮助。

构建翻译方言

以下代码摘自 net.jgp.labs.informix2spark.utils 包中的 InformixJdbcDialect.java。

您需要来自 Spark 和 Scala 的一些包。

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;

import scala.Option;

dialect 类继承自 JdbcDialect,但您不会覆盖所有方法。

public class InformixJdbcDialect extends JdbcDialect {

主方法是 canHandle,它基于 JDBC URL,确定这是否使用了正确的方言。在本例中,可以检查 URL 是否以 jdbc:informix-sqli 开头,这是一个表明我们使用的是否是 Informix 数据库的良好指标。

  @Override
  public boolean canHandle(String url) {
    return url.startsWith("jdbc:informix-sqli");
  }

第二个方法是 getCatalystType,它基于从 JDBC 驱动程序检索的数据类型,返回 Catalyst 能理解的数据类型。此列表包含 stores_demo中的所有数据类型。如果您的应用程序要使用更多数据类型,您需要在这里添加它们。

@Override
  public Option<DataType> getCatalystType(int sqlType,
      String typeName, int size, MetadataBuilder md) {
    if (typeName.toLowerCase().compareTo("calendar") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "calendarpattern") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "se_metadata") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "sysbldsqltext") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().startsWith("timeseries")) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("st_point") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("tspartitiondesc_t") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    return Option.empty();
  }

请注意,此方法返回一个 Option,后者来自 Scala。

获取表的列表

是时候检查所有表了。

以下代码摘自 net.jgp.labs.informix2spark.l100 包中的 DatabaseLoader.java。为便于阅读,我删除了异常处理。

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;

public class DatabaseLoader {

  private List<String> getTables(Connection connection) {
    List<String> tables = new ArrayList<>();

获取连接的元数据。

    DatabaseMetaData md;
    md = connection.getMetaData();

从这里查询这些表。此语法返回所有表。

    ResultSet rs;
    rs = md.getTables(null, null, "%", null);

现在可以像浏览正常的结果集一样浏览元数据结果集。

 while (rs.next()) {

表名称位于第 3 列。

 String tableName = rs.getString(3);

表类型位于第 4 列。

      String tableType = rs.getString(4).toLowerCase();
      System.out.print("Table [" + tableName + "] ... ");

仅保留表和视图。其他类型包括系统表、全局临时表、本地临时表、别名和同义词。

      if (tableType.compareTo("table") == 0
          || tableType.compareTo("view") == 0) {
        tables.add(tableName);
        System.out.println("is in (" + tableType + ").");
      } else {
        System.out.println("is out (" + tableType + ").");
      }
    }

    return tables;
  }
}

使用 CPU 周期和内存

现在您需要的表的列表已经有了,那么就可以整合所有工作,并创建一个数据帧图。

以下代码摘自 net.jgp.labs.informix2spark.l100 包中的 DatabaseLoader.java。为便于阅读,我删除了异常处理和一些条件测试。

 private void start() {

在本地模式下连接到 Spark。

    SparkSession spark = SparkSession
        .builder()
        .appName("Stores Data")
        .master("local")
        .getOrCreate();

这个包含构建器的小配置对象简化了连接的管理。

    Config config = ConfigManager.getConfig(K.INFORMIX);
    Connection connection = config.getConnection();

获取所有表。

    List<String> tables = getTables(connection);
    if (tables.isEmpty()) {
      return;
    }

定义方言并在 Spark 中注册它。

    JdbcDialect dialect = new InformixJdbcDialect();
    JdbcDialects.registerDialect(dialect);

准备好数据帧图。该图按表名称和数据帧来建立索引。

 Map<String, Dataset<Row>> database = new HashMap<>();

检查所有表。

    for (String table : tables) {
      System.out.print("Loading table [" + table
          + "] ... ");

按照与之前相同的原则,每次仅处理一个不同的表。

    Dataset<Row> df = spark
      	.read()
  	    .format("jdbc")
  	    .option("url", config.getJdbcUrl())
  	    .option("dbtable", table)
  	    .option("user", config.getUser())
  	    .option("password", config.getPassword())
  	    .option("driver", config.getDriver())
  	    .load();
      database.put(table, df);
      System.out.println("done");
    }

挑选一个随机表(这里显示了 state),分析它,然后输出前 5 行。

    System.out.println("We have " + database.size()
        + " table(s) in our database");
    Dataset<Row> df = database.get("state");

    df.printSchema();
    System.out.println("Number of rows in state: " + df
        .count());
    df.show(5);
  }

执行程序来获得以下信息。

Table [sysaggregates] ... is out (system table).
Table [sysams] ... is out (system table).
…
Table [call_type] ... is in (table).
Table [catalog] ... is in (table).
Table [classes] ... is in (table).
Table [cust_calls] ... is in (table).
Table [customer] ... is in (table).
Table [customer_ts_data] ... is in (table).
Table [employee] ... is in (table).
Table [ext_customer] ... is in (table).
Table [items] ... is in (table).
Table [manufact] ... is in (table).
Table [orders] ... is in (table).
Table [se_metadatatable] ... is in (table).
Table [se_views] ... is in (table).
…
Loading table [customer] ... done
Loading table [customer_ts_data] ... done
Loading table [employee] ... done
Loading table [ext_customer] ... done
Loading table [items] ... done
Loading table [manufact] ... done
Loading table [orders] ... done
Loading table [se_metadatatable] ... done
Loading table [se_views] ... done
Loading table [state] ... done
Loading table [stock] ... done
…
We have 45 table(s) in our database

root
 |-- code: string (nullable = true)
 |-- sname: string (nullable = true)

仅显示“state”表的前 5 行:

+----+---------------+
|code|          sname|
+----+---------------+
|  AK|Alaska         |
|  HI|Hawaii         |
|  CA|California     |
|  OR|Oregon         |
|  WA|Washington     |
+----+---------------+

继续前行

您现在已准备好对此数据执行分析,但这是另一堂课的内容。

在本教程中,我们使用了标准 Java 代码,以及标准 JDBC 方法。此代码是为 Informix 而设计的。但是,也可以针对 DB2 等其他优秀数据库进行相应调整® - 只需花费几分钟。

感谢 Pradeep Natarajan,他现在就职于 HCL,但仍在研究 Informix,当我遇到奇怪问题时,他总是在我身边。

继续探索

是白的 我是一个勤奋的爬虫~~
{{uname}}

{{meta.replies}} 条回复
写下第一个评论!

-----------到底了-----------