es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tag: SPARK-STREAMING-KAFKA

Apache Spark con kafka stream – Kafka faltante

He estado intentando configurar Apache Spark con kafka y he escrito un programa simple de forma local y está fallando. No puedo entender el problema al depurar. build.gradle.kts implementation (“org.jetbrains.kotlin:kotlin-stdlib:1.4.0”) implementation (“org.jetbrains.kotlinx.spark:kotlin-spark-api-3.0.0_2.12:1.0.0-preview1”) compileOnly(“org.apache.spark:spark-sql_2.12:3.0.0”) implementation(“org.apache.kafka:kafka-clients:3.0.0”) El código de la función principal es: val spark = SparkSession .builder() .master(“local[*]”) .appName(“Ship metrics”).orCreate val . . . Read more

¿Dónde puedo encontrar las dependencias de JavaTestUtils y BatchCounter mencionadas en JavaMapWithStateSuite.java?

Encontré esta clase java https://github.com/apache/spark/blob/master/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java que quiero ejecutar las pruebas JUNIT para familiarizarme con Spark State. Sin embargo, cuando intento poner esta clase java en un nuevo proyecto en mi IDE, no puede resolver estas dos clases (presumiblemente Java?): JavaTestUtils BatchCounter ¿Dónde puedo encontrar estas clases para ejecutar estas pruebas . . . Read more

Azure data bricks spark streaming con autoloader

Mi fuente es Azure Data Factory, que está copiando archivos a containerA –> FolderA, FolderB, FolderC. Estoy utilizando la siguiente sintaxis para usar el cargador automático y leer los archivos a medida que llegan a cualquiera de las carpetas. Ya he realizado el montaje hasta la cuenta de almacenamiento. source . . . Read more

¿Por qué Scala no puede asignar valores a variables externas en foreach?

Este es mi código: val array = Array(“Tom,9”, “Amy,10”) val peopleRDD = sc.parallelize(array) .map(line => { People(line.split(“,”)(0), line.split(“,”)(1).toInt) }) import ssb.implicits._ val df = peopleRDD.toDF() df.createOrReplaceTempView(“people”) val ipMap = new mutable.HashMap[String, String] ssb.sql(“”” |select name from people |”””.stripMargin) .foreach(x => { ipMap.put(“aaa”, x.apply(0).toString) }) for ((key, value) <- ipMap) { . . . Read more

Filas faltantes durante el procesamiento de registros utilizando foreachBatch en Spark Structured Streaming en Databricks.

Soy nuevo en escenarios en tiempo real y necesito crear trabajos de streaming estructurados en Spark en Databricks. Estoy tratando de aplicar algunas validaciones basadas en reglas desde las configuraciones del backend en cada mensaje JSON entrante. Necesito hacer las siguientes acciones en el JSON entrante: Necesito desglosar el JSON . . . Read more