The Scala language is widely used with Spark and in machine learning. Even though Spark is designed for massively parallel processing across a large number of nodes, it can also run in local mode within a single JVM.
Scala programs are compiled into Java bytecode, so we should be able to run one with local Spark in a Java user-defined function (Java UDF) on Db2 for z/OS. I demonstrate this here by modifying the example SparkPi.scala program and running it as a Java UDF on Db2 for z/OS.
IBM Open Data Analytics for z/OS (IzODA) includes the IBM Z version of Spark. You'll need to install IzODA to get the Spark jar files. Because we are running Spark in local mode, we just need to include the Spark jar files in the CLASSPATH.
Here are the steps:
-
First, modify the SparkPi.scala program: start the Spark session in local mode, add a udf method as the interface method for the UDF, and add a returnVal variable to propagate the result string to the udf method.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples
import org.apache.spark.sql.SparkSession
import scala.math.random
/** Computes an approximation to pi */
object SparkPi {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]") // run Spark in local mode inside this JVM
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    returnVal = s"Pi is roughly ${4.0 * count / (n - 1)}"
    spark.stop()
  }

  // Propagates the result string from main() to udf().
  var returnVal: String = null

  // Interface method for the Db2 Java UDF: receives the slice count as a
  // string and returns the "Pi is roughly ..." message computed by main().
  def udf(parm1: String): String = {
    val args: Array[String] = Array(parm1)
    main(args)
    returnVal
  }
}
// scalastyle:on println
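Before deploying the jar, you can sanity-check the udf entry point in a plain JVM with the same Spark jars on the classpath. The small driver below is my own sketch (SparkPiSmokeTest is a hypothetical name, not part of the Spark examples):
// Hypothetical standalone smoke test for the UDF entry point.
// Run with sparkPi.jar and the Spark jars on the classpath.
object SparkPiSmokeTest {
  def main(args: Array[String]): Unit = {
    // Call the same method Db2 will invoke, asking for 10 slices.
    val result = org.apache.spark.examples.SparkPi.udf("10")
    println(s"UDF returned: $result")
  }
}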
-
Next, compile the SparkPi.scala program and build the sparkPi.jar file with a Scala IDE. Then transfer sparkPi.jar in binary mode to the /tmp directory on z/OS. (If you prefer the command line, see the sbt sketch below.)
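If you would rather build from the command line than from an IDE, a minimal sbt build along these lines should also work; the sbt setup is my own assumption, with versions chosen to match the Scala 2.11 / Spark 2.3.4 jars that IzODA ships:
// build.sbt -- a minimal sketch, assuming sbt is available.
name := "sparkPi"
version := "1.0"
scalaVersion := "2.11.8"

// "provided": at run time the Spark jars come from the Db2 CLASSPATH.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.4" % "provided"
Running sbt package then produces the jar under target/scala-2.11/, which you can rename to sparkPi.jar before transferring it.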
-
Modify the Db2 envfile.txt file and add sparkPi.jar and the Spark jar files to the CLASSPATH. You'll need to customize the following sample for your system.
CLASSPATH=/usr/include/java_classes/gxljapi.jar:/usr/include/java_classes/gxljosrgImpl.jar:/usr/lpp/db2/devbase/base/classes/dsntrvfj.jar:/tmp/sparkPi.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/scala-library-2.11.8.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-sql_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-core_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/hadoop-common-2.7.7.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-network-common_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/commons-lang3-3.5.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/slf4j-api-1.7.16.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/slf4j-log4j12-1.7.16.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/log4j-1.2.17.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/commons-configuration-1.6.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/guava-14.0.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/commons-lang-2.6.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/hadoop-auth-2.7.7.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/commons-collections-3.2.2.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/metrics-core-3.1.5.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-catalyst_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-kvstore_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-launcher_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-network-shuffle_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/netty-all-4.1.43.Final.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/kryo-shaded-3.0.3.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/chill_2.11-0.8.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/spark-unsafe_2.11-2.3.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/json4s-ast_2.11-3.2.11.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/scala-xml_2.11-1.0.5.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/javax.servlet-api-3.1.0.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jersey-container-servlet-core-2.22.2.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jersey-server-2.22.2.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jackson-annotations-2.10.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jackson-databind-2.10.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jackson-core-2.10.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jackson-module-paranamer-2.10.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/jackson-module-scala_2.11-2.10.1.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/metrics-json-3.1.5.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/paranamer-2.8.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/xbean-asm5-shaded-4.4.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/lz4-java-1.4.0.jar:/usr/lpp/izoda/v110/spark/spark23x/jars/commons-logging-1.1.3.jar
LIBPATH=/usr/lib/java_runtime:/usr/lpp/devbase/base/lib/libdsntrvfd.so
STEPLIB=DB2A.SDSNLOAD
-
Create the Java UDF on Db2 for z/OS. You can use the following statement to create the sparkPi UDF. Note that PARAMETER STYLE JAVA requires a public static method; the Scala compiler generates a static forwarder for udf on the SparkPi class, so the EXTERNAL NAME below resolves correctly.
CREATE FUNCTION sparkPi(value VARCHAR(2048))
RETURNS VARCHAR(4096)
EXTERNAL NAME 'org.apache.spark.examples.SparkPi.udf'
LANGUAGE JAVA
PARAMETER STYLE JAVA
PARAMETER CCSID UNICODE
VARIANT
NO SQL
WLM ENVIRONMENT WLMJAVA
PROGRAM TYPE SUB;
-
Now it is time to test the sparkPi UDF. To test it, bring up SPUFI and issue the following query.
SELECT SPARKPI('10') FROM SYSIBM.SYSDUMMY1;
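The UDF can just as well be invoked from an application. Here is a hedged Scala JDBC sketch; the host, port, location name, and credentials are placeholders, and it assumes the IBM Data Server Driver for JDBC (db2jcc4.jar) is on the classpath:
import java.sql.DriverManager

// Hypothetical JDBC client for the sparkPi UDF; the connection details
// below are placeholders for your own Db2 for z/OS system.
object SparkPiClient {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:db2://myhost:5021/DB2A", "myuser", "mypassword")
    try {
      val rs = conn.createStatement()
        .executeQuery("SELECT SPARKPI('10') FROM SYSIBM.SYSDUMMY1")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }
  }
}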
The result
I ran the query in SPUFI and got the following output. (The estimate is the standard Monte Carlo identity pi ≈ 4 × count / samples: a uniform random point in the square [-1, 1] × [-1, 1] lands inside the unit circle with probability pi/4; here the sample count is n - 1, because 1 until n generates n - 1 points.)
Pi is roughly 3.1418151418151417
My sparkPi Java UDF also writes stdout and stderr to the following files:
javaout.txt
Begin...
Pi is roughly 3.1418151418151417
End...
javaerr.txt
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/02/19 23:48:26 INFO SparkContext: Running Spark version 2.3.4
20/02/19 23:48:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/02/19 23:48:27 INFO SparkContext: Submitted application: Spark Pi
20/02/19 23:48:27 INFO SecurityManager: Changing view acls to: SYSDSP
20/02/19 23:48:27 INFO SecurityManager: Changing modify acls to: SYSDSP
20/02/19 23:48:27 INFO SecurityManager: Changing view acls groups to:
20/02/19 23:48:27 INFO SecurityManager: Changing modify acls groups to:
20/02/19 23:48:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(SYSDSP); groups with view permissions: Set(); users with modify permissions: Set(SYSDSP); groups with modify permissions: Set()
20/02/19 23:48:35 WARN MacAddressUtil: Failed to find a usable hardware address from the network interfaces; using random bytes: 1a:e5:f0:eb:19:76:63:c6
20/02/19 23:48:35 INFO Utils: Successfully started service 'sparkDriver' on port 1025.
20/02/19 23:48:35 INFO SparkEnv: Registering MapOutputTracker
20/02/19 23:48:35 INFO SparkEnv: Registering BlockManagerMaster
20/02/19 23:48:35 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/02/19 23:48:35 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/02/19 23:48:36 INFO DiskBlockManager: Created local directory at /SYSTEM/tmp/blockmgr-d9040f2d-fafc-4eb5-bc51-03986aeb63bc
20/02/19 23:48:36 INFO MemoryStore: MemoryStore started with capacity 117.2 MB
20/02/19 23:48:36 INFO SparkEnv: Registering OutputCommitCoordinator
20/02/19 23:48:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/02/19 23:48:40 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://HOST.vmec.svl.ibm.com:4040
20/02/19 23:48:41 INFO Executor: Starting executor ID driver on host localhost
20/02/19 23:48:41 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 1026.
20/02/19 23:48:41 INFO NettyBlockTransferService: Server created on HOST.vmec.svl.ibm.com:1026
20/02/19 23:48:41 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/02/19 23:48:41 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, HOST.vmec.svl.ibm.com, 1026, None)
20/02/19 23:48:41 INFO BlockManagerMasterEndpoint: Registering block manager HOST.vmec.svl.ibm.com:1026 with 117.2 MB RAM, BlockManagerId(driver, HOST.vmec.svl.ibm.com, 1026, None)
20/02/19 23:48:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, HOST.vmec.svl.ibm.com, 1026, None)
20/02/19 23:48:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, HOST.vmec.svl.ibm.com, 1026, None)
20/02/19 23:48:42 INFO SparkContext: Starting job: reduce at SparkPi.scala:42
20/02/19 23:48:42 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:42) with 10 output partitions
20/02/19 23:48:42 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:42)
20/02/19 23:48:42 INFO DAGScheduler: Parents of final stage: List()
20/02/19 23:48:42 INFO DAGScheduler: Missing parents: List()
20/02/19 23:48:42 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:38), which has no missing parents
20/02/19 23:48:42 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 117.2 MB)
20/02/19 23:48:43 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1264.0 B, free 117.2 MB)
20/02/19 23:48:43 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on HOST.vmec.svl.ibm.com:1026 (size: 1264.0 B, free: 117.2 MB)
20/02/19 23:48:43 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039
20/02/19 23:48:43 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:38) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
20/02/19 23:48:43 INFO TaskSchedulerImpl: Adding task set 0.0 with 10 tasks
20/02/19 23:48:43 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/02/19 23:48:43 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 910 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 284 ms on localhost (executor driver) (1/10)
20/02/19 23:48:43 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 867 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 84 ms on localhost (executor driver) (2/10)
20/02/19 23:48:43 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 867 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 79 ms on localhost (executor driver) (3/10)
20/02/19 23:48:43 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 867 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 84 ms on localhost (executor driver) (4/10)
20/02/19 23:48:43 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 867 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 71 ms on localhost (executor driver) (5/10)
20/02/19 23:48:43 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
20/02/19 23:48:43 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 867 bytes result sent to driver
20/02/19 23:48:43 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:43 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 60 ms on localhost (executor driver) (6/10)
20/02/19 23:48:43 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
20/02/19 23:48:44 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 867 bytes result sent to driver
20/02/19 23:48:44 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:44 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 69 ms on localhost (executor driver) (7/10)
20/02/19 23:48:44 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
20/02/19 23:48:44 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 867 bytes result sent to driver
20/02/19 23:48:44 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, localhost, executor driver, partition 8, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:44 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 58 ms on localhost (executor driver) (8/10)
20/02/19 23:48:44 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
20/02/19 23:48:44 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 867 bytes result sent to driver
20/02/19 23:48:44 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, localhost, executor driver, partition 9, PROCESS_LOCAL, 7853 bytes)
20/02/19 23:48:44 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 60 ms on localhost (executor driver) (9/10)
20/02/19 23:48:44 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
20/02/19 23:48:44 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 867 bytes result sent to driver
20/02/19 23:48:44 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 59 ms on localhost (executor driver) (10/10)
20/02/19 23:48:44 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:42) finished in 1.440 s
20/02/19 23:48:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/02/19 23:48:44 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:42, took 1.516106 s
20/02/19 23:48:44 INFO SparkUI: Stopped Spark web UI at http://HOST.vmec.svl.ibm.com:4040
20/02/19 23:48:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/02/19 23:48:44 INFO MemoryStore: MemoryStore cleared
20/02/19 23:48:44 INFO BlockManager: BlockManager stopped
20/02/19 23:48:44 INFO BlockManagerMaster: BlockManagerMaster stopped
20/02/19 23:48:44 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/02/19 23:48:44 INFO SparkContext: Successfully stopped SparkContext