I ran this on Databricks CE in a notebook and it produces output to a delta table. I am using the .format("rate") approach.
val streamingQuery = aggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
Running the Trigger.Once variant below, however, produces no output! It stops after one invocation, but the table remains empty.
What is the reason?
Surely not a CE limitation? Error?
- Can this processing mode not be run in a cell?
- Volume issue?
- This raises the question: can Trigger.Once only be used in the Databricks environment? I am assuming I can run this as a jar under Linux.
Could it be a bug? Here is the code:
import org.apache.spark.sql.streaming.Trigger
val streamingQuery = aggregatesDF.writeStream
  .trigger(Trigger.Once())
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
Could .format("rate"), which is handy for prototyping, be the issue?
CodePudding user response:
Trigger.Once isn't limited to Databricks - it's standard functionality of Spark Structured Streaming. The problem is that it requires a data source that has a history, because each run processes the data that arrived since the last execution. The rate source doesn't have a history: it always starts from the beginning. It's easy to show:
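df = spark.readStream.format("rate").load()
# Run a single micro-batch and wait for the query to finish.
# A streaming writer is started with .start(path); it has no .save() method.
df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
  .format("parquet").outputMode("append").start("1.parquet").awaitTermination()
# Read back what the one-shot run wrote
spark.read.parquet("1.parquet").show()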
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
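The reasoning above can be sketched with a toy model in plain Python. This is not Spark code and not how Spark works internally. It only assumes that a one-shot trigger processes everything between the last committed offset and the latest offset available when the run starts. LogSource, FreshSource and run_once are hypothetical names invented for this sketch.

```python
# Toy model of a one-shot trigger; a simplification, not Spark internals.

class LogSource:
    """Hypothetical source with history: records persist between runs."""

    def __init__(self):
        self.records = []

    def append(self, *values):
        self.records.extend(values)

    def latest_offset(self):
        return len(self.records)

    def read(self, start, end):
        return self.records[start:end]


class FreshSource:
    """Hypothetical source without history: generation starts with the query,
    so nothing exists yet when the single batch is planned."""

    def latest_offset(self):
        return 0

    def read(self, start, end):
        return []


def run_once(source, checkpoint):
    """Process everything after the committed offset, then stop."""
    start = checkpoint.get("offset", 0)
    end = source.latest_offset()
    batch = source.read(start, end)
    checkpoint["offset"] = end  # remember progress for the next run
    return batch


log, log_cp = LogSource(), {}
log.append("a", "b")
first = run_once(log, log_cp)    # picks up the two records already stored
log.append("c")
second = run_once(log, log_cp)   # picks up only what arrived since the last run

empty = run_once(FreshSource(), {})  # nothing has accumulated, so nothing to process
print(first, second, empty)
```

In this model the source with history hands each run exactly the records added since the previous one, while the source without history always yields an empty batch.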
If you want to continue to use rate for experimenting, it's better to create an additional table that will be a buffer between rate and your code. Something like this:
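# Create a buffer table. This query runs continuously (no trigger(once=True)),
# because a one-shot run over rate would leave the buffer empty as well.
df = spark.readStream.format("rate").load()
df.writeStream.option("checkpointLocation", "buffer.cp") \
  .format("delta").outputMode("append").start("1.delta")
# Use the buffer table as a streaming source that has a history
bufferDF = spark.readStream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
# In Python, upsertToDelta is a function taking (batch_df, batch_id)
streamingQuery = aggregatesDF.writeStream \
  .trigger(once=True) \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()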
P.S. It makes no sense to use .format("delta") together with .foreachBatch - the latter takes precedence.
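To illustrate the P.S., here is a minimal sketch of what a Python upsert function and its writer might look like when foreachBatch is the only sink. The question does not show upsertToDelta, so everything here is an assumption: TARGET_PATH, the "key" join column and the function names are hypothetical, the target Delta table is assumed to exist already, and the delta-spark package is assumed to be installed.

```python
# Sketch only: TARGET_PATH and the "key" join column are hypothetical.
TARGET_PATH = "/tmp/aggregates.delta"


def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch into the existing Delta table at TARGET_PATH."""
    # Imported lazily so the module loads even where delta-spark is absent.
    from delta.tables import DeltaTable

    # DataFrame.sparkSession assumes PySpark 3.3 or newer.
    target = DeltaTable.forPath(batch_df.sparkSession, TARGET_PATH)
    (target.alias("t")
        .merge(batch_df.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()      # update rows whose key already exists
        .whenNotMatchedInsertAll()   # insert rows with a new key
        .execute())


def start_upsert_query(aggregates_df):
    """Start a one-shot query whose only sink is upsert_to_delta."""
    return (aggregates_df.writeStream
            .trigger(once=True)
            .foreachBatch(upsert_to_delta)  # no .format(...) call
            .outputMode("update")
            .start())
```

The writer never calls .format("delta"): all writing happens inside the function passed to foreachBatch, so a format on the writer would add nothing.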
