, "price" -> "B.price", "state" -> "B.state").asInstanceOf[Map[Any, Any]] dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched( col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
For more details on usage, see:
https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
7. Support Flink streaming write to CarbonData
Use Case: CarbonData needs to be integrated with fault-tolerant streaming dataflow engines like Apache Flink. Users can build a Flink streaming job and use a Flink sink to write data to CarbonData through the Carbon SDK. The Flink sink generates table stage files; the data in these stage files can then be inserted into the carbon table with the carbon INSERT STAGE command, which makes it visible to queries (see the sketch after the example below).
Example:
spark.sql(""" CREATE TABLE test_flink (stringField string, intField int, shortField short) STORED AS carbondata """) // create flink streaming environment StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment() environment.setParallelism(1) environment.enableCheckpointing(2000L) environment.setRestartStrategy(RestartStrategies.noRestart()) DataStreamSource<OUT> stream = environment.addSource(////DataSource like Kafka/////) // create carbon sdk writer factory with LOCAL/S3/OBS builder CarbonWriterFactory factory = CarbonWriterFactory.builder("Local").build(dbName,tableName,tablePath, tableProperties,writerProperties,carbonProperties) // create stream sink and add it to stream StreamingFileSink<IN> streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build() stream.addSink(streamSink) // execute flink streaming job which generate’s stage files environment.execute()
For more details on usage, see:
https://github.com/apache/carbondata/blob/master/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
8. Add segment