Avoiding Kotlin Minefields in Apache Beam

18 Jun 2020 · CPOL · 5 min read

Without a doubt, the Java SDK is the most popular and full-featured of the SDKs supported by Apache Beam, and if you bring the power of Java's modern, open-source cousin Kotlin into the fold, you'll find yourself with a wonderful developer experience. As with most great relationships, not everything is perfect, and the Beam-Kotlin one is no exception.

This post covers some of the unique interactions between the two technologies and helps you avoid the gotchas that can arise when you are getting started, so you can focus on the great experience between Kotlin and Beam.

Declaring Anonymous ParDos / DoFns

When scouring the web looking for examples, it’s fairly common to see something like the following that creates an anonymous DoFn to be used within a ParDo:

Java
lines.apply("Extract Words", ParDo.of(new DoFn<String, String>() { ... }));  

A simple conversion to Kotlin would yield the following:

Kotlin
lines.apply("Extract Words", ParDo.of(DoFn<String, String>() { ... }))

However, you’ll find that this does not work as intended: Kotlin has no new keyword, so the trailing braces are treated as a lambda argument to the constructor call rather than as the body of an anonymous subclass, and the abstract DoFn cannot be instantiated directly. To implement an anonymous DoFn, you must explicitly declare an object that inherits from DoFn:

Kotlin
lines.apply("Extract Words", ParDo.of(object : DoFn<String, String>() { ... }))

Defining TupleTags

TupleTags are invaluable, and often necessary, when a transform produces outputs of multiple types. However, you may find that the way they are declared causes issues that force you to define a Coder explicitly via the setCoder() function after retrieving the output for a specific tag.

A dead giveaway would be the following error:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Transform.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed.

If you encounter this, it’s very likely that you defined your TupleTags as follows:

Kotlin
val usersTag = TupleTag<KV<String, User>>()

Unfortunately, as with many Kotlin and Java interop issues, type erasure is the problem: a tag created this way does not record its type arguments at runtime. To avoid this, be as explicit as possible when defining a TupleTag and declare it as an anonymous object that extends TupleTag, as seen below:

Kotlin
val usersTag = object: TupleTag<KV<String, User>>() {}

The object keyword and the trailing pair of empty curly braces create an anonymous subclass of TupleTag, so the specific type arguments are not lost when Beam attempts to read from the tag.
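The effect of the anonymous subclass can be observed with plain JVM reflection. The following is a minimal sketch, not Beam's implementation: Tag and capturedType() are hypothetical names invented for illustration, and the sketch only assumes that a class which fixes its superclass type arguments keeps them in its metadata.

```kotlin
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type

// Hypothetical stand-in for a tag class; this is not Beam's TupleTag.
open class Tag<T> {
    // Returns the type argument recorded in the class metadata, or null
    // when this instance was created directly and T was erased.
    fun capturedType(): Type? {
        val superType = javaClass.genericSuperclass
        return if (superType is ParameterizedType) superType.actualTypeArguments[0] else null
    }
}

fun main() {
    val plain = Tag<List<String>>()                  // direct instance: T is erased
    val anonymous = object : Tag<List<String>>() {}  // anonymous subclass: T is recorded

    println(plain.capturedType() == null)      // prints "true"
    println(anonymous.capturedType() != null)  // prints "true"
}
```

The direct instance has nothing to report because its class is just Tag, while the anonymous object's class extends Tag<List<String>> and carries that signature with it.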

IntelliJ Generated Overrides

One of the most appealing features of IntelliJ is its ability to generate any missing overrides for you when implementing an interface or inheriting from another class. With Kotlin’s type system, this can be a challenge: Kotlin uses a ? character to denote nullability, and Beam will want the types to match exactly.

Consider the following transform:

Kotlin
class ExampleTransform: PTransform<PCollection<KV<String, Test>>, PCollectionTuple>() {  
    // Omitted for brevity
}

You know that you need to perform some type of operation here, so you take advantage of your IDE and allow it to generate the appropriate overrides.

When you do this, you’ll see that the generated signature uses nullable versions of your types, specifically for the parameters:

Kotlin
// Notice the trailing ? after the type of the input parameter
override fun expand(input: PCollection<KV<String, Test>>?): PCollectionTuple {  
    TODO("Not yet implemented")
}

Beam is extremely explicit with regard to typing and nullability, so you’ll want to ensure that the PCollection in this case is not marked as nullable with the ? suffix:

Kotlin
override fun expand(input: PCollection<KV<String, Test>>): PCollectionTuple {  
    TODO("Not yet implemented")
}

Iterables, But Which Ones?

Both Java and Kotlin have a notion of an Iterable interface for working with collections of items. However, when using one with a grouping/batching operation such as the GroupIntoBatches transform, a disconnect can occur between the Kotlin and Java types on the JVM.

Kotlin
pipeline  
    .apply("Batch Items", GroupIntoBatches.ofSize<Key, Value>(100))
    .apply("Apply Batching Transform", ParDo.of(SomeTransform.transform()))

You may encounter an error that looks like the following:

ProcessContext argument must have type DoFn<Iterable<? extends Value>, Result<? extends Value>>.ProcessContext.

You can resolve this by adding a @JvmWildcard annotation to the element type of the Iterable (and not to the Iterable itself) in the DoFn declaration, changing this:

Kotlin
class SomeTransform: DoFn<KV<Key, Iterable<Value>>, KV<Key, Value>>(){  
  // Omitted for brevity 
}

to this:

Kotlin
class SomeTransform: DoFn<KV<Key, Iterable<@JvmWildcard Value>>, KV<Key, Value>>(){  
  // Omitted for brevity
}

This annotation tells the Kotlin compiler to emit a Java wildcard for that type argument, which should allow Beam to match the expected signature and serialize/deserialize the elements correctly.
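To see what @JvmWildcard changes, you can inspect the generic signature that Java reflection reports. This is a small sketch in a simpler position than the DoFn above (a method return type rather than a supertype argument); Value, Holder, and signatureOf are hypothetical names used only for illustration.

```kotlin
// Hypothetical element type; open so the compiler does not treat it as final.
open class Value

class Holder {
    // Without the annotation, a return type gets no wildcard.
    fun plain(): Iterable<Value> = emptyList()

    // With the annotation, the type argument becomes "? extends Value".
    fun wildcard(): Iterable<@JvmWildcard Value> = emptyList()
}

// Reads the generic return type of a Holder method as Java reflection sees it.
fun signatureOf(name: String): String =
    Holder::class.java.getMethod(name).genericReturnType.toString()

fun main() {
    println(signatureOf("plain"))
    println(signatureOf("wildcard"))
}
```

Both functions look identical from Kotlin, but only the annotated one exposes a wildcard type to Java callers, which is the kind of difference Beam's signature check reacts to.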

Writing Pipeline Tests

Testing, particularly unit testing, is extremely important when writing Beam applications (as it is everywhere). However, there are three major gotchas in the testing department that you should be aware of when working with Kotlin, namely:

  • Defining Your Pipeline
  • Applying PAsserts
  • Running Pipeline Tests

Defining Your Pipeline

Since the native PAsserts that are applied when writing unit tests against Beam pipelines rely on native Java code, they require a few annotations when used from Kotlin. You can use the following as an example of how to construct a test pipeline:

Kotlin
@get:Rule
@Transient
val testPipeline: TestPipeline = TestPipeline.create()

All of your individual unit tests can share this pipeline, but you should consider writing it exactly as above since both the @get:Rule and @Transient annotations are required, as is the explicit type declaration (e.g. : TestPipeline).

Applying PAsserts

The PAssert library comes bundled with Beam and allows you to write tests that explicitly target PCollection objects (e.g., you can write assertions against their contents). These generally just “work” as expected; however, one particular caveat comes when using the .satisfies() function:

Kotlin
PAssert.that(numbers).satisfies { elements ->  
    assertTrue(elements.contains(42))
}

You’ll find that this will not work since the satisfies() function explicitly expects a Java Void to be returned. Kotlin has no direct equivalent (a lambda ending in an assertion returns Unit instead), so you’ll be required to explicitly place a null at the end of the body of the function:

Kotlin
PAssert.that(numbers).satisfies { elements ->  
    assertTrue(elements.contains(42))
    null // Required
}
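The same behavior can be reproduced without Beam. The sketch below assumes a functional interface whose result type is java.lang.Void, which is the shape satisfies() expects; Check and runCheck are hypothetical names, not part of PAssert.

```kotlin
// Hypothetical stand-in for a Java functional interface that returns java.lang.Void.
fun interface Check<T> {
    fun apply(input: T): Void?
}

// Runs the given check against the input and hands back its result.
fun <T> runCheck(input: T, check: Check<T>): Void? = check.apply(input)

fun main() {
    val result = runCheck(listOf(1, 42)) { elements ->
        require(elements.contains(42)) { "expected 42" }
        null // the only value a Void? can hold
    }
    println(result) // prints "null"
}
```

Removing the trailing null makes the lambda's last expression a Unit, which does not match Void?, so the compiler rejects it.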

Running Pipeline Tests

Another potential gotcha can come when attempting to actually execute the tests themselves. You have to ensure that the pipeline itself is explicitly run to completion after the PAssert is defined:

Kotlin
PAssert.that(numbers).containsInAnyOrder(42)  
testPipeline.run().waitUntilFinish()  

Since the PAssert is constructed as part of the directed acyclic graph that the pipeline executes, it must be declared prior to running the pipeline. You’ll also find that you won’t be able to debug any of the ParDo-level operations if you are missing the run() call.
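A toy model may help show why a forgotten run() lets a test pass without checking anything. This is only an illustration of deferred execution, not how Beam is implemented; ToyPipeline, addCheck, and executed are hypothetical names.

```kotlin
// Toy model of a deferred pipeline; hypothetical, not Beam's implementation.
class ToyPipeline {
    private val steps = mutableListOf<() -> Unit>()

    // Counts how many registered checks have actually executed.
    var executed = 0
        private set

    // Registering a check only records it; nothing is evaluated yet.
    fun addCheck(description: String, predicate: () -> Boolean) {
        steps.add {
            executed++
            check(predicate()) { "Assertion failed: $description" }
        }
    }

    // Only run() evaluates the recorded steps.
    fun run() = steps.forEach { it() }
}

fun main() {
    val pipeline = ToyPipeline()
    pipeline.addCheck("always fails") { false }
    println(pipeline.executed)                 // prints "0": nothing has run yet
    val outcome = runCatching { pipeline.run() }
    println(outcome.isFailure)                 // prints "true"
}
```

Until run() is called, the failing check is just an entry in a list, which mirrors why a PAssert has no effect unless the pipeline is executed.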

Missing a Gotcha?

If you've been working with Apache Beam and Kotlin, I'd love to hear of any specific gotchas or use-cases that you ran into and how you overcame them!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)