Databricks Example
Because Databricks functions as an external system, you can develop your own business logic using Spark within it. In this context, Macrometa serves as both a source (providing input data) and a target (persisting output data) from the perspective of Databricks.
This example shows you how to read data from a Macrometa stream and write data to a Macrometa collection using the Macrometa Databricks Client Connector.
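The snippets below assume that the connection details for your Macrometa GDN tenant are available as Scala values and that the usual Spark SQL classes are imported. A minimal sketch of that setup, with placeholder names and values you replace with your own:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Placeholder connection details (assumed names used throughout this example).
val regionUrl = "<YOUR_REGION_URL>"
val apikey = "<YOUR_API_KEY>"
val fabric = "<YOUR_FABRIC>"
val tenant = "<YOUR_TENANT>"
val replication = "<YOUR_REPLICATION>"
val sourceStream = "<YOUR_SOURCE_STREAM>"
val sourceSubscription = "<YOUR_SUBSCRIPTION_NAME>"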
Read from a Macrometa Stream
Set up your source options:
val sourceOptions = Map(
"regionUrl" -> regionUrl,
"port" -> "6651",
"apikey" -> apikey,
"fabric" -> fabric,
"tenant" -> tenant,
"replication" -> replication,
"stream" -> sourceStream,
"subscriptionName" -> sourceSubscription
)
Create a Spark session:
val spark = SparkSession.builder()
.appName("MacrometaCollectionStreamingApp")
.master("local[*]")
.getOrCreate()
Read from the Macrometa stream:
val inputStream = spark.readStream
.format("com.macrometa.spark.stream.MacrometaTableProvider")
.options(sourceOptions)
.load()
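To confirm that events are arriving before wiring up the collection sink, you can optionally write the stream to the console using Spark's built-in console sink; this is a debugging sketch, not part of the pipeline itself:
// Optional: print incoming micro-batches to the console for debugging.
val debugQuery = inputStream.writeStream
  .format("console")
  .outputMode("append")
  .start()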
Write to a Macrometa Collection
Set up your target options:
val targetOptions = Map(
"regionUrl" -> regionUrl,
"apiKey" -> "apikey ",
"fabric" -> fabric,
"collection" -> "<YOUR_TARGET_COLLECTION>",
"batchSize" -> "100"
)
Write to the Macrometa collection:
val query = inputStream.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write
.format("com.macrometa.spark.collection.MacrometaTableProvider")
.options(targetOptions)
.mode(SaveMode.Append)
.save()
}
.option("checkpointLocation", "checkpoint")
.start()
Wait for the streaming query to terminate:
query.awaitTermination()
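awaitTermination() blocks until the streaming query stops or fails. Once it returns, you can close the Spark session to release resources, for example:
// Stop the Spark session after the streaming query has finished.
spark.stop()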