
Kotlin API for Apache Spark: Streaming, Jupyter, and More

Pasha Finkelshteyn May 26, 2022

Hello, fellow data engineers! It’s Pasha here, and today I’m going to introduce you to the new release of the Kotlin API for Apache Spark. It’s been a long time since our last major release announcement, mainly because we wanted to avoid bothering you with minor improvements. But today’s announcement is huge!

First, let me remind you what the Kotlin API for Apache Spark is and why it was created. Apache Spark is a framework for distributed computations. Data engineers typically use it to solve a variety of tasks, for example ETL processes. It supports multiple languages straight out of the box: Java, Scala, Python, and R. We at JetBrains are committed to supporting one more language for Apache Spark – Kotlin – as we believe it can combine the advantages of the other language APIs while avoiding their drawbacks.

If you don’t want to read any further and just want to try it out, here is the link to the repo:

Repository

Otherwise, let’s begin our overview.

Spark Streaming

For a long time, we’d supported only one Apache Spark API: the Dataset API. While it’s widely popular, we can’t just ignore the fact that there is at least one more popular part of Apache Spark: Spark Streaming. Its name is self-explanatory, but just to be sure we’re on the same page, allow me to elaborate a little.

Spark Streaming is a solution for building stream processing systems on top of Spark. Unlike many other stream processing solutions, Spark Streaming works with micro-batches: when reading data from a source, instead of processing one element at a time, it reads everything that arrived within a defined time frame, or “batch” (for example, it might read everything available every 100 ms).

There are multiple core entities in Spark Streaming:

  1. A discretized stream (DStream) represents a continuous stream of data. It can be created from an input source (a socket, Kafka, or even a text file) or from another DStream. A DStream is represented as a sequence of RDDs (Resilient Distributed Datasets).
  2. The Spark streaming context (StreamingContext) is the main entry point for working with DStreams. Its main goal is to provide us with different methods for creating streams from different sources. A minimal sketch of both entities follows this list.
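
To make these two entities a bit more concrete, here is a minimal sketch in plain Spark Streaming terms, before any of the Kotlin API sugar. The application name, the local master, the 100 ms batch interval, and the socket address are all illustrative placeholder values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaStreamingContext

fun main() {
   val conf = SparkConf().setAppName("StreamingSketch").setMaster("local[2]") // placeholder config
   // The StreamingContext is the entry point; the Duration is the micro-batch interval.
   val ssc = JavaStreamingContext(conf, Durations.milliseconds(100))
   // A DStream created from an input source (here, a TCP socket).
   val lines = ssc.socketTextStream("localhost", 9999)
   // A DStream created from another DStream via a transformation.
   val words = lines.flatMap { it.split(" ").iterator() }
   words.countByValue().print()
   ssc.start()
   ssc.awaitTermination()
}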

As you might already know, we have a special withSpark function in our core API (you can view it here, for example). Of course, for Streaming, we have something similar: withSparkStreaming. It has some defaults that we think are reasonable. You can take a look at them here if you want.

A very basic usage example looks like this:

withSparkStreaming { // this: KSparkStreamingSession
   val lines: JavaReceiverInputDStream<String> = TODO() // create some string stream, for example, from socket
   val words: JavaDStream<String> = TODO() // some transformation
   words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
      withSpark(rdd) { // this: KSparkSession
         val dataframe: Dataset<TestRow> = rdd.map { TestRow(it) }.toDS()
         dataframe
            .groupByKey { it.word }
            .count()
            .show()
      }
   }
}

What can we see here? We create a Spark Streaming context with the withSparkStreaming call. Many useful things are available inside it, for example a withSpark function that will obtain or create a Spark Session. It also provides access to the ssc variable (which stands for – you guessed it – “Spark streaming context”).
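
For instance, the TODO placeholders from the example above could be filled in through ssc roughly like this. This is only a sketch: the socket address and the split-by-whitespace transformation are illustrative assumptions, and TestRow is the same simple data class as in the example above:

withSparkStreaming { // this: KSparkStreamingSession
   // Illustrative source and transformation; any other DStream would do.
   val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)
   val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }
   words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
      withSpark(rdd) { // this: KSparkSession
         rdd.map { TestRow(it) }.toDS()
            .groupByKey { it.word }
            .count()
            .show()
      }
   }
}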

As you can see, no other non-obvious abstractions are involved. We can work with the RDDs, JavaDStreams, etc. that we are familiar with.

The withSpark function inside withSparkStreaming is slightly different from the one you’re familiar with. It can find the right Spark Session from the SparkConf of the ssc variable or (as in the example) from an RDD. You still get a KSparkSession context, which gives you the ability to create Datasets, broadcast variables, and so on. But in contrast to its batching counterpart, this Spark Session won’t be closed at the end of the withSpark block. Lastly, it behaves similarly to the Kotlin run function in that it returns the value of the last expression in its block.
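
As a small sketch of that run-like behavior (reusing the same TestRow data class from the example above), a value computed inside withSpark can simply be assigned outside of it:

words.foreachRDD { rdd: JavaRDD<String>, _: Time ->
   // withSpark returns the value of its last expression, like Kotlin's `run`.
   val batchSize: Long = withSpark(rdd) {
      rdd.map { TestRow(it) }.toDS().count()
   }
   println("Rows in this micro-batch: $batchSize")
}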

You can find more examples on our GitHub, and more detailed documentation is available on our wiki.

Jupyter support

A Kotlin kernel for Jupyter has existed for some time already. You can experiment with different Kotlin for Data Science tools, such as multik (a library for multi-dimensional arrays in Kotlin) or KotlinDL (a deep learning API written in Kotlin, inspired by Keras and working on top of TensorFlow), and others described in the Kotlin documentation. But we are aware that Jupyter is quite popular among data engineers too, so we’ve added support for the Kotlin API for Apache Spark as well. In fact, all you need to do to start using it is put %use spark in a cell of your notebook. You can see an example on our GitHub.

The main features you will find in this support are autocompletion and table rendering. When you use %use spark, all of the notebook cells are automagically wrapped into one implicit withSpark block, which gives you access to all the sugar we provide.
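
For example, a notebook could look roughly like this. The Person data class and the sample rows are throwaway illustrations, and dsOf is the helper from the core API; the Dataset left as the last expression of the cell is what gets rendered as a table:

// Cell 1: load the integration.
%use spark

// Cell 2: every cell is implicitly wrapped in withSpark, so the core API helpers are in scope.
data class Person(val name: String, val age: Int)
dsOf(Person("Ann", 30), Person("Bob", 25))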

The aforementioned Spark Streaming is supported too. To use it, all you need to do is add %use spark-streaming. Of course, all the features of dynamic execution are supported – for example, tables will update automatically.
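
A streaming notebook can then follow the same pattern; this sketch reuses the illustrative socket source from earlier in this post:

// Cell 1: load the streaming integration.
%use spark-streaming

// Cell 2: a minimal streaming pipeline.
withSparkStreaming {
   ssc.socketTextStream("localhost", 9999)
      .countByValue()
      .print()
}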

Please be aware that Jupyter support for the Kotlin API for Apache Spark is experimental and may have some limitations. One limitation we are aware of: you cannot mix batching and streaming in the same notebook – withSparkStreaming doesn’t work inside a withSpark block. Don’t hesitate to send us examples if something doesn’t seem to behave as it should. We’re always happy to help!

You can find an example of a notebook with streaming here.

Of course, it works in Datalore too. Here is an example notebook, as well as an example notebook for streaming. In case you’re not aware of what Datalore is: It’s an online environment for Jupyter notebooks, developed by JetBrains.

A bit more information on Jupyter integration can be found on our wiki.

Deprecating c in favor of t

While preparing this release, we noticed that our own reinvented tuples (called ArityN) aren’t actually that efficient when used extensively, and that it’s better to reuse Scala Tuples. So, we’ve deprecated our factory method c in favor of the factory method t. The semantics are exactly the same, but the new method creates a native Scala Tuple instead of a Kotlin one.

In Kotlin-esque fashion, here are some useful extension methods for tuples:

val a: Tuple2<Int, Long> = tupleOf(1, 2L) // explicit tupleOf, same convention as with `listOf`
val b: Tuple3<String, Double, Int> = t("test", 1.0, 2) // `t` as an alternative to `c`
val c: Tuple3<Float, String, Int> = 5f X "aaa" X 1 // infix function which creates tuple of 3 elements

tupleOf(1) + 2 == tupleOf(1, 2) // '+'-syntax can be used to extend tuples

There is much more information about the new Tuples API available in the wiki.

We have also changed the structure of the documentation: the readme grew too large to digest easily, so we’ve split it up and moved its contents to the wiki on GitHub.

Conclusion

These are the most important changes to the Kotlin API for Apache Spark version 1.1. As usual, you can find the release on Maven Central. If you have any ideas or if you need help or support, please contact us on Slack or GitHub issues.

Also, please give a warm welcome to Jolan Rensen, who has been contributing to the project for some time and now works at JetBrains. He’s the main person in charge of maintaining the project, while yours truly only tries to help him out wherever he can. Jolan, we’re thrilled to have you with us!

