filodb / filodb
Distributed Prometheus time series database
License: Apache License 2.0
Hi, I am testing updating existing data in FiloDB. I understand there is a version concept, but by default every push writes to version 0, so I expect the new data to overwrite the old data.
I am testing this on one Linux box, running DSE 4.8.5 (with C* 2.1.13) and Spark 1.5.2 with FiloDB 0.2.1-master trunk. I am generating 1M emails, using the first 2 characters of each email as the partition key, the domain of the email as the segment key, and the unique email address as the row key.
Starting Spark 1.5.2, disabling Tungsten sort, and running it as local
[yzhang@yzhang-linux data]$ ~/spark/bin/spark-shell --conf spark.sql.tungsten.enabled=false --conf spark.sql.shuffle.partitions=1 --jars /home/yzhang/source/FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.2.1-SNAPSHOT.jar,/home/yzhang/lib/commons-csv-1.2.jar,/home/yzhang/lib/spark-csv_2.10-1.4.0.jar --driver-memory 3G --master local[2]
Load the CSV file and save it into FiloDB:
scala>val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2_1m.csv")
val df = csvDF.withColumn("PartId", substring(csvDF("Email"), 1, 2))
df: org.apache.spark.sql.DataFrame = [Email: string, Domain: string, GlobalStop: int, RestrictedAddress: int, RestrictedDomain: int, RoleAddress: int, Active: int]
scala> df.write.format("filodb.spark").option("dataset", "poc2").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()
The data saved without any problem, and I can query it.
scala> val poc2 = sqlContext.read.format("filodb.spark").option("dataset", "poc2").load()
poc2: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
scala> poc2.count
res1: Long = 1000000
scala> poc2.registerTempTable("poc2")
scala> sqlContext.sql("select count(*) from poc2 where GlobalStop = 1").show()
+------+
| _c0|
+------+
|500034|
+------+
Now I use the same data, but I set the GlobalStop field to "1" for every row, and try to save again:
scala>val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2_1m.csv")
scala> def udfSetOne=udf((input:Int) => 1)
scala> val updatedDF = csvDF.withColumn("newGlobalStop", udfSetOne(csvDF("GlobalStop"))).drop("GlobalStop").withColumnRenamed("newGlobalStop","GlobalStop").withColumn("PartId", substring(csvDF("Email"), 1, 2))
scala> updatedDF.printSchema
root
|-- Email: string (nullable = true)
|-- Domain: string (nullable = true)
|-- RestrictedAddress: integer (nullable = true)
|-- RestrictedDomain: integer (nullable = true)
|-- RoleAddress: integer (nullable = true)
|-- Active: integer (nullable = true)
|-- GlobalStop: integer (nullable = true)
So what I did is read the same CSV file again, but this time I use a UDF to create a new column named "newGlobalStop" with the value 1, drop the old "GlobalStop", and rename "newGlobalStop" to "GlobalStop".
So I created a new DataFrame with the same schema as before, but whose "GlobalStop" column contains ONLY the value 1.
scala> updatedDF.registerTempTable("df")
scala> sqlContext.sql("select count(*) from df where GlobalStop = 1").show()
+-------+
| _c0|
+-------+
|1000000|
+-------+
The above confirms that the new DataFrame has only 1 as the GlobalStop value.
Now write them to FiloDB again, to the same dataset:
scala> updatedDF.write.format("filodb.spark").option("dataset", "poc2").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()
So far I didn't get any error, so the new DataFrame should have overwritten the old dataset, with 1 as the only value in that field.
But when I query the "poc2" dataset again from FiloDB, I don't get the new values I expected:
scala> val poc2 = sqlContext.read.format("filodb.spark").option("dataset", "poc2").load()
poc2: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
scala> poc2.registerTempTable("poc2")
scala> sqlContext.sql("select count(*) from poc2 where GlobalStop = 1").show()
+------+
| _c0|
+------+
|500034|
+------+
It looks like my second overwrite was simply ignored.
Did I do something wrong, or did I misunderstand how data can be updated in FiloDB?
Thanks
Yong
I am trying to test FiloDB 0.2 with a 6-node C* (DSE 4.8.5) cluster running Spark 1.5.2.
The 100k sample data that comes with FiloDB works fine, but when I tried to load 50M rows for our use case, with a dataset I put together for a POC, I got the following error message at this step:
newDF.write.format("filodb.spark").option("dataset", "poc1").option("row_keys", "Email").option("segment_key","Domain").option("partition_keys","Partition").mode(SaveMode.Overwrite).save()
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$FiloContext.createNewDataset(package.scala:165)
at filodb.spark.package$FiloContext.createOrUpdateDataset(package.scala:216)
at filodb.spark.package$FiloContext.saveAsFilo(package.scala:268)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:53)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
... (remaining frames are spark-shell REPL $iwC wrapper frames)
I am not sure whether this is related to our C* settings, but it looks like a timeout coming from FiloDB, and I don't see any setting in application.conf that can adjust it.
Which setting should I change for this?
Thanks
From Gitter channel:
I found out some interesting things.
First: I used filo-cli to import data; checking filo-cli.log I found:
[2015-11-06 19:47:03,113] INFO f.c.DatasetCoordinatorActor [filo-cli-akka.actor.default-dispatcher-9] - MemTable active table rows: 44999
[2015-11-06 19:47:03,113] INFO f.c.DatasetCoordinatorActor [filo-cli-akka.actor.default-dispatcher-9] - MemTable flushing rows: 55000
Then I used spark-shell to check how many records are in the gdelt dataset and got 55000 records. That matches the 55k rows that were flushed from the MemTable. It seems the MemTable belongs to the filo-cli process and it doesn't flush the active table rows on exit, so we lost 45k records.
I'll bet it's more of a timing problem. RowSource sends a Flush message to NodeCoordinatorActor, but it may not flush everything. The CLI is not persistent, and quits before everything is flushed. Instead, let's implement a FinishIngestion message, which flushes everything at the end, including the active table....
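A minimal sketch of what that could look like, assuming a coordinator actor that owns the MemTable; apart from FinishIngestion (proposed above) and the actor names taken from the logs, every name here is hypothetical:

```scala
import akka.actor.Actor

// Hypothetical messages -- only FinishIngestion is what this issue proposes to add.
case object Flush              // flush whatever is already in the flushing table
case object FinishIngestion    // flush everything, including the active table, then ack
case object IngestionFinished  // reply once all buffered rows have been written out

class DatasetCoordinatorSketch extends Actor {
  private var activeRows: Seq[String] = Nil     // stand-ins for the real MemTable state
  private var flushingRows: Seq[String] = Nil

  private def writeOut(rows: Seq[String]): Unit =
    println(s"reprojecting ${rows.size} rows")  // placeholder for the column-store write

  def receive: Receive = {
    case Flush =>
      writeOut(flushingRows)
      flushingRows = Nil

    case FinishIngestion =>
      // Unlike Flush, also drain the active table, so a short-lived client such as
      // filo-cli does not exit while rows are still buffered in memory.
      writeOut(flushingRows ++ activeRows)
      flushingRows = Nil
      activeRows = Nil
      sender() ! IngestionFinished
  }
}
```

filo-cli (or RowSource) could then ask FinishIngestion and wait for IngestionFinished before exiting.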
Dependency for PR against the Spark repo for a FilodbInputDStream
Currently, each segment has some fixed length, and it is hard wired and not configurable. This obviously won't work for generic data ingestion. For example, for time series ingestion you want to be able to fit a certain number of points per segment. Or, maybe what you want is for each segment to correspond to a certain deviceId, or subpart of the deviceId time series. So, this has to be
a. Configurable, or
b. better yet, automatically determined.
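A rough sketch of what option (a) might look like: segment sizing driven by per-dataset configuration instead of a hard-wired constant. The config keys and class names below are made up for illustration, not FiloDB's actual options:

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical segmentation policies.
sealed trait SegmentPolicy
case class FixedRowCount(rowsPerSegment: Int) extends SegmentPolicy // e.g. N points per segment for time series
case class BySubKey(column: String) extends SegmentPolicy           // e.g. one segment per deviceId

object SegmentPolicy {
  def fromConfig(conf: Config): SegmentPolicy =
    conf.getString("segment.mode") match {
      case "fixed-rows" => FixedRowCount(conf.getInt("segment.rows-per-segment"))
      case "by-column"  => BySubKey(conf.getString("segment.column"))
    }
}

// Usage:
//   val policy = SegmentPolicy.fromConfig(ConfigFactory.parseString(
//     "segment.mode = fixed-rows\nsegment.rows-per-segment = 10000"))
```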
I am trying to follow the steps to try FiloDB. filo-cli init succeeded.
I am stuck at the next step. The error looks Cassandra-specific, but is there any Spark config to fix this problem?
scala> csvDF.write.format("filodb.spark").
| option("dataset", "gdelt").
| option("sort_column", "GLOBALEVENTID").
| mode(SaveMode.Overwrite).save()
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO package$: Truncating dataset gdelt
16/01/03 11:16:14 INFO package$: Creating dataset Dataset(gdelt,List(Projection(0,gdelt,GLOBALEVENTID,false,List(),10000)),_partition,{"chunkSize":1000,"segmentSize":"10000"})...
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost:36922 in memory (size: 1842.0 B, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_6_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_7_piece0 on localhost:36922 in memory (size: 1834.0 B, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_8_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_9_piece0 on localhost:36922 in memory (size: 3.7 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO package$: Dataset gdelt created successfully...
16/01/03 11:16:14 INFO package$: Read schema for dataset gdelt = Map(ActionGeo_CountryCode -> Column(ActionGeo_CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Geo_FullName -> Column(Actor1Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Name -> Column(Actor2Name,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_ADM1Code -> Column(ActionGeo_ADM1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2CountryCode -> Column(Actor2CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Name -> Column(Actor1Name,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Religion2Code -> Column(Actor2Religion2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1KnownGroupCode -> Column(Actor1KnownGroupCode,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_Type -> Column(ActionGeo_Type,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Geo_Type -> Column(Actor1Geo_Type,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Religion1Code -> Column(Actor1Religion1Code,gdelt,0,StringColumn,FiloSerializer,false,false), EventCode -> Column(EventCode,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Geo_Long -> Column(Actor1Geo_Long,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor1EthnicCode -> Column(Actor1EthnicCode,gdelt,0,StringColumn,FiloSerializer,false,false), GLOBALEVENTID -> Column(GLOBALEVENTID,gdelt,0,IntColumn,FiloSerializer,false,false), NumMentions -> Column(NumMentions,gdelt,0,IntColumn,FiloSerializer,false,false), Actor2KnownGroupCode -> Column(Actor2KnownGroupCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Code -> Column(Actor2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Geo_Lat -> Column(Actor2Geo_Lat,gdelt,0,DoubleColumn,FiloSerializer,false,false), FractionDate -> Column(FractionDate,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor2Geo_Long -> Column(Actor2Geo_Long,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor1Code -> Column(Actor1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Type2Code -> Column(Actor1Type2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Geo_FullName -> Column(Actor2Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false))
16/01/03 11:16:14 INFO package$: Matching columns - Set(ActionGeo_CountryCode, Actor1Geo_FullName, Actor2Name, ActionGeo_ADM1Code, Actor2CountryCode, Actor1Name, Actor2Religion2Code, Actor1KnownGroupCode, ActionGeo_Type, Actor1Geo_Type, Actor1Religion1Code, EventCode, Actor1Geo_Long, Actor1EthnicCode, GLOBALEVENTID, NumMentions, Actor2KnownGroupCode, Actor2Code, Actor2Geo_Lat, FractionDate, Actor2Geo_Long, Actor1Code, Actor1Type2Code, Actor2Geo_FullName)
Missing columns - Set(DATEADDED, Actor2Geo_Type, IsRootEvent, Actor2EthnicCode, Actor2Type2Code, Actor2Type1Code, NumSources, AvgTone, Actor1Type3Code, Actor1Geo_Lat, NumArticles, ActionGeo_FullName, Year, Actor1Geo_FeatureID, ActionGeo_Long, Actor2Geo_ADM1Code, GoldsteinScale, Actor2Geo_CountryCode, Actor1Geo_ADM1Code, Actor1Religion2Code, EventBaseCode, MonthYear, ActionGeo_Lat, ActionGeo_FeatureID, Actor2Religion1Code, QuadClass, Actor1Type1Code, Actor2Type3Code, Actor1Geo_CountryCode, EventRootCode, _partition, SQLDATE, Actor1CountryCode, Actor2Geo_FeatureID)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
filodb.core.StorageEngineException: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at filodb.cassandra.Util$ResultSetToResponse$$anonfun$toResponse$1.applyOrElse(Util.scala:19)
at filodb.cassandra.Util$ResultSetToResponse$$anonfun$toResponse$1.applyOrElse(Util.scala:18)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:59)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:117)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:120)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:754)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:590)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:994)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:916)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.epoll.EpollSocketChannel$EpollSocketUnsafe.epollInReady(EpollSocketChannel.java:722)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:326)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:62)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:40)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:243)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:225)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
... 11 more
Today FiloDB only supports a sort key from a single column. Let's support multi-component sort keys. This should be relatively contained - at least the core changes - yet be a good introduction to different parts of the code. Change list that comes to mind:
- TwoColumnSortKeyHelper extends SortKeyHelper[(A, B)]
- ThreeColumnSortKeyHelper, etc.
- MergeStrategy, Projection to support multiple sort keys. It should be a Seq[String], and the earlier columns listed take priority (a sketch follows below).
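A minimal sketch of the two-column case, assuming SortKeyHelper is essentially "give me an Ordering plus a way to build the key"; the real trait's signature may differ:

```scala
// Sketch only: stand-in for the real SortKeyHelper trait.
trait SortKeyHelperSketch[K] {
  def ordering: Ordering[K]
  def getSortKey(columnValues: Seq[Any]): K
}

class TwoColumnSortKeyHelper[A: Ordering, B: Ordering] extends SortKeyHelperSketch[(A, B)] {
  // Ordering.Tuple2 compares the first component before the second, so the
  // earlier listed sort column takes priority, as described above.
  val ordering: Ordering[(A, B)] = Ordering.Tuple2[A, B]
  def getSortKey(columnValues: Seq[Any]): (A, B) =
    (columnValues(0).asInstanceOf[A], columnValues(1).asInstanceOf[B])
}

// ThreeColumnSortKeyHelper would do the same with Ordering.Tuple3, and so on.
```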
Allow to configure authentication credentials for Cassandra and use them for connection
Is there any link/document where I can find the setup steps?
Right now the MemTable detects OOM using a simple sys.runtime.freeMemory heuristic, but this does not protect against DirectMemory out of memory errors. Need to find a better way to do this.
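One option (an assumption on my part, not a decided approach) is to watch the JVM's direct buffer pool via the standard BufferPoolMXBean, instead of relying only on freeMemory:

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

object DirectMemoryCheck {
  // The "direct" buffer pool reports how much of -XX:MaxDirectMemorySize is in use.
  private val directPool: Option[BufferPoolMXBean] =
    ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean])
      .asScala.find(_.getName == "direct")

  def directBytesUsed: Long = directPool.map(_.getMemoryUsed).getOrElse(0L)

  // Crude backpressure check: true once direct usage crosses a fraction of the limit.
  def nearDirectLimit(maxDirectBytes: Long, headroom: Double = 0.9): Boolean =
    directBytesUsed > (maxDirectBytes * headroom).toLong
}
```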
There are plenty of Spark apps that could benefit from an in-memory store that can update and filter and scan quickly, with columnar projections (plus have a backup to Cassandra or other permanent store). For example, Spark Streaming apps often need to join data. This ticket is about making InMemoryColumnStore truly useful.
See these Spark 1.6.0 tickets that might be relevant:
https://issues.apache.org/jira/browse/SPARK-11410
https://issues.apache.org/jira/browse/SPARK-4849
If the two RDDs being joined have partitioners that evaluate as equal, then a 1 to 1 join is enabled instead of a shuffle join. This is something to take advantage of.
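To illustrate the partitioner point with plain RDDs (not FiloDB-specific): when both sides of a join carry an equal Partitioner, Spark plans the join as a narrow, one-to-one dependency rather than a shuffle.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}

def copartitionedJoin(sc: SparkContext): Unit = {
  val part = new HashPartitioner(8)

  // Both RDDs are pre-partitioned with the same partitioner...
  val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part)
  val right = sc.parallelize(Seq(1 -> "x", 2 -> "y")).partitionBy(part)

  // ...so the join itself adds no shuffle stage: each output partition reads
  // exactly one partition from each parent.
  val joined = left.join(right)
  println(joined.toDebugString)
}
```

An InMemoryColumnStore-backed relation that exposed (and preserved) such a partitioner would let Spark Streaming joins against FiloDB data avoid the shuffle in the same way.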
Reprojection task (gdelt,0) succeeded: Stream(Success, ?). Add more info, like the keyRanges that just got flushed.
DatasetCoordinatorActor handleFailed: add retries for reprojections that fail.
Bonus: Add some state to the reprojector, or delete rows that succeeded, so that the reprojection won't have to re-write segments that did successfully write. The only problem with deleting rows is that if there are multiple reprojections off the same memtable, we don't want to affect the other reprojections. But we can think about that later.
Drop table option is not available right now. Also getting an exception when trying to recreate table if there is data in it. Recreate table works when there is no data in that table.
./filo-cli-0.2-SNAPSHOT -Dconfig.file=./filodb-element.conf --command create --dataset billing_fact_native2 --columns ge_fiscal_period_name:string,corp_code:string,fleet_number:string,consolidated_billing_source_cd:string,security_level:string,charge_type_desc:string,charge_cd:string,asset_id:string,billing_invoice_id:string,billing_invoice_line_num:string,asset_class_cd:string,asset_make_desc:string,asset_start_dt:string,bill_level2_id_desc:string,bill_level3_id_desc:string,bill_level4_id_desc:string,bill_level5_id_desc:string,bill_level6_id_desc:string,billing_period_id:string,billing_structure_party_id:string,billing_structure_start_dt:string,charge_desc:string,cust_inv_odom_reading_qty:string,fuel_negotiated_discount_amt:string,fuel_product_type_desc:string,fuel_trans_category_desc:string,fuel_trans_loctn_city_name:string,fuel_trans_time:string,ge_fiscal_quarter_name:string,ge_fiscal_year_name:string,ge_unit_num:string,inventory_status_desc:string,license_plate_state_cd:string,line_item_qty:string,model_desc:string,mso_first_name:string,mso_last_name:string,purchase_transaction_dt:string,svcs_net_amt:string,vntg_year:string --partitionKeys corp_code,fleet_number,ge_fiscal_period_name --rowKeys consolidated_billing_source_cd,securitylevel,charge_type_desc,charge_cd,asset_id,billing_invoice_id,billing_invoice_line_num --segmentKey ':string /0'
Creating dataset billing_fact_native2...
Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at filodb.cli.CsvImportExport$class.parse(CsvImportExport.scala:43)
at filodb.cli.CliMain$.parse(CliMain.scala:57)
at filodb.cli.CsvImportExport$class.actorAsk(CsvImportExport.scala:49)
at filodb.cli.CliMain$.actorAsk(CliMain.scala:57)
at filodb.cli.CliMain$.createDatasetAndColumns(CliMain.scala:157)
at filodb.cli.CliMain$.main(CliMain.scala:88)
at filodb.cli.CliMain$.main(CliMain.scala:57)
at com.quantifind.sumac.ArgMain$class.mainHelper(ArgApp.scala:45)
at com.quantifind.sumac.ArgMain$class.main(ArgApp.scala:34)
at filodb.cli.CliMain$.main(CliMain.scala:57)
at filodb.cli.CliMain.main(CliMain.scala)
It is helpful for developers to understand the read and write paths.
We should look into and explore the use of Tungsten for FiloDB query side and native support for it too.
I'm trying to follow the directions in Using Spark to ingest and query data.
When I run sbt spark/assembly on OS X 10.10.5, Java 1.8.0_60, and sbt 0.13.7, using commit e90b7, sbt spark/assembly reports:
...
[info] SaveAsFiloTest:
[info] Exception encountered when attempting to run a suite with class name: filodb.spark.SaveAsFiloTest *** ABORTED ***
[info] com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.TransportException: [localhost/127.0.0.1:9042] Cannot connect))
[info] at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
[info] at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
[info] at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230)
[info] at com.datastax.driver.core.Cluster.init(Cluster.java:157)
[info] at com.datastax.driver.core.Cluster.connect(Cluster.java:245)
[info] at com.websudos.phantom.connectors.DefaultCassandraManager$$anonfun$initIfNotInited$1.apply(DefaultCassandraManager.scala:113)
[info] at com.websudos.phantom.connectors.DefaultCassandraManager$$anonfun$initIfNotInited$1.apply(DefaultCassandraManager.scala:112)
[info] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[info] at scala.concurrent.package$.blocking(package.scala:50)
[info] at com.websudos.phantom.connectors.DefaultCassandraManager.initIfNotInited(DefaultCassandraManager.scala:112)
[info] ...
[info] ScalaTest
[info] Run completed in 4 seconds, 343 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error: Total 1, Failed 0, Errors 1, Passed 0
[error] Error during tests:
[error] filodb.spark.SaveAsFiloTest
[error] (spark/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 7 s, completed Aug 31, 2015 6:35:36 PM
The second one is perhaps 2-3 days worth of work, and would be valuable especially for InMemoryColumnStore.
[2015-12-19 06:42:31,649] ERROR f.c.r.Scheduler [filo-cli-akka.actor.default-dispatcher-5] - Reprojection task (gdelt,0) failed
com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured columnfamily gdelt_chunkmap
at com.datastax.driver.core.Responses$Error.asException(Responses.java:124) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:120) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:754) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:576) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:994) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:916) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
[2015-12-19 06:42:32,062] ERROR com.websudos.phantom [pool-9-thread-1] - unconfigured columnfamily gdelt_chunkmap
[2015-12-19 06:42:32,179] ERROR com.websudos.phantom [pool-9-thread-2] - unconfigured columnfamily gdelt_chunkmap
[2015-12-19 06:42:32,308] ERROR com.websudos.phantom [pool-9-thread-1] - unconfigured columnfamily gdelt_chunkmap
based on GDELT, NYC TAXI, and other datasets
Easy to run
This could be as simple as a one liner to enable it in the current MapDBMemTable, plus recovery logic. However, need to benchmark as the MapDBMemTable is already not fast.
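For reference, the "one liner" with MapDB would roughly be swapping the in-memory DBMaker for a file-backed one (MapDB 1.x-style API, shown only as an assumption of what the change entails; the recovery logic is the real work):

```scala
import java.io.File
import org.mapdb.{DB, DBMaker}

// Roughly the current in-memory behaviour:
def memoryDb(): DB = DBMaker.newMemoryDirectDB().make()

// File-backed, so rows buffered in the MemTable survive a crash; on startup,
// recovery logic would re-read this file and re-ingest any pending rows.
def persistentDb(path: String): DB =
  DBMaker.newFileDB(new File(path))
    .closeOnJvmShutdown()
    .make()
```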
My intention here is to conceptually use FiloDB as an index for a Cassandra Table so it becomes more transparent to users/application writers and can be used with existing applications to give them the analytics boost.
From Blender perspective, for now this is the easiest and most straightforward way to work on our storage optimizer and fits in the current blender system without much change.
Hi @velvia,
I just wanted to ask if the ByteIterator functionality from phantom is of any use whatsoever to you anymore. There's a lot of code we ported in from Akka that we don't really want to maintain, and as I recall you took a different approach in the end, so there may be zero reason for us to keep it in the codebase.
Also, you can extend and create your own columns quite easily, and perhaps that's why it makes little sense to keep it all in one place, especially since we are doing nothing but duplicating already existing concerns far better addressed in different projects.
Regards.
When I try to execute the Streaming test, I get this error:
Error:scalac: Class org.apache.avro.reflect.Stringable not found - continuing with a stub.
Allow changing the destination C* Keyspace in the options when writing a DataFrame in Spark. Also when connecting via CLI.
For certain workloads it is important to commit all the writes to FiloDB tables at the end of an ETL process. Currently there is no documented option that allows a programmer to issue a commit at the end of ETL to trigger a flush of any pending memtable data to tables.
Another option is to trigger memtable flushes automatically if there have been no further writes to the memtable in the past X minutes/seconds.
Apache Calcite has what seems like more robust ANSI SQL support and query optimization than Spark. It has Spark integration. Investigate using that as a JDBC driver/server and either using filodb.spark with spark adapter, or writing a FiloDB direct adapter.
Also be able to overwrite row key columns during data ingestion with an OVERWRITE option.
select * from datasets where name='billing_fact_native2';
name | projectionid | options | partitioncolumns | keycolumns | projectioncolumns | projectionreverse | segmentcolumns
----------------------+--------------+--------------------+----------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+----------------
billing_fact_native2 | 0 | {"chunkSize":5000} | corp_code\x01fleet_number\x01ge_fiscal_period_name | consolidated_billing_source_cd\x01securitylevel\x01charge_type_desc\x01charge_cd\x01asset_id\x01billing_invoice_id\x01billing_invoice_line_num | | False | :string /0
Save command:
srcDF.write().format("filodb.spark").option("dataset",targetTable).
option("row_keys", "consolidated_billing_source_cd,security_level,charge_type_desc,charge_cd,asset_id,billing_invoice_id,billing_invoice_line_num").
option("segment_key", ":string /0").
option("partition_keys", "corp_code,fleet_number,ge_fiscal_period_name").
mode(SaveMode.Overwrite).save();
select keycolumns from datasets where name='billing_fact_native2';
keycolumns
------------------------------------------------------------------------------------------------------------------------------------------------
consolidated_billing_source_cd\x01security_level\x01charge_type_desc\x01charge_cd\x01asset_id\x01billing_invoice_id\x01billing_invoice_line_num
What if someone truncates a table - or inserts data when you don't intend them to? Wouldn't it be nice to have a place where every table DDL/ETL was tracked so you could go back and find out when something happened?
DDL Transactions table:
What queries would someone want on this table? Maybe just expose it to Spark and leave it at that.
The initial segment key filtering feature just implemented is rather limited in scope. It requires a range - either >/>= AND </<=. Let's add some more flexibility.
For Cassandra, since the range scan is really just a client side ALLOW FILTERING on the segment key, in theory we can also support arbitrary conditions, but let's limit ourselves on that a bit.
Hi Evan,
How are we going to track issues for FiloDB? Trello.com boards is one way to do so.
-Kushal.
If the query specifies one partition key, instead of scanning the entire column store, we should use the ReadSegments API, which reads from one partition only. This would be much faster and good for point queries.
We should also think about exposing this to non-Spark query API.
We should look into storing the local FiloDB data directly on disk (and not in Cassandra). This way we will be able to bypass Cassandra (and all the network overhead and serde) while querying.
This will also remove the hard link between FiloDB and Cassandra, and FiloDB can then sit behind any data distribution framework or distributed database engine without needing C*.
It will also pave the way for us to harness memory-mapped reading (one of the core selling points of FlatBuffers), off-heap processing, and possibly SIMD and GPGPU support.
And finally, it will be a good step towards native Tungsten bindings.
Low priority ATM
We should be able to support all Cassandra data types. I am especially looking for -
Here are the new features:
Of particular relevance:
sqlContext.read.table("dbName.tableName")
:stringPrefix - in which case Spark should probably filter again)
Currently, reads from Spark are single threaded. This is because the intention is to distribute reads throughout the cluster by splitting up token ranges of keys. scanSegments is passed some parameters, which are empty right now.
We would need a method to distribute ranges of tokens based on the # of nodes and node info, possibly piggyback on Calliope.
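A sketch of deriving locality-aware splits straight from the cluster metadata (DataStax Java driver 2.1+ API; whether to do this directly or piggyback on Calliope is the open question):

```scala
import com.datastax.driver.core.{Cluster, TokenRange}
import scala.collection.JavaConverters._

/** Split the ring into smaller token ranges, each tagged with its replica hosts,
  * so that Spark partitions can be scheduled close to the data they read. */
def planSplits(cluster: Cluster, keyspace: String, splitsPerRange: Int): Seq[(TokenRange, Seq[String])] = {
  val meta = cluster.getMetadata
  for {
    range <- meta.getTokenRanges.asScala.toSeq
    split <- range.splitEvenly(splitsPerRange).asScala
    hosts  = meta.getReplicas(keyspace, split).asScala.toSeq.map(_.getAddress.getHostAddress)
  } yield (split, hosts)
}
```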
I really like this concept, but it's critical for us to be able to create permanent tables via a Hive metastore. Would it be possible to support Hive at some point?
Currently on ingestion, the FiloDB data source in Spark will automatically add new columns found in the source DataFrame to the FiloDB table (but existing columns whose type doesn't match will trigger a SchemaError). This may not be desired in many environments. So, we should either:
The behavior to change would be in the spark module, in packages.scala.
There are multiple places in the code that call APIs with a type parameter, switching on column type, like this:
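Roughly, the shape of the boilerplate is the following (hypothetical names, just to show the pattern of switching on column type to pick a type parameter):

```scala
sealed trait ColType
case object IntColumn extends ColType
case object LongColumn extends ColType
case object StringColumn extends ColType

// Stand-in for any type-parameterized API (builders, memtables, reprojectors, ...).
def makeBuilder[T](): Vector[T] = Vector.empty[T]

def builderFor(colType: ColType): Vector[_] = colType match {
  case IntColumn    => makeBuilder[Int]()
  case LongColumn   => makeBuilder[Long]()
  case StringColumn => makeBuilder[String]()
}
```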
They are all over, like multiple places in MemTable.scala, Dataset.scala, Reprojector.scala. Reduce the boilerplate using this suggestion from @gclaramunt: https://gist.github.com/velvia/4e923bf6eac6b4e8e7a0
We could start by defining a Type K thing in ColumnType. Working with the various implicit helpers / typeclasses might be interesting.
Right now, ByteBuffers are used everywhere in the ColumnStore API. ByteBuffers are pretty horrible to work with as they are mutable, and all it takes is one accidental call to mess up the very important data pipeline flow of columnar chunks.
Ideally we want something which is 1) immutable, and 2) can hashCode correctly and be used as keys in HashMaps, yet still allows 3) random access.
Array[Byte] is mutable, slightly better than ByteBuffer, but does not hashCode.
Instead, let's look into these alternatives:
- ByteVector type
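Assuming the ByteVector in question is scodec-bits' (an assumption on my part), a quick sketch of why it fits the three requirements above:

```scala
import scodec.bits.ByteVector

val a = ByteVector(Array[Byte](1, 2, 3))
val b = ByteVector(Array[Byte](1, 2, 3))

// 1) immutable: there is no way to mutate a chunk once built
// 2) value equality and a consistent hashCode, so it is safe as a HashMap key
assert(a == b && a.hashCode == b.hashCode)
val chunkIndex = Map(a -> "chunk-0")

// 3) still allows random access by index
val second: Byte = a(1)
```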
Use Ints everywhere internally, and do a column name string to ColumnId (Int) translation. This gives us two advantages:
The number of chunks in a partition can impact read performance. Depending on the partition strategy and the frequency of data inserted into a partition, it is possible that we might end up with fragmented chunks over time.
Ex: a table is partitioned by transaction month and transactions are loaded every day.
Please document tuning options to manage chunks post-insertion.
Hi guys! Working with your project I ran into a problem: Cassandra throws an exception when trying to handle too large a batch. How can I configure this (maybe decrease the batch size or something else)? I haven't found any examples, and spending a lot of time with your source hasn't helped. I worked around it by changing the Cassandra config (param: batch_size_fail_threshold_in_kb), but that's not a good solution for me. Any ideas?
Based on the just-added JMH performance benchmarks, it seems there is plenty of read performance to be gained by having an in-memory caching layer in front of the CassandraColumnStore. Some thoughts:
- class CachingColumnStore(underlying: IndexedChunkColumnStore)
- ScanChunkRowMaps - because without knowing the underlying store, the scanning parameters could not be translated into chunk row maps. This is fine not to cache though, since it is very, very small compared to the size of chunks.
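A very rough shape of such a wrapper; the trait below is a stand-in, not FiloDB's actual ColumnStore interface, and a real version would bound the cache (LRU / size-based) and skip ScanChunkRowMaps as noted above:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}

// Stand-in for the chunk-reading slice of the ColumnStore API.
trait ChunkReads {
  def readChunk(dataset: String, partition: String, chunkId: Long): Future[Array[Byte]]
}

class CachingColumnStore(underlying: ChunkReads)(implicit ec: ExecutionContext) extends ChunkReads {
  private val cache = new ConcurrentHashMap[(String, String, Long), Array[Byte]]()

  def readChunk(dataset: String, partition: String, chunkId: Long): Future[Array[Byte]] = {
    val key = (dataset, partition, chunkId)
    Option(cache.get(key)) match {
      case Some(bytes) => Future.successful(bytes)             // hit: skip Cassandra entirely
      case None =>
        underlying.readChunk(dataset, partition, chunkId).map { bytes =>
          cache.put(key, bytes)                                // populate on miss
          bytes
        }
    }
  }
}
```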
You depend on https://github.com/outworkers/phantom which is licenced under a dubious proprietary licence including such gems as
"You may not publish any tutorials or educational material on phantom, any blog posts or any written published content on phantom without our explicit written consent."
and
"any new clause provided by future updates of this license will apply to all existing versions of phantom irrespective of any timeframe."
Indeed several members of the scala community have received takedown notices of their blog articles.
Explicitly noting in https://github.com/outworkers/phantom#license-and-copyright that "it is not provided under an OSS license." (emphasis mine)