
spark-rabbitmq's Introduction


RabbitMQ Spark Streaming Receiver

RabbitMQ-Receiver is a library that lets you read data from RabbitMQ with Apache Spark Streaming.

Requirements

This library requires Spark 2.0+, Scala 2.11+, and RabbitMQ 3.5+.

Using the library

There are two ways to use the RabbitMQ-Receiver library:

The first is to add the following dependency to your pom.xml:

<dependency>
  <groupId>com.stratio.receiver</groupId>
  <artifactId>spark-rabbitmq</artifactId>
  <version>LATEST</version>
</dependency>

The other is to clone the full repository and build the project:

git clone https://github.com/Stratio/spark-rabbitmq.git
cd spark-rabbitmq
mvn clean install

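This library includes two implementations for consuming messages from RabbitMQ with Spark Streaming: the Distributed Approach and the Receiver-based Approach, both described in the sections that follow.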

Build

mvn clean package

Distributed Approach

This advanced consumer is implemented by extending Spark's InputDStream class. With this approach you can consume messages from multiple RabbitMQ clusters or multiple RabbitMQ queues. It is also possible to parallelize consumption on one node by starting more than one consumer, one for each Spark RDD partition.

  • One executor with multiple parallelized consumers from one queue

Single-Parallelized

  • One executor or more with multiple consumers from multiple queues

Multiple-Parallelized

  • Cluster consumer

Cluster

The consumption options determine the number of partitions of the RDD generated by the RabbitMQDStream. Each Spark node consumes messages, and those messages make up the data in its partitions. It is not necessary to save the data in the Spark Block Manager; subsequent transformations and actions use the data stored on each executor.
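As a rough illustration of how the consumption options could translate into partitions, here is a minimal Java sketch. It assumes that every configured queue is consumed by levelParallelism consumers and that each consumer maps to one partition; that mapping, the class name PartitionPlan, and the partition labels are assumptions made for this example, not the library's confirmed behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: lists the partitions one batch would have under the stated assumption. */
final class PartitionPlan {
    private PartitionPlan() {}

    /** One partition label per (queue, consumer index) pair. */
    static List<String> partitionsFor(List<String> queueNames, int levelParallelism) {
        if (levelParallelism < 1) {
            throw new IllegalArgumentException("levelParallelism must be >= 1");
        }
        List<String> partitions = new ArrayList<>();
        for (String queue : queueNames) {
            for (int consumer = 0; consumer < levelParallelism; consumer++) {
                partitions.add(queue + "#" + consumer);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Two queues, two consumers each: four partitions in total.
        System.out.println(partitionsFor(Arrays.asList("orders", "events"), 2));
        // prints [orders#0, orders#1, events#0, events#1]
    }
}
```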

When streaming starts, each batch window is divided into time for consuming and time for computing. By default, the time for consuming messages from RabbitMQ is 0.9 times the Spark batch window. To improve performance you can limit this time with the "maxReceiveTime" configuration parameter, given in milliseconds. You can also limit the number of consumed messages with the "maxMessagesPerPartition" configuration parameter.
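The arithmetic behind these two limits can be sketched in a few lines of Java. This is only an illustration: it assumes that a maxReceiveTime of 0 means "use the default of 0.9 times the window" and that a non-positive maxMessagesPerPartition means "no limit". The class and method names are hypothetical and do not come from the library.

```java
/** Hypothetical sketch of the per-batch consumption limits. */
final class ReceiveWindow {
    private ReceiveWindow() {}

    /** Time budget for consuming in one batch: maxReceiveTime if set, else 0.9 of the window. */
    static long receiveMillis(long batchMillis, long maxReceiveTimeMillis) {
        if (batchMillis <= 0) {
            throw new IllegalArgumentException("batchMillis must be positive");
        }
        long auto = batchMillis * 9 / 10; // integer form of 0.9 x window
        return maxReceiveTimeMillis > 0 ? maxReceiveTimeMillis : auto;
    }

    /** True once either limit is reached; maxMessages <= 0 is treated as "no message limit". */
    static boolean shouldStop(long elapsedMillis, long receiveMillis, long received, long maxMessages) {
        boolean timeUp = elapsedMillis >= receiveMillis;
        boolean full = maxMessages > 0 && received >= maxMessages;
        return timeUp || full;
    }

    public static void main(String[] args) {
        System.out.println(receiveMillis(10_000, 0));     // prints 9000
        System.out.println(receiveMillis(10_000, 4_500)); // prints 4500
    }
}
```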

This receiver provides optimized implementations of the RDD functions count and countApprox.

Each executor has one connection pool, and its connections are reused on each streaming batch window for better performance. The current Kafka direct approach implemented by Spark does not have a connection pool, so on each iteration the RDDs create a new Kafka connection.
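The idea of reusing one connection per broker across batches can be sketched with a map that creates an entry only on first use. The Java below is a simplified stand-in, not the library's code: FakeConnection is a placeholder for a real RabbitMQ client connection, and all names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-executor pool: one connection per hosts string, reused across batches. */
final class ConnectionPoolSketch {
    /** Placeholder for a broker connection. */
    static final class FakeConnection {
        final String hosts;
        FakeConnection(String hosts) { this.hosts = hosts; }
    }

    private final Map<String, FakeConnection> pool = new ConcurrentHashMap<>();
    private final AtomicInteger opened = new AtomicInteger();

    /** Returns the pooled connection for these hosts, opening it only the first time. */
    FakeConnection connectionFor(String hosts) {
        return pool.computeIfAbsent(hosts, h -> {
            opened.incrementAndGet();
            return new FakeConnection(h);
        });
    }

    int openedCount() { return opened.get(); }

    public static void main(String[] args) {
        ConnectionPoolSketch pool = new ConnectionPoolSketch();
        for (int batch = 0; batch < 3; batch++) {
            pool.connectionFor("localhost:5672"); // same connection on every batch
        }
        System.out.println(pool.openedCount()); // prints 1
    }
}
```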

This consumer has one limitation: the minimum storage level for the RabbitMQDStream is MEMORY_ONLY. The user cannot select NONE, because the RDD would then be re-computed on each Spark action.
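Why re-computation matters here can be shown without Spark. In the Java sketch below, an in-memory queue stands in for a RabbitMQ queue: "computing" a partition consumes messages, so computing it twice yields different data, while a cached result stays stable. The names consume and memoize are hypothetical helpers for this illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Hypothetical illustration: recomputing a partition drains the queue again. */
final class RecomputeSketch {
    private RecomputeSketch() {}

    /** Takes up to max messages off the queue, like computing one partition. */
    static List<String> consume(Queue<String> queue, int max) {
        List<String> out = new ArrayList<>();
        while (out.size() < max && !queue.isEmpty()) {
            out.add(queue.poll());
        }
        return out;
    }

    /** Computes the value once and returns the stored result afterwards. */
    static <T> Supplier<T> memoize(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;
            @Override
            public synchronized T get() {
                if (!computed) {
                    value = source.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3", "m4"));
        Supplier<List<String>> cached = memoize(() -> consume(queue, 2));
        System.out.println(cached.get());      // prints [m1, m2]
        System.out.println(cached.get());      // prints [m1, m2] again, nothing re-consumed
        System.out.println(consume(queue, 2)); // prints [m3, m4]: an uncached recompute sees new data
    }
}
```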

Scala API

  • String
val receiverStream = RabbitMQUtils.createDistributedStream[String](sparkStreamingContext, params, distributedKeys)
  • Generic user Type
val receiverStream = RabbitMQUtils.createDistributedStream[R](sparkStreamingContext, params, distributedKeys, Array[Byte] => R)

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaDistributedStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

Spark Parameters Options

Parameter                Description                                Optional
maxMessagesPerPartition  Maximum number of messages                 Yes
levelParallelism         Number of partitions per executor          Yes (default: 1)
maxReceiveTime           Maximum time to receive messages, in ms    Yes (default: 0, auto)
rememberDuration         Remember duration for Spark DStreams       Yes (default: 60s)

Receiver-based Approach

This is a basic consumer. When the StreamingContext starts, Spark runs one process on one executor to consume messages from RabbitMQ. This consumer has a single consumer instance that consumes messages asynchronously; on each Spark window it receives messages and saves the received blocks in Spark block memory. All the data is replicated to other nodes. The receiver extends an Akka actor, so the receiver-based implementation depends on Akka. In future versions of Spark the Akka dependency will be removed.

Scala API

  • String
val receiverStream = RabbitMQUtils.createStream[String](sparkStreamingContext, params)
  • Generic user Type
val receiverStream = RabbitMQUtils.createStream[R](sparkStreamingContext, params, Array[Byte] => R)

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

RabbitMQ Parameters Options

Parameter                  Description                    Optional
hosts                      RabbitMQ hosts                 Yes (default: localhost)
virtualHost                RabbitMQ virtual host          Yes
queueName                  Queue name                     Yes
exchangeName               Exchange name                  Yes
exchangeType               Exchange type                  Yes
routingKeys                Routing keys, comma separated  Yes
userName                   RabbitMQ username              Yes
password                   RabbitMQ password              Yes
durable                    durable                        Yes (default: true)
exclusive                  exclusive                      Yes (default: false)
autoDelete                 autoDelete                     Yes (default: false)
ackType                    basic/auto                     Yes (default: basic)
fairDispatch               fairDispatch                   Yes (default: false)
prefetchCount              prefetchCount                  Yes (default: 1)
storageLevel               Apache Spark storage level     Yes (default: MEMORY_ONLY)
x-max-length               RabbitMQ queue property        Yes
x-message-ttl              RabbitMQ queue property        Yes
x-expires                  RabbitMQ queue property        Yes
x-max-length-bytes         RabbitMQ queue property        Yes
x-dead-letter-exchange     RabbitMQ queue property        Yes
x-dead-letter-routing-key  RabbitMQ queue property        Yes
x-max-priority             RabbitMQ queue property        Yes
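To make the parameter tables concrete, the Java sketch below collects a few of the listed keys into a string-to-string map. It assumes that the params argument of the create*Stream calls is such a map; the helper class RabbitParams and the sample values are hypothetical, and only the key names come from the tables.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that assembles connection and consumption parameters. */
final class RabbitParams {
    private RabbitParams() {}

    /** Builds a parameter map using key names taken from the option tables. */
    static Map<String, String> build(String hosts, String queueName, String userName, String password) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("hosts", hosts);
        params.put("queueName", queueName);
        params.put("userName", userName);
        params.put("password", password);
        params.put("ackType", "auto");                 // documented values: basic/auto
        params.put("maxMessagesPerPartition", "500");  // cap per partition and batch
        params.put("maxReceiveTime", "4500");          // milliseconds
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = build("localhost", "events", "guest", "guest");
        // Print only the keys so that credentials never reach the logs.
        System.out.println(params.keySet());
    }
}
```

Keeping the credentials in the same map as the other options means a missing userName or password is easy to spot when authentication is refused by the broker.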

License

Licensed to STRATIO (C) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The STRATIO (C) licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

spark-rabbitmq's People

Contributors

aalfonso-stratio, ahvargas, anistal, avalcepina, becaresss, compae, croofec, danielcsant, dcarroza-stratio, fjavierjimenez, gschiavon, kevinmellott91, maimonoded, nelsou, sgomezg, stephaneseng, vetler, witokondoria


spark-rabbitmq's Issues

Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1

spark-rabbitmq version - 0.5.1
spark version - 2.1.0 (scala version - 2.11.8)
rabbitmq version - 3.5.6

I'm using Distributed approach for streaming -

    List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();

    distributedKeys.add(new JavaRabbitMQDistributedKey(prop.getProperty("queue.name"),
            new ExchangeAndRouting(prop.getProperty("queue.exchange")),
            rabbitMqConParams
    ));

    Function<Delivery, String> messageHandler = new Function<Delivery, String>() {

        public String call(Delivery message) {
            return new String(message.getBody());
        }
    };

    JavaInputDStream<String> receiverStream =
            RabbitMQUtils.createJavaDistributedStream(streamCtx, String.class, distributedKeys, rabbitMqConParams, messageHandler);

I keep getting

    2017-01-31 12:48:10,376 ERROR [Executor task launch worker-4] executor.Executor: Exception in task 0.0 in stage 2.0 (TID 7)
    java.util.concurrent.TimeoutException
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:76)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:110)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:372)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:583)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:508)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer.close(Consumer.scala:132)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.close(RabbitMQRDD.scala:234)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
   
    2017-01-31 12:48:10,424 INFO  [Executor task launch worker-4] executor.Executor: Running task 0.1 in stage 2.0 (TID 8)
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] consumer.Consumer: Failed to createChannel connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1. Remove connection 
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] storage.BlockManager: Putting block rdd_3_0 failed due to an exception
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] storage.BlockManager: Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2017-01-31 12:48:10,435 ERROR [Executor task launch worker-4] executor.Executor: Exception in task 0.1 in stage 2.0 (TID 8)
    org.apache.spark.SparkException: Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:211)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:243)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:166)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:143)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Has anyone had this issue before? Any suggestions on how to solve it?

Thanks
Akhila.

Messages get unacked after a few hours (but still pulled)

Everything works fine, but after a few hours my messages (still pulled from the queue) are never acked.

Queued messages
Ready    0
Unacked  439,923
Total    439,923

Here are the logs

16/07/21 14:37:00 INFO distributed.RabbitMQRDD: Receiving data in Partition 0 from  [queue: xx-sparkstreaming-currentrec, exchange: logs, exchangeType: topic , routingKeys: #, connectionParams: hosts -> 10.98.251.205:5672 , vHost -> xx , virtualHost -> xx , maxReceiveTime -> 4500 , userName -> xx , maxMessagesPerPartition -> 500 , password -> xxxxxxxxxx, withFairDispatch: false]
16/07/21 14:37:05 INFO distributed.RabbitMQRDD: ******* Received 2 messages by Partition : 0  before close Channel ******
16/07/21 14:37:10 INFO storage.MemoryStore: ensureFreeSpace(4392) called with curMem=49765876, maxMem=1111511531
16/07/21 14:37:10 INFO storage.MemoryStore: Block rdd_23164_0 stored as values in memory (estimated size 4.3 KB, free 1012.6 MB)
16/07/21 14:37:10 INFO storage.MemoryStore: ensureFreeSpace(7328) called with curMem=49770268, maxMem=1111511531
16/07/21 14:37:10 INFO storage.MemoryStore: Block rdd_23165_0 stored as values in memory (estimated size 7.2 KB, free 1012.5 MB)
16/07/21 14:37:10 INFO storage.MemoryStore: ensureFreeSpace(1545) called with curMem=49777596, maxMem=1111511531
16/07/21 14:37:10 INFO storage.MemoryStore: Block rdd_23212_0 stored as bytes in memory (estimated size 1545.0 B, free 1012.5 MB)
16/07/21 14:37:10 INFO storage.MemoryStore: ensureFreeSpace(1626) called with curMem=49779141, maxMem=1111511531
16/07/21 14:37:10 INFO storage.MemoryStore: Block rdd_23245_0 stored as bytes in memory (estimated size 1626.0 B, free 1012.5 MB)
16/07/21 14:37:10 INFO executor.Executor: Finished task 0.0 in stage 15799.0 (TID 55155). 2178 bytes result sent to driver
16/07/21 14:37:10 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 55160
16/07/21 14:37:10 INFO executor.Executor: Running task 5.0 in stage 15799.0 (TID 55160)
16/07/21 14:37:10 INFO spark.CacheManager: Partition rdd_23245_5 not found, computing it
16/07/21 14:37:10 INFO spark.CacheManager: Partition rdd_23217_0 not found, computing it
16/07/21 14:37:10 INFO spark.CacheManager: Partition rdd_23185_0 not found, computing it
16/07/21 14:37:10 INFO spark.CacheManager: Partition rdd_23184_0 not found, computing it
16/07/21 14:37:11 INFO distributed.RabbitMQRDD: Receiving data in Partition 0 from  [queue: xx-sparkstreaming-currentrec, exchange: logs, exchangeType: topic , routingKeys: #, connectionParams: hosts -> 10.98.251.205:5672 , vHost -> xx , virtualHost -> xx , maxReceiveTime -> 4500 , userName -> xx , maxMessagesPerPartition -> 500 , password -> xxxxxxxxxx, withFairDispatch: false]
16/07/21 14:37:16 INFO distributed.RabbitMQRDD: ******* Received 1 messages by Partition : 0  before close Channel ******

Any ideas ?

Basic ackType problems -- queue

When I set ackType to basic, the MQ queue gets blocked after a while, but when I set it to auto, the app works well. What is causing this?

Username Password Authentication not supported

Hi,
The RabbitMQUtils.createJavaStream* APIs do not seem to support authentication mechanisms.
I have created the Stream using

RabbitMQUtils.createJavaStreamFromAQueue(
                ssc, rabbitmqProps.get("host"), Integer.parseInt(rabbitmqProps.get("port")), 
                rabbitmqProps.get("queueName"), StorageLevel.MEMORY_AND_DISK_SER_2());

And I get the error

com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

Is there any way I can configure the username and password for RabbitMQ? Am I missing something?

com.rabbitmq.client.ShutdownSignalException: connection error

I am getting this exception and am not sure where the problem lies. I have checked that all the nodes in the cluster can telnet to the RabbitMQ server. The user ID and password, along with the other RabbitMQ params, are triple-checked. Please let me know if there are any hints:

16/12/27 15:48:06 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 3388@ip-10-97-25-249
16/12/27 15:48:06 INFO SignalUtils: Registered signal handler for TERM
16/12/27 15:48:06 INFO SignalUtils: Registered signal handler for HUP
16/12/27 15:48:06 INFO SignalUtils: Registered signal handler for INT
16/12/27 15:48:06 INFO SecurityManager: Changing view acls to: yarn,qa2
16/12/27 15:48:06 INFO SecurityManager: Changing modify acls to: yarn,qa2
16/12/27 15:48:06 INFO SecurityManager: Changing view acls groups to:
16/12/27 15:48:06 INFO SecurityManager: Changing modify acls groups to:
16/12/27 15:48:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, qa2); groups with view permissions: Set(); users with modify permissions: Set(yarn, qa2); groups with modify permissions: Set()
16/12/27 15:48:07 INFO TransportClientFactory: Successfully created connection to /10.97.25.249:34195 after 98 ms (0 ms spent in bootstraps)
16/12/27 15:48:07 INFO SecurityManager: Changing view acls to: yarn,qa2
16/12/27 15:48:07 INFO SecurityManager: Changing modify acls to: yarn,qa2
16/12/27 15:48:07 INFO SecurityManager: Changing view acls groups to:
16/12/27 15:48:07 INFO SecurityManager: Changing modify acls groups to:
16/12/27 15:48:07 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, qa2); groups with view permissions: Set(); users with modify permissions: Set(yarn, qa2); groups with modify permissions: Set()
16/12/27 15:48:07 INFO TransportClientFactory: Successfully created connection to /10.97.25.249:34195 after 2 ms (0 ms spent in bootstraps)
16/12/27 15:48:07 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/qa2/appcache/application_1482249989935_0011/blockmgr-0c34224d-a858-4992-b0ca-2d22857c106c
16/12/27 15:48:07 INFO MemoryStore: MemoryStore started with capacity 1038.8 MB
16/12/27 15:48:08 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://[email protected]:34195
16/12/27 15:48:08 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
16/12/27 15:48:08 INFO Executor: Starting executor ID 2 on host 10.97.25.249
16/12/27 15:48:08 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41012.
16/12/27 15:48:08 INFO NettyBlockTransferService: Server created on 10.97.25.249:41012
16/12/27 15:48:08 INFO BlockManager: external shuffle service port = 7337
16/12/27 15:48:08 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2, 10.97.25.249, 41012)
16/12/27 15:48:08 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2, 10.97.25.249, 41012)
16/12/27 15:48:08 INFO BlockManager: Registering executor with local external shuffle service.
16/12/27 15:48:08 INFO TransportClientFactory: Successfully created connection to /10.97.25.249:7337 after 8 ms (0 ms spent in bootstraps)
16/12/27 15:48:08 INFO CoarseGrainedExecutorBackend: Got assigned task 0
16/12/27 15:48:08 INFO CoarseGrainedExecutorBackend: Got assigned task 1
16/12/27 15:48:08 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/12/27 15:48:08 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
16/12/27 15:48:08 INFO TorrentBroadcast: Started reading broadcast variable 1
16/12/27 15:48:08 INFO TransportClientFactory: Successfully created connection to /10.97.25.249:41270 after 1 ms (0 ms spent in bootstraps)
16/12/27 15:48:08 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 7.2 KB, free 1038.8 MB)
16/12/27 15:48:08 INFO TorrentBroadcast: Reading broadcast variable 1 took 143 ms
16/12/27 15:48:08 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 17.6 KB, free 1038.8 MB)
16/12/27 15:48:08 INFO TorrentBroadcast: Started reading broadcast variable 0
16/12/27 15:48:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.2 KB, free 1038.8 MB)
16/12/27 15:48:08 INFO TorrentBroadcast: Reading broadcast variable 0 took 13 ms
16/12/27 15:48:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 21.9 KB, free 1038.8 MB)
16/12/27 15:48:12 WARN ForgivingExceptionHandler: An unexpected connection driver error occured (Exception message: Connection reset)
16/12/27 15:48:12 WARN BlockManager: Putting block rdd_2_0 failed due to an exception
16/12/27 15:48:12 WARN BlockManager: Block rdd_2_0 could not be removed as it was not found on disk or in memory
16/12/27 15:48:12 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 1)
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:381)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:62)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:900)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:859)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:681)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.org$apache$spark$streaming$rabbitmq$consumer$Consumer$$addConnection(Consumer.scala:266)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$1.apply(Consumer.scala:258)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$$anonfun$1.apply(Consumer.scala:258)
    at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
    at scala.collection.AbstractMap.getOrElse(Map.scala:59)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.getChannel(Consumer.scala:258)
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:201)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:230)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:165)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:142)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
    ... 48 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:91)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
    ... 1 more

Is there a way to retrieve the Routing key along with the message from Rabbit?

Hi,

I have a situation where I need to read off a rabbit queue where the routing key contains a wildcard indicating a device id ... i.e. "gps.#" . # represents an alphanumeric id. The message itself does not contain this device id. When I read the RDD I don't see a way to extract the routing key. Is there a way to extract the routing key for each message?
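If the routing key can be obtained per message, extracting the device id from it is plain string handling. The Java sketch below assumes keys of the form "gps.<deviceId>", as described in the question. Whether the stream's message handler exposes the routing key is an assumption: in the RabbitMQ Java client a Delivery carries an Envelope with getRoutingKey(), but this sketch does not depend on that and the class RoutingKeys is hypothetical.

```java
import java.util.Optional;

/** Hypothetical helper: pulls the device id out of a routing key such as "gps.abc123". */
final class RoutingKeys {
    private static final String PREFIX = "gps.";

    private RoutingKeys() {}

    /** Returns the text after "gps.", or empty if the key does not have that shape. */
    static Optional<String> deviceId(String routingKey) {
        if (routingKey == null || !routingKey.startsWith(PREFIX)) {
            return Optional.empty();
        }
        String id = routingKey.substring(PREFIX.length());
        return id.isEmpty() ? Optional.empty() : Optional.of(id);
    }

    public static void main(String[] args) {
        System.out.println(deviceId("gps.abc123").orElse("<none>")); // prints abc123
        System.out.println(deviceId("other.key").orElse("<none>"));  // prints <none>
    }
}
```

A message handler could then return a pair of device id and body instead of the body alone, so the id travels with each record in the RDD.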

Multiple queue priority

Is there a way of consuming from multiple queues with a distributed stream according to a priority order e.g.

Two queues: primary and secondary. We prefer to consume primary but will take messages from secondary when there's spare capacity.

To implement we could check the number of messages consumed from the primary queue against a fraction of maxMessagesPerPartition on a corresponding time fraction of the stream batch size. With a stream batch size of 10 seconds and maxMessagesPerPartition = 1000, we would check the stream for priority every second, expecting to see 100 messages. We could limit each stream fraction to 100 messages and compare actual consumption against expected. Where we have spare capacity in any stream fraction (consumed from primary), we could consume from the secondary queue on the next.

Is this possible? Happy to write it if you'd accept as a PR.
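One reading of the proposal above, sketched in Java: split the batch into equal slices, give each slice an equal share of maxMessagesPerPartition, and let whatever the primary queue left unused in one slice become the allowance for the secondary queue in the next slice. This is only an interpretation of the idea for discussion; the class and method names are hypothetical and nothing here exists in the library.

```java
import java.util.Arrays;

/** Hypothetical sketch of slice-based priority between a primary and a secondary queue. */
final class PrioritySlices {
    private PrioritySlices() {}

    /**
     * For each slice, returns how many secondary messages may be consumed,
     * based on the spare capacity the primary queue left in the previous slice.
     */
    static int[] secondaryAllowance(int[] primaryConsumedPerSlice, int maxMessagesPerPartition) {
        int slices = primaryConsumedPerSlice.length;
        if (slices == 0) {
            throw new IllegalArgumentException("at least one slice is required");
        }
        int budget = maxMessagesPerPartition / slices; // expected messages per slice
        int[] allowance = new int[slices];             // slice 0 has no history, so it stays 0
        for (int i = 1; i < slices; i++) {
            allowance[i] = Math.max(0, budget - primaryConsumedPerSlice[i - 1]);
        }
        return allowance;
    }

    public static void main(String[] args) {
        // 10 slices of a 10-second batch, maxMessagesPerPartition = 1000, so 100 per slice.
        int[] primary = {100, 100, 40, 100, 100, 100, 100, 100, 100, 100};
        System.out.println(Arrays.toString(secondaryAllowance(primary, 1000)));
        // prints [0, 0, 0, 60, 0, 0, 0, 0, 0, 0]
    }
}
```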

org.apache.spark.SparkException: Error receiving data from RabbitMQ with error: charsetName

What's the reason for this error? Any advice? Thanks!
The original error message is:
16/08/01 17:03:21 ERROR executor.Executor: Exception in task 5.0 in stage 0.0 (TID 5)
org.apache.spark.SparkException: Error receiving data from RabbitMQ with error: charsetName
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:207)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
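
The message "charsetName" in the trace above appears to match the NullPointerException text the JDK raises when `new String(bytes, charsetName)` is called with a null charset name. Assuming (this is not confirmed in the thread) that the charset comes from a message property the publisher left unset, a defensive decoder could look like the sketch below. `BodyDecoder` and `decode` are hypothetical names, not part of the library.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes a message body with a safe charset fallback. */
final class BodyDecoder {
    private BodyDecoder() {}

    /**
     * Decodes the body using contentEncoding when it names a usable charset.
     * Falls back to UTF-8 (an assumed default) when the name is null,
     * malformed, or unsupported, instead of throwing.
     */
    static String decode(byte[] body, String contentEncoding) {
        Charset charset = StandardCharsets.UTF_8;
        if (contentEncoding != null) {
            try {
                charset = Charset.forName(contentEncoding);
            } catch (IllegalArgumentException e) {
                // Covers IllegalCharsetNameException and UnsupportedCharsetException:
                // keep the UTF-8 default.
            }
        }
        return new String(body, charset);
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body, null));
    }
}
```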

Error creating channel and connection: Unknown channel number 77

Everything is in the stack trace. I got:

Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 77

Any idea?

2016-08-25 14:53:30,030 WARN TaskSetManager: Lost task 5.0 in stage 8211.0 (TID 58525, datanode006): org.apache.spark.SparkException: Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 77
    at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:203)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:229)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:164)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:141)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Mark spark dependencies as provided

Hi All,

Could you please mark the spark-streaming and spark-sql dependencies as provided? When anyone runs an application, all Spark dependencies are already available on the classpath.

Thank you.

Support maximum micro batch size

The connector aggressively empties the queue for every micro batch. This means RabbitMQ no longer does any work as a queue; instead, Spark has to handle peaks by queuing batches. This is fine while Spark is running stably, but for us a throughput peak can mean a very long queue in Spark. Unfortunately, this exposes us to more risk of data loss:

  • Data is safer sitting on the Rabbit queue than having to rely on checkpointing for a restart.
  • The system is far more likely to crash when the queue is large i.e. you are setting yourself up for data loss on every crash.
  • If the system crashes, it can become impossible to restart it as the first batch will completely empty the queue, causing another high throughput crash.

A solution would be to provide an option that limits the maximum micro batch size the RabbitMQ connector is allowed to ack from RabbitMQ.
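
The requested behaviour can be sketched independently of the connector: take at most a fixed number of messages per batch and leave the rest on the broker. The README describes a "maxMessagesPerPartition" parameter for the distributed approach that expresses the same idea. In the sketch below a `BlockingQueue` stands in for the RabbitMQ queue, and `BoundedBatchReader` and `nextBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a size-capped batch read; not the connector's code. */
final class BoundedBatchReader {
    private BoundedBatchReader() {}

    /** Removes at most maxBatchSize messages; anything beyond that stays queued. */
    static <T> List<T> nextBatch(BlockingQueue<T> source, int maxBatchSize) {
        List<T> batch = new ArrayList<>();
        source.drainTo(batch, maxBatchSize);
        return batch;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the broker queue.
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++) {
            broker.add("msg-" + i);
        }
        List<String> batch = nextBatch(broker, 4);
        System.out.println(batch.size() + " consumed, " + broker.size() + " still queued");
    }
}
```

With a cap in place, a backlog after a crash is worked off over several batches instead of being pulled into Spark all at once.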

Problem trying to configure 'x-message-ttl'

I am currently trying to configure the "x-message-ttl" param in my receiver, but I am getting the following error:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'xxxxx' in vhost '/': received '10000' but current is '10000', class-id=50, method-id=10).

The rabbitParams I'm using are:

val rabbitParams =  Map(
      "hosts" -> "xxxxx",
      "queueName" -> "xxxxx",
      "exchangeName" -> "xxxxx",
      "userName" -> "xxxxx",
      "password" -> "xxxxx",
      "storageLevel" -> "MEMORY_AND_DISK_SER_2",
      "durable" -> "false",
      "x-message-ttl" -> "10000"
    )

Can anyone help me?
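
The error reports the same value on both sides ('10000' received, '10000' current), which suggests the difference may be in the argument's type rather than its value. One possible explanation, not confirmed in this thread, is that the params map only carries strings, while the existing queue was declared with a numeric TTL. The sketch below converts numeric-looking values before they would be passed as the arguments map of `Channel.queueDeclare`. `QueueArguments` is a hypothetical helper, and treating every key that starts with "x-" as a queue argument is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds typed queue arguments from string params. */
final class QueueArguments {
    private QueueArguments() {}

    /** Copies the "x-" entries, turning numeric strings into numbers. */
    static Map<String, Object> typed(Map<String, String> params) {
        Map<String, Object> arguments = new HashMap<>();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (entry.getKey().startsWith("x-")) { // assumption: "x-" marks a queue argument
                arguments.put(entry.getKey(), toNumberIfPossible(entry.getValue()));
            }
        }
        return arguments;
    }

    /** Returns an Integer or Long when the text is a whole number, else the text itself. */
    static Object toNumberIfPossible(String value) {
        try {
            return Integer.valueOf(value);
        } catch (NumberFormatException notAnInt) {
            try {
                return Long.valueOf(value);
            } catch (NumberFormatException notALong) {
                return value;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("queueName", "xxxxx");
        params.put("x-message-ttl", "10000");
        System.out.println(typed(params));
    }
}
```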

Queue is created without binding to exchange.

I have an issue where the queue gets created but the queue never gets bound to the exchange.
If I manually log in to RabbitMQ's web portal, I can bind the queue to the correct exchange and everything works perfectly. I was hoping this binding would be re-established automatically when starting the Spark job.

I've debugged the app and it appears to be passing in the correct key value ('exchangeName', 'MyExchange')
Any ideas why this is occurring?
-Will

NoSuchMethodError com.google.common.util.concurrent.RateLimiter.acquire()V

I am using Spark 1.5.1 with the Cassandra connector 2.1 and the latest RabbitMQ-Receiver.
There is already some confusion around the Guava library clash between Spark and Cassandra: Spark needs Guava 14, while the Cassandra connector needs Guava 15+. I have set the library path in Spark so that it picks up the Guava 16 jar. However, I am getting the above-mentioned error from RabbitMQInputDStream line 97, i.e. "store(new Predef.String(delivery.getBody))".

The full stack trace is as follows:

2015-12-04 18:25:16 INFO  RabbitMQReceiver:143 - created new connection and channel
2015-12-04 18:25:16 INFO  RabbitMQReceiver:66 - onStart, Connecting..
2015-12-04 18:25:16 INFO  RabbitMQReceiver:125 - declaring direct queue
2015-12-04 18:25:16 INFO  ReceiverSupervisorImpl:59 - Called receiver onStart
2015-12-04 18:25:16 INFO  ReceiverSupervisorImpl:59 - Waiting for receiver to be stopped
2015-12-04 18:25:16 INFO  RabbitMQReceiver:88 - RabbitMQ Input waiting for messages
2015-12-04 18:25:16 INFO  RabbitMQReceiver:90 - start consuming data
2015-12-04 18:25:16 INFO  RabbitMQReceiver:94 - waiting for data
2015-12-04 18:25:16 INFO  RabbitMQReceiver:96 - storing data
2015-12-04 18:25:16 ERROR RabbitMQReceiver:85 - Got this unknown exception: java.lang.NoSuchMethodError: com.google.common.util.concurrent.RateLimiter.acquire()V
java.lang.NoSuchMethodError: com.google.common.util.concurrent.RateLimiter.acquire()V
        at org.apache.spark.streaming.receiver.RateLimiter.waitToPush(RateLimiter.scala:42)
        at org.apache.spark.streaming.receiver.BlockGenerator.addData(BlockGenerator.scala:160)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushSingle(ReceiverSupervisorImpl.scala:118)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:119)
        at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:97)
        at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
2015-12-04 18:25:16 INFO  RabbitMQReceiver:104 - it has been stopped
2015-12-04 18:25:16 WARN  ReceiverSupervisorImpl:92 - Restarting receiver with delay 2000 ms: Trying to connect again
2015-12-04 18:25:16 INFO  ReceiverSupervisorImpl:59 - Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again:
2015-12-04 18:25:16 INFO  RabbitMQReceiver:79 - onStop, doing nothing.. relaxing...
2015-12-04 18:25:16 INFO  ReceiverSupervisorImpl:59 - Called receiver onStop
2015-12-04 18:25:16 INFO  ReceiverSupervisorImpl:59 - Deregistering receiver 0
2015-12-04 18:25:16 ERROR ReceiverTracker:75 - Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again

Any pointers on resolving this issue? I have also tried the Guava 18.0 and 19.* jars, but the error remains.
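
The descriptor `acquire()V` means the caller was compiled against a `RateLimiter.acquire()` that returns void. As far as the Guava Javadoc indicates, `acquire()` returns a double from version 16.0 onwards, so a newer Guava jar on the classpath would not contain the void variant. A first diagnostic step is to check which jar the class is actually loaded from. The sketch below does that with standard JDK calls only; `JarLocator` is a hypothetical name.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports where a class was loaded from. */
final class JarLocator {
    private JarLocator() {}

    /** Returns the jar or directory that provides className, or a short explanation. */
    static String locate(String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers.
            Class<?> type = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source == null ? "bootstrap or unknown" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(locate("com.google.common.util.concurrent.RateLimiter"));
    }
}
```

Running this inside the driver and inside a task shows whether both see the same Guava jar.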

build fails for Spark 2.0 due to removal of org.apache.spark.Logging and akka

We are planning to move to Spark 2.0 along with its dependencies, and we currently use this connector, which internally imports org.apache.spark.Logging. In Spark 2.0 that class has been made private. Moreover, support for streaming connectors such as Akka has been removed. What would be the workaround to build the connector for Spark 2.0?

spark streaming checkpoint

Do I have to checkpoint when using RabbitMQUtils.createDistributedStream? If so, how can I do it? Can RabbitMQUtils.createDistributedStream only be called from the main method?

Random queue name "amq.gen-X678K5...."

Hello,

I'm using the current trunk and I'm seeing strange behaviour.

When I run Spark Streaming with the RabbitMQ receiver, it creates a queue with a random name, "amq.gen-X678K5....", and reads from it. It does not read from the queue I specified in queueName.

Any idea?

Issue with vHost vs virtualHost

Documentation says:
virtualHosts: RabbitMQ virtual Host
vHost: RabbitMQ vHost

Are they the same? I lost an hour using vHost instead of virtualHost :)
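
Until the documentation settles on one name, a caller can guard against this kind of mix-up by normalizing the params map before handing it to the library. The sketch below is a generic alias helper. `ParamAliases` and `withAlias` are hypothetical names, and which key the library actually reads is taken from the report above rather than verified.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: renames an alias key to the key the consumer expects. */
final class ParamAliases {
    private ParamAliases() {}

    /**
     * Returns a copy of params in which the value stored under alias is moved
     * to canonical. An existing canonical entry wins over the alias.
     */
    static Map<String, String> withAlias(Map<String, String> params, String alias, String canonical) {
        Map<String, String> normalized = new HashMap<>(params);
        String aliasedValue = normalized.remove(alias);
        if (aliasedValue != null) {
            normalized.putIfAbsent(canonical, aliasedValue);
        }
        return normalized;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("vHost", "/");
        System.out.println(withAlias(params, "vHost", "virtualHost"));
    }
}
```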

Receive message properties

This receiver only consumes the body portion of the message but we often need to read some extra metadata from a RabbitMQ message like the headers or the routing key. It would be great if we could get a DStream of a RabbitMQMessage with message properties and headers.

I can submit a PR if you want.
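
As an illustration of what such a stream element might carry, here is a minimal value type holding the body together with the routing key and headers. It is only a sketch of the request. `RabbitMQMessage` as written here is hypothetical and is not the library's class.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical value type: message body plus selected metadata. */
final class RabbitMQMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] body;
    private final String routingKey;
    // HashMap is Serializable; header values must be serializable too
    // if the message is shipped between Spark executors.
    private final HashMap<String, Object> headers;

    RabbitMQMessage(byte[] body, String routingKey, Map<String, Object> headers) {
        this.body = body.clone(); // defensive copy
        this.routingKey = routingKey;
        this.headers = new HashMap<String, Object>();
        if (headers != null) {
            this.headers.putAll(headers);
        }
    }

    byte[] getBody() {
        return body.clone();
    }

    String getRoutingKey() {
        return routingKey;
    }

    Map<String, Object> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }
}
```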

Read from a durable queue

Hi, I'm using the 0.1.0_RELEASE version. When I try to read from a durable queue, it throws the exception:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException

and it is caused by:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'xxxxx' in vhost '/' not equivalent, class-id=50, method-id=10)

In our use case, it is important to keep the queue durable. Is it possible to read from a durable queue with the current release of rabbitmq-receiver?

Thanks

Where to find a full Scala or Java code example like for the previous version ?

Hi,

  • As a simple user, I find that the following Java example line from your current README isn't enough:

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaDistributedStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

Can you tell me where I can find a fuller example?

Kind Regards,

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging

Hello Everyone,

I'm seeking your expert advice on an issue I'm having. I am receiving the following exception at runtime while creating a Java stream. Does anyone know how to fix it? I'd appreciate your advice.

Version of Rabbitmq I am running is 3.6.1
Version of Apache Spark is 2.0.1
Java version is 1.8

2016-11-17 22:40:36 DEBUG ClosureCleaner:58 -  +++ closure <function1> (org.apache.spark.streaming.rabbitmq.RabbitMQUtils$$anonfun$3) is now cleaned +++
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.spark.streaming.rabbitmq.RabbitMQUtils$.createStream(RabbitMQUtils.scala:44)
	at org.apache.spark.streaming.rabbitmq.RabbitMQUtils$.createJavaStream(RabbitMQUtils.scala:98)
	at org.apache.spark.streaming.rabbitmq.RabbitMQUtils.createJavaStream(RabbitMQUtils.scala)
	at com.test.Runner.main(Runner.java:49)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 16 more
2016-11-17 22:40:36 INFO  SparkContext:54 - Invoking stop() from shutdown hook

My pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>StreamingTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.stratio.receiver</groupId>
            <artifactId>spark-rabbitmq_1.6</artifactId>
            <version>0.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.5</version>
        </dependency>
    </dependencies>

</project>

My Java Class

package com.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils;

import java.util.HashMap;
import java.util.Map;

public class Runner {
    public static void main(String[] args) {
        try {
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext(new SparkConf().setAppName("JavaRabbitMQConsumer").setMaster("local[3]"), new Duration(10000));
            Map<String, String> params = new HashMap<String, String>();
            params.put("hosts", "localhost");
            params.put("queueName", "text-queue");
            params.put("exchangeName", "message-processor-exchange");
            params.put("vHost", "/");
            params.put("userName", "guest");
            params.put("password", "guest");

            Function<byte[], String> messageHandler = new Function<byte[], String>() {

                public String call(byte[] message) {
                    return new String(message);
                }
            };

            // Receiving exception here
            JavaReceiverInputDStream<String> messages =
                    RabbitMQUtils.createJavaStream(javaStreamingContext, String.class, params, messageHandler);

            messages.print();

            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Please let me know if you need more details

Thanks,

Artem
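
The trace fails while loading a connector class that still refers to org.apache.spark.Logging, and the pom above pairs the spark-rabbitmq_1.6 artifact with Spark 2.0.1, where (per the "build fails for Spark 2.0" report) that class is no longer available. A startup check like the sketch below can turn this into a clearer message before the stream is created. `ClasspathCheck` is a hypothetical name, and the wording of the hint is an assumption.

```java
/** Hypothetical startup check: verifies that a required class can be loaded. */
final class ClasspathCheck {
    private ClasspathCheck() {}

    /** Returns true when className can be resolved without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String needed = "org.apache.spark.Logging";
        if (!isPresent(needed)) {
            System.err.println(needed + " is missing; the connector build and the Spark "
                    + "version on the classpath may not match.");
        }
    }
}
```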

java.lang.VerifyError: Inconsistent stackmap frames at branch target 152

Environment - Scala 2.11.8, spark 2.1.0 (with spark-core_2.11, spark-streaming_2.11), spark-rabbitmq 0.5.1.

When I run my Spark Streaming job using spark-submit on my standalone Spark instance, I get the error below:
2017-01-09 12:32:17 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.VerifyError: Inconsistent stackmap frames at branch target 152
Exception Details:
Location:
akka/dispatch/Mailbox.processAllSystemMessages()V @152: getstatic
Reason:
Type top (current frame, locals[9]) is not assignable to 'akka/dispatch/sysmsg/SystemMessage' (stack map, locals[9])
Current Frame:
bci: @131
flags: { }
locals: { 'akka/dispatch/Mailbox', 'java/lang/InterruptedException', 'akka/dispatch/sysmsg/SystemMessage', top, 'akka/dispatch/Mailbox', 'java/lang/Throwable', 'java/lang/Throwable' }
stack: { integer }
Stackmap Frame:
bci: @152
flags: { }
locals: { 'akka/dispatch/Mailbox', 'java/lang/InterruptedException', 'akka/dispatch/sysmsg/SystemMessage', top, 'akka/dispatch/Mailbox', 'java/lang/Throwable', 'java/lang/Throwable', top, top, 'akka/dispatch/sysmsg/SystemMessage' }
stack: { }
Bytecode:
0x0000000: 014c 2ab2 0132 b601 35b6 0139 4db2 013e
0x0000010: 2cb6 0142 9900 522a b600 c69a 004b 2c4e
0x0000020: b201 3e2c b601 454d 2db9 0148 0100 2ab6
0x0000030: 0052 2db6 014b b801 0999 000e bb00 e759
0x0000040: 1301 4db7 010f 4cb2 013e 2cb6 0150 99ff
0x0000050: bf2a b600 c69a ffb8 2ab2 0132 b601 35b6
0x0000060: 0139 4da7 ffaa 2ab6 0052 b600 56b6 0154
0x0000070: b601 5a3a 04a7 0091 3a05 1905 3a06 1906
0x0000080: c100 e799 0015 1906 c000 e73a 0719 074c
0x0000090: b200 f63a 08a7 0071 b201 5f19 06b6 0163
0x00000a0: 3a0a 190a b601 6899 0006 1905 bf19 0ab6
0x00000b0: 016c c000 df3a 0b2a b600 52b6 0170 b601
0x00000c0: 76bb 000f 5919 0b2a b600 52b6 017a b601
0x00000d0: 80b6 0186 2ab6 018a bb01 8c59 b701 8e13
0x00000e0: 0190 b601 9419 09b6 0194 1301 96b6 0194
0x00000f0: 190b b601 99b6 0194 b601 9ab7 019d b601
0x0000100: a3b2 00f6 3a08 b201 3e2c b601 4299 0026
0x0000110: 2c3a 09b2 013e 2cb6 0145 4d19 09b9 0148
0x0000120: 0100 1904 2ab6 0052 b601 7a19 09b6 01a7
0x0000130: a7ff d62b c600 09b8 0109 572b bfb1
Exception Handler Table:
bci [290, 307] => handler: 120
Stackmap Table:
append_frame(@13,Object[#231],Object[#177])
append_frame(@71,Object[#177])
chop_frame(@102,1)
full_frame(@120,{Object[#2],Object[#231],Object[#177],Top,Object[#2],Object[#177]},{Object[#223]})
full_frame(@152,{Object[#2],Object[#231],Object[#177],Top,Object[#2],Object[#223],Object[#223],Top,Top,Object[#177]},{})
append_frame(@173,Object[#357])
full_frame(@262,{Object[#2],Object[#231],Object[#177],Top,Object[#2]},{})
same_frame(@307)
same_frame(@317)

at akka.dispatch.Mailboxes.<init>(Mailboxes.scala:33)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:628)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:109)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:100)
at org.apache.spark.streaming.rabbitmq.receiver.RabbitMQReceiver.onStart(RabbitMQInputDStream.scala:57)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2021)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2021)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I see this issue is also reported here:
https://issues.apache.org/jira/browse/SPARK-16978

I started to get this error when I upgraded from spark-rabbitmq_1.6 to the latest version (0.5.1) and from Spark 1.6.1 to Spark 2.1.0.

Below is my pom.xml file. If I remove the Guava shade plugin, the streaming job runs fine, but I need this plugin.

<properties>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <mapr.hbase.version>5.0.0-mapr</mapr.hbase.version>
    <guava.version>19.0</guava.version>
</properties>
....
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>${guava.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.stratio.receiver</groupId>
    <artifactId>spark-rabbitmq</artifactId>
    <version>0.5.1</version>
</dependency>
</dependencies>
<build>
.....
.....
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <relocations>
                    <relocation>
                        <pattern>com.google</pattern>
                        <shadedPattern>shadeio</shadedPattern>
                        <includes>
                            <include>com.google.**</include>
                        </includes>
                    </relocation>
                </relocations>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                </transformers>

Has anyone hit this issue before? If so, how did you solve it?

Thanks
Akhila.

Spark-RabbitMQ V4 is not loaded in maven repository.

Hi,
Spark-RabbitMQ V4 is not loaded in maven repository.

What steps are required to use version 4?

I added the following dependency to my pom.xml:

<dependency>
  <groupId>com.stratio.receiver</groupId>
  <artifactId>spark-rabbitmq</artifactId>
  <version>LATEST</version>
</dependency>

I tried a git clone and a build, which added the jar to my local repository. In my Java project, Maven still does not resolve the com.stratio artifact, so I also tried mvn install:install-file. The mvn clean install seems to complete, but in Eclipse the import of com.stratio.receiver.RabbitMQUtils still does not work: it is reported as not resolved.

Thanks!

Distributed Approach: 3% of messages lost

Hello guys,

When we have fewer than 5,000 messages per minute, it works fine.
But as soon as the traffic increases, some messages are lost (~3%).

Description

  1. I pushed 40,000 messages into the queue.
  2. The RabbitMQ receiver (Distributed Approach) received the 40,000 messages, verified by summing the 9 log lines corresponding to
    https://github.com/Stratio/Spark-RabbitMQ/blob/8d485890e7e0ea6ee03faddfd1ad80e10840af14/src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala#L217
  3. But then, if I count the number of messages in my RDD with rabbitMQStream.count(), I get only 39,843 messages.
    There is no error in the logs, and I have no idea where my missing lines are :/

I tried to play with all the options (ackType, fairDispatch, prefetchCount, maxReceiveTime, maxRate, levelParallelism, maxMessagesPerPartition & backpressure). None fixed the problem.

Has anyone encountered the same error, and if so, how did you fix it?

BTW, the old Receiver-based Approach works fine!
Duplicate of #57?

Unable to read from a queue with arguments set

Hi.

It seems the stream cannot read from a queue with x-max-length set.

The error I'm getting is:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again

If I do not set the x-max-length of the queue, everything works well.

We also tried setting "x-message-ttl", and the same error occurs.

Everything works when we do not set any arguments when creating the (durable) queue.

Problem receiving an Array(Byte) using Scala API

Using this line of code the receiver works fine:
val receiverStream = RabbitMQUtils.createStream[String](ssc, rabbitParams)

But when I try to receive an Array[Byte] with the following code:
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
I get the following message:
ambiguous reference to overloaded definition, both method createStream in object RabbitMQUtils of type [R >: String <: String](ssc: org.apache.spark.streaming.StreamingContext, params: Map[String,String])(implicit evidence$2: Manifest[R])org.apache.spark.streaming.dstream.ReceiverInputDStream[String] and method createStream in object RabbitMQUtils of type (ssc: org.apache.spark.streaming.StreamingContext, params: Map[String,String])org.apache.spark.streaming.dstream.ReceiverInputDStream[Array[Byte]] match argument types (org.apache.spark.streaming.StreamingContext,scala.collection.immutable.Map[String,String])

object stratio is not a member of package com

Versions:- Spark: 1.6.0 | Scala: 2.10.5
spark-shell --packages com.stratio.receiver:spark-rabbitmq_1.6:0.3.0

Action:

scala> import com.stratio.receiver.RabbitMQUtils
<console>:25: error: object stratio is not a member of package com
         import com.stratio.receiver.RabbitMQUtils
                    ^
scala>

I've tried cleaning the sbt caches, jars, etc. Even when I use sbt to build and manage the dependency, it throws the same error.

build.sbt file:

name := "rabbit_spark_cassandra"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-sql"  % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.0"
libraryDependencies += "org.apache.cassandra" % "cassandra-all" % "3.5"
libraryDependencies += "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0-M2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
libraryDependencies += "com.stratio.receiver" % "spark-rabbitmq_1.6" % "0.3.0"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Error while executing sbt package:

[info]  [SUCCESSFUL ] com.stratio.receiver#spark-rabbitmq_1.6;0.3.0!spark-rabbitmq_1.6.jar (21988ms)
[info] Done updating.
[info] Compiling 2 Scala sources to /Users/rootcss/<path>/target/scala-2.10/classes...
[error] /Users/rootcss/<path>/src/main/scala/RabbitmqSpark.scala:3: object stratio is not a member of package com
[error] import com.stratio.receiver.RabbitMQUtils
[error]            ^
[error] /Users/rootcss/<path>/src/main/scala/RabbitmqSpark.scala:24: not found: value RabbitMQUtils
[error]     val receiverStream = RabbitMQUtils.createStream(ssc, Map(
[error]                          ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 59 s, completed 20 Jul, 2016 12:07:12 PM

Not able to read from a queue using Spark Streaming

I am trying to write a simple "Hello World" application using Spark Streaming and RabbitMQ, in which Spark Streaming reads a message from RabbitMQ and prints it to the console.

The message is sent to RabbitMQ via the simple code below:

package helloWorld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

  private final static String QUEUE_NAME = "hello1";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World! is a code. Hi Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}

I am trying to read messages via Spark Streaming as shown below:

package rabbitmq.example;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.stratio.receiver.RabbitMQUtils;

public class RabbitMqEx {

    public static void main(String[] args) {
        System.out.println("Creating    Spark   Configuration");
        SparkConf conf = new SparkConf();
        conf.setAppName("RabbitMq Receiver Example");
        conf.setMaster("local[2]");

        System.out.println("Retreiving  Streaming   Context from    Spark   Conf");
        JavaStreamingContext streamCtx = new JavaStreamingContext(conf,
                Durations.seconds(2));

        Map<String, String> rabbitMqConParams = new HashMap<String, String>();
        rabbitMqConParams.put("host", "localhost");
        rabbitMqConParams.put("queueName", "hello1");
        System.out.println("Trying to connect to RabbitMq");
        JavaReceiverInputDStream<String> receiverStream =
                RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams);
        receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> arg0) throws Exception {
                System.out.println("Value Received " + arg0.count());
                return null;
            }
        });
        streamCtx.start();
        streamCtx.awaitTermination();
    }
}

But it is not able to read anything from the queue. The console output looks like this:

15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2
15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0)
15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong
15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jabong); users with modify permissions: Set(jabong)
15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started
15/11/18 13:20:46 INFO Remoting: Starting remoting
15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.3:42978]
15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on port 42978.
15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker
15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster
15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1
15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB
15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5
15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server
15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' on port 37150.
15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator
15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040
15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost
15/11/18 13:20:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306.
15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306
15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager
15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306)
15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager
Trying to connect to RabbitMq
15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers
15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started
15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1
15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated com.stratio.receiver.RabbitMQInputDStream@5d00adc2
15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null
15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms
15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@4c132773
15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time 1447833054000
15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms
15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler
15/11/18 13:20:53 INFO StreamingContext: StreamingContext started
15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) with 1 output partitions
15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at RabbitMqEx.java:38)
15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started
15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List()
15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List()
15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has no missing parents
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with curMem=0, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 45.4 KB, free 947.7 MB)
15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with curMem=46496, maxMem=993735475
15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.8 KB, free 947.6 MB)
15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:47306 (size: 14.8 KB, free: 947.7 MB)
15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556)
15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 2729 bytes)
15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at time 1447833053800
15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator
15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread
15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 
15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 ms.0 from job set of time 1447833054000 ms
15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time 1447833054000 ms (execution: 0.007 s)
15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata: 
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again
15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.1.3:42978
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver
15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost
15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured
15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel
15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting..
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart
15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again
15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: java.io.IOException
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126)
    at com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86)
    at com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 5 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    at java.lang.Thread.run(Thread.java:745)
15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped
15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring
15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Trying to connect again: 
15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing...
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0
15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0
15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms
15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 ms.0 from job set of time 1447833056000 ms

Running list_queues shows the following:

sudo rabbitmqctl list_queues
Listing queues ...
hello1  2

However, I can read from the queue using a simple Java receiver, as mentioned here.

Environment

  • RabbitMq Version - 3.5.6
  • Spark 1.5.2
  • Java 8 (Update 66)
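
A note on the log above: the PRECONDITION_FAILED line says the queue 'hello1' was declared with durable 'true' while the existing queue has durable 'false'. RabbitMQ rejects a re-declaration whose arguments differ from the existing queue. The sketch below only models that check in memory; FakeBroker and tryDeclare are hypothetical names for illustration and are not part of the RabbitMQ client or of this library. With the real Java client the corresponding call is Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments). Whether the receiver lets you set the durable flag is something to check for the version in use.

```java
import java.util.HashMap;
import java.util.Map;

public class DurableMismatchSketch {

    /** Hypothetical stand-in for a broker's queue registry; not the RabbitMQ client API. */
    static class FakeBroker {
        private final Map<String, Boolean> durableByQueue = new HashMap<>();

        /** Declares a queue; re-declaring with a different 'durable' flag is rejected. */
        void queueDeclare(String queue, boolean durable) {
            Boolean current = durableByQueue.get(queue);
            if (current == null) {
                // First declaration creates the queue with the requested flag.
                durableByQueue.put(queue, durable);
            } else if (current.booleanValue() != durable) {
                throw new IllegalStateException(
                        "PRECONDITION_FAILED - inequivalent arg 'durable' for queue '" + queue
                                + "': received '" + durable + "' but current is '" + current + "'");
            }
        }
    }

    /** Returns "ok" when the declaration is accepted, otherwise the rejection message. */
    static String tryDeclare(FakeBroker broker, String queue, boolean durable) {
        try {
            broker.queueDeclare(queue, durable);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        // The queue already exists as non-durable, as "current is 'false'" in the log indicates.
        System.out.println(tryDeclare(broker, "hello1", false));
        // A second declaration asking for durable=true is rejected.
        System.out.println(tryDeclare(broker, "hello1", true));
        // Re-declaring with the matching flag is accepted.
        System.out.println(tryDeclare(broker, "hello1", false));
    }
}
```

Under this reading, either the queue has to be created as durable before the receiver starts, or the receiver has to declare it with the same flag the queue already has.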

Only Strings are supported

Currently, only String events are supported, because of this line:
store(new String(delivery.getBody)) // in RabbitMQInputDStream.scala
It would be better to provide a generic byte-array wrapper so the end user can decide the object type.
As it stands, non-String objects can't be transferred.
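
To illustrate the point, here is a minimal sketch in plain Java (no Spark or RabbitMQ involved) of why wrapping the body in a String loses binary payloads, and what a generic hook could look like. The names PayloadDecodingSketch, decode and roundTripThroughString are made up for this example and are not part of the library. The sketch pins the charset to UTF-8 as an assumption, whereas the quoted line uses the platform default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

public class PayloadDecodingSketch {

    /** Hypothetical generic hook: the caller decides how the raw bytes become a T. */
    static <T> T decode(byte[] body, Function<byte[], T> deserializer) {
        return deserializer.apply(body);
    }

    /** Mirrors the String-only path: bytes -> String -> bytes. */
    static byte[] roundTripThroughString(byte[] body) {
        return new String(body, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Arbitrary binary payload that is not valid UTF-8 text.
        byte[] binary = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF, 0x01};

        // Invalid sequences are replaced while decoding, so the original bytes cannot be recovered.
        boolean survivesString = Arrays.equals(binary, roundTripThroughString(binary));
        System.out.println("survives String round trip: " + survivesString);

        // With a caller-supplied deserializer the bytes arrive untouched.
        byte[] raw = decode(binary, b -> b);
        System.out.println("survives raw decode: " + Arrays.equals(binary, raw));

        // Any other type works the same way, for example a big-endian int.
        Integer answer = decode(new byte[] {0, 0, 0, 42}, b -> ByteBuffer.wrap(b).getInt());
        System.out.println("decoded int: " + answer);
    }
}
```

A receiver that stored the raw byte array, or accepted a function like the deserializer argument here, would leave the choice of format (Avro, Protobuf, plain text) to the user.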

Missing data

Hello,

I use these RabbitMQ receivers but I have some problems. I use a Java project (spring-amqp) to publish some Avro messages to RabbitMQ and consume them with a Scala Spark project.

Let me take an example with a simple String message. In my real case I publish Avro messages, decode them, and save them to HDFS in Parquet format with PairRDDFunctions, and I lose ~1% of my data.

@Autowired
private RabbitTemplate rabbitTemplate;
...

for (int i = 0; i < 2000000; i++) {
rabbitTemplate.convertAndSend(exchangeSessionActivity, routingKeySessionActivity, RandomStringUtils.randomAlphanumeric(255));
}

I use the distributed receivers to read the messages, transform them, and write them to HDFS in Parquet format.

def main(args: Array[String]): Unit = {
    val appName: String = getClass.getSimpleName

    val sparkConf = new SparkConf()
      .setAppName(appName)
      .setIfMissing(SparkConstant.CONF_SPARK_MASTER, settings.sparkMaster)

    // Create the context with a 10 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(10))


    val rabbitMQConnection = Map(
      "hosts" -> "192.168.152.130",
      "queueName" -> "test",
      ConfigParameters.VirtualHostKey -> "/decisionnel",
      "userName" -> "admin",
      "password" -> "password"
    )

    val distributedKeysSa = Seq(
      RabbitMQDistributedKey(
        "test",
        new ExchangeAndRouting(),
        rabbitMQConnection
      )
    )

    val receiverStream = RabbitMQUtils.createDistributedStream[String](ssc, distributedKeysSa, rabbitMQConnection)


    val totalEvents = ssc.sparkContext.accumulator(0L, "Number of events received")
    val totalEventsPair = ssc.sparkContext.accumulator(0L, "Number of events received")

    receiverStream.foreachRDD((rdd, ms) => {
      val pairRdd = rdd.map((null, _)) // Transform RDD to RDD[(K, V)] to get access to PairRDDFunctions
          // .saveAsHadoopFile(....) saves it to HDFS

      val rddCount = rdd.count()
      val pairRddCount = pairRdd.count()

      totalEvents += rddCount
      totalEventsPair += pairRddCount

      println("\n ---------------->")

      println(s"TOTAL EVENTS : \t $totalEvents")
      println(s"TOTAL EVENTS Pair: \t $totalEventsPair")

      if (rddCount != pairRddCount) {
        println("Missing Data")
      }

      println("\n <----------------")
    })

    ssc.start()
    ssc.awaitTermination()
  }

If you run this code, you will sometimes see "Missing Data" in the log, with no apparent reason and no warning (cf. missing-data.png: 67156 instead of 67157).

For me, totalEvents is correct but totalEventsPair is missing some data.

I have been looking into this problem for 3 days but have not found a solution.

For your information, I tried enabling write-ahead logs, checkpointing, back-pressure ...
I use:

  • Spark 1.6.1
  • Scala 2.10.5
  • com.stratio.receiver.spark-rabbitmq_1.6 v0.3.0
  • RabbitMq 3.6.0, Erlang R16B03

Thanks for your work 👍

Error while creating RabbitMQReceiver

I'm attempting to consume messages from an existing RabbitMQ queue, and am getting the following error messages. Does anybody know what this may indicate?

15/08/14 12:54:07 ERROR RabbitMQReceiver: Got this unknown exception: java.util.NoSuchElementException: None.get
15/08/14 12:54:07 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again

My Spark Streaming app is pretty basic; the code for this is below.

def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("streaming-sample-app")
      .setIfMissing("spark.master", "local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val receiverStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> "dev.company.com",
      "queueName" -> "messaging",
      "vHost" -> "logging",
      "username" -> "username",
      "password" -> "password"
    ))

    receiverStream.start()
    receiverStream.foreachRDD(r => println(r.count()))

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
  }

Thanks in advance for any assistance - once I get this working I will submit a pull request with an example usage.

Thanks,
Kevin

Problem with dependencies

I get the error "object stratio is not a member of package com".

I don't understand what is wrong.

Here is my build.sbt

name := "spark-demo"
version := "0.0.1"
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" % "spark-streaming_2.11" % "1.6.0",
  "com.stratio.receiver" % "spark-rabbitmq" % "0.2.0"
)

Here is my app

package app

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.stratio.receiver.RabbitMQUtils

object Main {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint(".")

    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world",2)))

    val rabbitParams = Map(
      "host" -> "localhost",
      "queueName" -> "some-queue",
      "exchangeName" -> "amqp.topic",
      "exchangeType" -> "topic",
      "routingKeys" -> "some-topic",
      "username" -> "guest",
      "password" -> "guest"
    )

    val lines = RabbitMQUtils.createStream(ssc, rabbitParams)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Version related question

I am on Spark 1.6.0 (Scala 2.10.5).
What version of spark-rabbitmq do I need? I want to use the distributed receiver.

I tried spark-rabbitmq_1.6:0.3.0 and get the "java.lang.AbstractMethodError". Do let me know if I am using the right version of spark-rabbitmq, so I can rule out version issues.

Thanks in advance.

Spark Python API Support

Hello,

I was wondering if it's on the roadmap to integrate the Python API as well, or will you continue supporting only Scala and Java?

Best Regards,
Mohannad

Maven Repository is outdated

Is there a date for the next planned release of this component? It would be nice to have a new version uploaded to the Maven repository for reference in a consumer's pom.xml file.

If you'd like me to create a new version as a pull request, I'd be happy to do so (just let me know).

Thanks!
