Connection to Oracle From Spark

For some silly reason there has been a fair amount of difficulty reading from and writing to Oracle from Spark when using DataFrames.

SPARK-10648 — Spark-SQL JDBC fails to set a default precision and scale when they are not defined in an oracle schema.

This issue shows up when your schema has NUMBER columns with no precision or scale and you attempt to read them. The fix added the following dialect to Spark:

/**
 * :: DeveloperApi ::
 * Default Oracle dialect, mapping a nonspecific numeric type to a general decimal type.
 */
@DeveloperApi
case object OracleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in special way
    // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale
    // For more details, please see
    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
    // and
    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
      // in Oracle is allowed to have different precision/scale for each value.
      Some(DecimalType(38, 10))
    } else {
      None
    }
  }
}
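
To make the check in that dialect concrete, here is a minimal sketch in plain Java that mirrors only the decision logic, with no Spark on the classpath. The class `OracleNumberCheck` and the method `catalystTypeFor` are hypothetical names made up for illustration, and the result is a plain string rather than a Spark `DataType`, so treat it as a reading aid and not as Spark's API.

```java
import java.sql.Types;
import java.util.Optional;

// Hypothetical stand-in for the dialect's getCatalystType check; not part of Spark.
final class OracleNumberCheck {

    // Returns the fallback decimal type (as text) when the column is a NUMERIC
    // whose reported size is 0, which is how a NUMBER declared without
    // precision/scale shows up. Returns empty otherwise, meaning "use the
    // default mapping".
    static Optional<String> catalystTypeFor(int sqlType, int size) {
        if (sqlType == Types.NUMERIC && size == 0) {
            return Optional.of("decimal(38,10)");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // NUMBER with no precision/scale: falls back to the fixed decimal type.
        System.out.println(catalystTypeFor(Types.NUMERIC, 0));
        // NUMBER(10): left to the default mapping.
        System.out.println(catalystTypeFor(Types.NUMERIC, 10));
        // Non-numeric column: left to the default mapping.
        System.out.println(catalystTypeFor(Types.VARCHAR, 0));
    }
}
```

The trade-off is the one the dialect's own comment points out: a single precision and scale has to be picked up front, even though Oracle allows each value in such a column to have its own.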

This dialect fixes the issue in Spark 1.4.2, 1.5.3 and 1.6.0 (and DataStax Enterprise 4.8.3). However, there is a more recent issue.

SPARK-12941 — Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype

And this led me to this SO issue. One of the answers has a fix that they claim only works for 1.5.x; however, I had no issue porting it to 1.4.1. The solution in Java looked something like the following, which is just a Java port of the Scala code in the SO answer above. (This is not under warranty and it may destroy your server, but it should allow you to write to Oracle.)

        import org.apache.spark.sql.jdbc.JdbcDialect;
        import org.apache.spark.sql.jdbc.JdbcDialects;
        import org.apache.spark.sql.jdbc.JdbcType;
        import org.apache.spark.sql.types.DataType;
        import org.apache.spark.sql.types.DataTypes;
        import scala.Option;

        JdbcDialect dialect = new JdbcDialect() {
            @Override
            public boolean canHandle(String url) {
                return url.startsWith("jdbc:oracle") || url.contains("oracle");
            }

            // Map each Spark SQL type to an Oracle column type for writes.
            @Override
            public Option<JdbcType> getJDBCType(DataType dt) {
                if (DataTypes.StringType.sameType(dt)) {
                    return Option.apply(new JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR));
                } else if (DataTypes.BooleanType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(1)", java.sql.Types.NUMERIC));
                } else if (DataTypes.IntegerType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(10)", java.sql.Types.NUMERIC));
                } else if (DataTypes.LongType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19)", java.sql.Types.NUMERIC));
                } else if (DataTypes.DoubleType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC));
                } else if (DataTypes.FloatType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC));
                } else if (DataTypes.ShortType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(5)", java.sql.Types.NUMERIC));
                } else if (DataTypes.ByteType.sameType(dt)) {
                    return Option.apply(new JdbcType("NUMBER(3)", java.sql.Types.NUMERIC));
                } else if (DataTypes.BinaryType.sameType(dt)) {
                    return Option.apply(new JdbcType("BLOB", java.sql.Types.BLOB));
                } else if (DataTypes.TimestampType.sameType(dt)) {
                    return Option.apply(new JdbcType("DATE", java.sql.Types.DATE));
                } else if (DataTypes.DateType.sameType(dt)) {
                    return Option.apply(new JdbcType("DATE", java.sql.Types.DATE));
                } else if (DataTypes.createDecimalType().sameType(dt)) { // unlimited
                    // Scala version, for reference, which uses the actual precision and scale:
                    //   case DecimalType.Fixed(precision, scale) =>
                    //     Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
                    return Option.apply(new JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC));
                }
                // Anything else falls back to Spark's default mapping.
                return Option.empty();
            }
        };
        JdbcDialects.registerDialect(dialect);
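
The commented-out Scala line in the decimal branch hints at building the column type from the decimal's real precision and scale instead of hard-coding `NUMBER(38,4)`. A minimal sketch of that string building in plain Java might look like the following. The class `OracleDecimalMapping` and the helper `oracleNumberType` are hypothetical names, and the bounds checked (precision 1 to 38, scale between 0 and the precision) are assumptions for this sketch. How you pull the precision and scale out of the Spark `DataType` is left out here.

```java
// Hypothetical helper, not part of Spark or the SO answer.
final class OracleDecimalMapping {

    // Assumed upper bound on precision for this sketch.
    static final int MAX_PRECISION = 38;

    // Builds an Oracle column definition such as "NUMBER(12,2)" from a
    // decimal's precision and scale, rejecting values outside the assumed bounds.
    static String oracleNumberType(int precision, int scale) {
        if (precision < 1 || precision > MAX_PRECISION) {
            throw new IllegalArgumentException("precision out of range: " + precision);
        }
        if (scale < 0 || scale > precision) {
            throw new IllegalArgumentException("scale out of range: " + scale);
        }
        return "NUMBER(" + precision + "," + scale + ")";
    }

    public static void main(String[] args) {
        System.out.println(oracleNumberType(12, 2));
        System.out.println(oracleNumberType(38, 10));
    }
}
```

In the dialect, the returned string would take the place of the fixed `"NUMBER(38,4)"` passed to `JdbcType`, so a `decimal(12,2)` column is not created wider than it needs to be.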

In the future, keep an eye out for more official support in SPARK-12941, and then you can forget the hacky workaround above for good.

About Ryan Svihla

I consider myself a full stack polyglot, and I have been writing a lot of JS and Ruby as of late. Currently, I'm a solutions architect at DataStax.