filodb / filodb

Distributed Prometheus time series database

License: Apache License 2.0

Scala 96.95% Shell 0.15% Java 2.81% ANTLR 0.08%
database distributed kafka metrics prometheus reactive scala time-series

filodb's Issues

Ability to restart ingestion with different schema

Hi, I am testing updating existing data in FiloDB. I understand there is a version concept, but every push goes to version 0 by default, so I expect the new data to overwrite the old data.

I am testing this on one Linux box, running DSE 4.8.5 (with C* 2.1.13) and Spark 1.5.2 with FiloDB 0.2.1-master trunk. I am generating 1M emails, using the first 2 characters as the partition key, domain of the email as segment key, and unique email address as the row key.

  1. Starting Spark 1.5.2, disabling Tungsten sort, and running it as local
    [yzhang@yzhang-linux data]$ ~/spark/bin/spark-shell --conf spark.sql.tungsten.enabled=false --conf spark.sql.shuffle.partitions=1 --jars /home/yzhang/source/FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.2.1-SNAPSHOT.jar,/home/yzhang/lib/commons-csv-1.2.jar,/home/yzhang/lib/spark-csv_2.10-1.4.0.jar --driver-memory 3G --master local[2]

  2. Load the CSV file, and save them into the FiloDB
    scala> val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2_1m.csv")
    val df = csvDF.withColumn("PartId", substring($"Email", 0, 2)).sort($"PartId").drop("PartId")
    df: org.apache.spark.sql.DataFrame = [Email: string, Domain: string, GlobalStop: int, RestrictedAddress: int, RestrictedDomain: int, RoleAddress: int, Active: int]
    scala> df.write.format("filodb.spark").option("dataset", "poc2").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()

  3. The data saved without any problem, and I can query it.
    scala> val poc2 = sqlContext.read.format("filodb.spark").option("dataset", "poc2").load()
    poc2: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
    scala> poc2.count
    res1: Long = 1000000
    scala> poc2.registerTempTable("poc2")
    scala> sqlContext.sql("select count(*) from poc2 where GlobalStop = 1").show()
    +------+
    | _c0|
    +------+
    |500034|
    +------+

  4. Now I use the same data, but set the GlobalStop field to 1 for every row, and try to save again:
    scala> val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2_1m.csv")
    scala> def udfSetOne=udf((input:Int) => 1)
    scala> val updatedDF = csvDF.withColumn("newGlobalStop", udfSetOne(csvDF("GlobalStop"))).drop("GlobalStop").withColumnRenamed("newGlobalStop","GlobalStop").withColumn("PartId", substring($"Email", 0, 2)).sort($"PartId").drop("PartId")
    scala> updatedDF.printSchema
    root
    |-- Email: string (nullable = true)
    |-- Domain: string (nullable = true)
    |-- RestrictedAddress: integer (nullable = true)
    |-- RestrictedDomain: integer (nullable = true)
    |-- RoleAddress: integer (nullable = true)
    |-- Active: integer (nullable = true)
    |-- GlobalStop: integer (nullable = true)

So what I did was read the same CSV file again, but this time use a UDF to create a new column named "newGlobalStop" with the value 1, drop the old "GlobalStop" column, and rename "newGlobalStop" to "GlobalStop".
So I created a new DataFrame with the same schema as before, but whose "GlobalStop" column contains ONLY the value 1.

scala> updatedDF.registerTempTable("df")
scala> sqlContext.sql("select count(*) from df where GlobalStop = 1").show()
+-------+
| _c0|
+-------+
|1000000|
+-------+

The above confirms that the new DataFrame has 1 as the only GlobalStop value.

  5. Now write them to FiloDB again, to the same dataset:
    scala> updatedDF.write.format("filodb.spark").option("dataset", "poc2").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()

  6. So far I didn't get any error, so the new DataFrame should have overwritten the old one, with 1 as the only value in that field.

  7. But when I query the "poc2" dataset again from FiloDB, I don't get the new values I expected:
    scala> val poc2 = sqlContext.read.format("filodb.spark").option("dataset", "poc2").load()
    poc2: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
    scala> poc2.registerTempTable("poc2")
    scala> sqlContext.sql("select count(*) from poc2 where GlobalStop = 1").show()
    +------+
    | _c0|
    +------+
    |500034|
    +------+

It looks like my second overwrite was simply ignored.

Did I do something wrong, or am I misunderstanding how data can be updated in FiloDB?

Thanks

Yong

Futures timed out after [5 seconds]

I am trying to test FiloDB 0.2 with a 6-node C* (DSE 4.8.5) cluster, running Spark 1.5.2.

The 100k sample dataset that comes with FiloDB works fine, but when I tried to load 50M rows from our use case, with a dataset I created for the POC, I got the following error message at this step:

newDF.write.format("filodb.spark").option("dataset", "poc1").option("row_keys", "Email").option("segment_key","Domain").option("partition_keys","Partition").mode(SaveMode.Overwrite).save()

java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$FiloContext.createNewDataset(package.scala:165)
at filodb.spark.package$FiloContext.createOrUpdateDataset(package.scala:216)
at filodb.spark.package$FiloContext.saveAsFilo(package.scala:268)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:53)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC.<init>(<console>:40)
at $iwC.<init>(<console>:42)
at <init>(<console>:44)
at .<init>(<console>:48)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)

I am not sure whether this is related to our C* settings; it looks like a timeout inside FiloDB, but I don't see any setting in application.conf that can adjust it.

What setting should I change for this?

Thanks

CLI does not ingest all rows

From Gitter channel:

I found out some interesting things:
First: I used filo-cli to import data; checking filo-cli.log, I found:

[2015-11-06 19:47:03,113] INFO  f.c.DatasetCoordinatorActor [filo-cli-akka.actor.default-dispatcher-9] - MemTable active table rows: 44999
[2015-11-06 19:47:03,113] INFO  f.c.DatasetCoordinatorActor [filo-cli-akka.actor.default-dispatcher-9] - MemTable flushing rows: 55000

Then: I used spark-shell to check how many records are in the gdelt dataset and got 55000 records. That matches the 55k rows flushed by the MemTable. It seems the MemTable belongs to the filo-cli process and it doesn't flush the active table rows on exit, so we lost 45k records.

I'll bet it's more of a timing problem: RowSource sends a Flush message to NodeCoordinatorActor, but that may not flush everything, and the CLI is not persistent and quits before everything is flushed. Instead, let's implement a FinishIngestion message which flushes everything at the end, including the active table.
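
A rough sketch of what such a FinishIngestion handshake could look like; FinishIngestion, FlushAllTables, IngestionDone and NodeCoordinatorStub are assumed names used only to convey the idea, not the actual FiloDB actor protocol.

    import akka.actor.{Actor, ActorRef}

    case object FinishIngestion   // sent by RowSource / the CLI when ingestion ends
    case object FlushAllTables    // flush the flushing table *and* the active table
    case object IngestionDone     // ack so the CLI can exit without orphaning rows

    class NodeCoordinatorStub(datasetCoordinator: ActorRef) extends Actor {
      def receive: Receive = {
        case FinishIngestion =>
          // Push every remaining memtable row out before acknowledging,
          // so a short-lived CLI process does not lose the active table.
          datasetCoordinator ! FlushAllTables
          sender() ! IngestionDone
      }
    }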

Automatic segment length determination

Currently, each segment has a fixed length, and it is hard-wired and not configurable. This obviously won't work for generic data ingestion. For example, for time series ingestion you want to be able to fit a certain number of points per segment. Or maybe you want each segment to correspond to a certain deviceId, or to a sub-part of that deviceId's time series. So, this has to be

a. Configurable, or
b. better yet, automatically determined.

GDELT data population fails

I am trying to follow the steps to try FiloDB. filo-cli init succeeded.

I am stuck at the next step. The error looks Cassandra-specific, but is there any Spark configuration that would fix this problem?

scala> csvDF.write.format("filodb.spark").
| option("dataset", "gdelt").
| option("sort_column", "GLOBALEVENTID").
| mode(SaveMode.Overwrite).save()
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO package$: Truncating dataset gdelt
16/01/03 11:16:14 INFO package$: Creating dataset Dataset(gdelt,List(Projection(0,gdelt,GLOBALEVENTID,false,List(),10000)),_partition,{"chunkSize":1000,"segmentSize":"10000"})...
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_5_piece0 on localhost:36922 in memory (size: 1842.0 B, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_6_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_7_piece0 on localhost:36922 in memory (size: 1834.0 B, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_8_piece0 on localhost:36922 in memory (size: 19.3 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO BlockManagerInfo: Removed broadcast_9_piece0 on localhost:36922 in memory (size: 3.7 KB, free: 534.5 MB)
16/01/03 11:16:14 INFO package$: Dataset gdelt created successfully...
16/01/03 11:16:14 INFO package$: Read schema for dataset gdelt = Map(ActionGeo_CountryCode -> Column(ActionGeo_CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Geo_FullName -> Column(Actor1Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Name -> Column(Actor2Name,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_ADM1Code -> Column(ActionGeo_ADM1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2CountryCode -> Column(Actor2CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Name -> Column(Actor1Name,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Religion2Code -> Column(Actor2Religion2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1KnownGroupCode -> Column(Actor1KnownGroupCode,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_Type -> Column(ActionGeo_Type,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Geo_Type -> Column(Actor1Geo_Type,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Religion1Code -> Column(Actor1Religion1Code,gdelt,0,StringColumn,FiloSerializer,false,false), EventCode -> Column(EventCode,gdelt,0,IntColumn,FiloSerializer,false,false), Actor1Geo_Long -> Column(Actor1Geo_Long,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor1EthnicCode -> Column(Actor1EthnicCode,gdelt,0,StringColumn,FiloSerializer,false,false), GLOBALEVENTID -> Column(GLOBALEVENTID,gdelt,0,IntColumn,FiloSerializer,false,false), NumMentions -> Column(NumMentions,gdelt,0,IntColumn,FiloSerializer,false,false), Actor2KnownGroupCode -> Column(Actor2KnownGroupCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Code -> Column(Actor2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Geo_Lat -> Column(Actor2Geo_Lat,gdelt,0,DoubleColumn,FiloSerializer,false,false), FractionDate -> Column(FractionDate,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor2Geo_Long -> Column(Actor2Geo_Long,gdelt,0,DoubleColumn,FiloSerializer,false,false), Actor1Code -> Column(Actor1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Type2Code -> Column(Actor1Type2Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Geo_FullName -> Column(Actor2Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false))
16/01/03 11:16:14 INFO package$: Matching columns - Set(ActionGeo_CountryCode, Actor1Geo_FullName, Actor2Name, ActionGeo_ADM1Code, Actor2CountryCode, Actor1Name, Actor2Religion2Code, Actor1KnownGroupCode, ActionGeo_Type, Actor1Geo_Type, Actor1Religion1Code, EventCode, Actor1Geo_Long, Actor1EthnicCode, GLOBALEVENTID, NumMentions, Actor2KnownGroupCode, Actor2Code, Actor2Geo_Lat, FractionDate, Actor2Geo_Long, Actor1Code, Actor1Type2Code, Actor2Geo_FullName)
Missing columns - Set(DATEADDED, Actor2Geo_Type, IsRootEvent, Actor2EthnicCode, Actor2Type2Code, Actor2Type1Code, NumSources, AvgTone, Actor1Type3Code, Actor1Geo_Lat, NumArticles, ActionGeo_FullName, Year, Actor1Geo_FeatureID, ActionGeo_Long, Actor2Geo_ADM1Code, GoldsteinScale, Actor2Geo_CountryCode, Actor1Geo_ADM1Code, Actor1Religion2Code, EventBaseCode, MonthYear, ActionGeo_Lat, ActionGeo_FeatureID, Actor2Religion1Code, QuadClass, Actor1Type1Code, Actor2Type3Code, Actor1Geo_CountryCode, EventRootCode, _partition, SQLDATE, Actor1CountryCode, Actor2Geo_FeatureID)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
16/01/03 11:16:16 ERROR phantom: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
filodb.core.StorageEngineException: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at filodb.cassandra.Util$ResultSetToResponse$$anonfun$toResponse$1.applyOrElse(Util.scala:19)
at filodb.cassandra.Util$ResultSetToResponse$$anonfun$toResponse$1.applyOrElse(Util.scala:18)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:59)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:117)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:120)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:754)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:590)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:994)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:916)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.epoll.EpollSocketChannel$EpollSocketUnsafe.epollInReady(EpollSocketChannel.java:722)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:326)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:264)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more
Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency SERIAL (1 replica were required but only 0 acknowledged the write)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:62)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:40)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:243)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:225)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
... 11 more

Support composite sort and partition keys

Today FiloDB only supports a sort key drawn from a single column. Let's support multi-component sort keys. This should be relatively contained - at least the core changes - yet be a good introduction to different parts of the code. A change list that comes to mind:

  • Create a new SortKeyHelper that can support multi-column sort keys (a rough sketch follows this list). Perhaps the following:
    • TwoColumnSortKeyHelper extends SortKeyHelper[(A, B)]
    • ThreeColumnSortKeyHelper etc.
  • Modify MergeStrategy, Projection to support multiple sort keys. It should be a Seq[String], and the earlier columns listed take priority.
  • Figure out how to persist SegmentIDs etc. for multiple column sort keys
  • Support multiple sort keys in Spark and CLIs
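
A minimal sketch of what a two-column helper could look like, assuming a SortKeyHelper[K] trait that roughly bundles an ordering plus key serialization; the trait shown here and its method names are assumptions for illustration, not the actual FiloDB API.

    // Illustrative stand-in for FiloDB's sort key helper abstraction.
    trait SortKeyHelper[K] {
      def ordering: Ordering[K]
      def toBytes(key: K): Array[Byte]
    }

    class TwoColumnSortKeyHelper[A, B](helperA: SortKeyHelper[A],
                                       helperB: SortKeyHelper[B]) extends SortKeyHelper[(A, B)] {
      // Earlier columns take priority: order by A first, then by B.
      def ordering: Ordering[(A, B)] =
        Ordering.Tuple2(helperA.ordering, helperB.ordering)

      // Length-prefix each component so concatenated keys stay unambiguous.
      def toBytes(key: (A, B)): Array[Byte] = {
        val a = helperA.toBytes(key._1)
        val b = helperB.toBytes(key._2)
        val buf = java.nio.ByteBuffer.allocate(8 + a.length + b.length)
        buf.putInt(a.length).put(a).putInt(b.length).put(b)
        buf.array()
      }
    }

A ThreeColumnSortKeyHelper would follow the same pattern with an (A, B, C) key type.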

Set up document

Is there any link or document where I can find the setup steps?

DirectMemory size detection

Right now the MemTable detects OOM using a simple sys.runtime.freeMemory heuristic, but this does not protect against DirectMemory out-of-memory errors. We need to find a better way to do this.
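
One possible direction, sketched under the assumption that watching the JVM's "direct" buffer pool is acceptable; the object and threshold below are illustrative, not existing FiloDB code or configuration.

    import java.lang.management.{BufferPoolMXBean, ManagementFactory}
    import scala.collection.JavaConverters._

    object DirectMemoryCheck {
      // The JVM exposes direct buffer usage via the "direct" BufferPoolMXBean.
      private val directPool: Option[BufferPoolMXBean] =
        ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean])
          .asScala.find(_.getName == "direct")

      // True when direct memory usage exceeds the given fraction of the configured max.
      def nearDirectLimit(maxDirectBytes: Long, fraction: Double = 0.9): Boolean =
        directPool.exists(_.getMemoryUsed > (maxDirectBytes * fraction).toLong)
    }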

Make InMemoryColumnStore viable for Spark-only and Spark Streaming apps

There are plenty of Spark apps that could benefit from an in-memory store that can update and filter and scan quickly, with columnar projections (plus have a backup to Cassandra or other permanent store). For example, Spark Streaming apps often need to join data. This ticket is about making InMemoryColumnStore truly useful.

  • Add partitioning info stored on the driver
  • Prune partitions if push down filters only to some partitions
  • Be able to evict older entries or segments for time series
  • Update the README to highlight streaming use cases
  • Require users to distribute data themselves?

See these Spark 1.6.0 tickets that might be relevant:
https://issues.apache.org/jira/browse/SPARK-11410
https://issues.apache.org/jira/browse/SPARK-4849

Reprojection / Scheduler fixes

  • Last batch of rows in MemTable at end of ingestion is being orphaned and not flushed properly. Adding a time-based FlushPolicy should fix that.
  • Reprojection success/failure messages look like this right now: Reprojection task (gdelt,0) succeeded: Stream(Success, ?). Add more info, like the keyRanges that just got flushed.
  • Failed reprojection tasks keep getting re-run in the scheduler. Cap the max # of retries?
  • Scheduler logs can be very noisy. Have a way to turn off all the logs, esp for spark-shell users.

Retries and improved Reprojector error handling

DatasetCoordinatorActor: handleFailed: add retries for reprojections that fail.

Bonus: Add some state to the reprojector, or delete rows that succeeded, so that a retried reprojection won't have to write out segments that already wrote successfully. The only problem with deleting rows is that if there are multiple reprojections off the same memtable, we don't want to affect the other reprojections. But we can think about that later.

Unable to recreate table using CLI when data is present

A drop-table option is not available right now. I also get an exception when trying to recreate a table that already contains data; recreating works when the table is empty.
./filo-cli-0.2-SNAPSHOT -Dconfig.file=./filodb-element.conf --command create --dataset billing_fact_native2 --columns ge_fiscal_period_name:string,corp_code:string,fleet_number:string,consolidated_billing_source_cd:string,security_level:string,charge_type_desc:string,charge_cd:string,asset_id:string,billing_invoice_id:string,billing_invoice_line_num:string,asset_class_cd:string,asset_make_desc:string,asset_start_dt:string,bill_level2_id_desc:string,bill_level3_id_desc:string,bill_level4_id_desc:string,bill_level5_id_desc:string,bill_level6_id_desc:string,billing_period_id:string,billing_structure_party_id:string,billing_structure_start_dt:string,charge_desc:string,cust_inv_odom_reading_qty:string,fuel_negotiated_discount_amt:string,fuel_product_type_desc:string,fuel_trans_category_desc:string,fuel_trans_loctn_city_name:string,fuel_trans_time:string,ge_fiscal_quarter_name:string,ge_fiscal_year_name:string,ge_unit_num:string,inventory_status_desc:string,license_plate_state_cd:string,line_item_qty:string,model_desc:string,mso_first_name:string,mso_last_name:string,purchase_transaction_dt:string,svcs_net_amt:string,vntg_year:string --partitionKeys corp_code,fleet_number,ge_fiscal_period_name --rowKeys consolidated_billing_source_cd,securitylevel,charge_type_desc,charge_cd,asset_id,billing_invoice_id,billing_invoice_line_num --segmentKey ':string /0'

Creating dataset billing_fact_native2...
Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at filodb.cli.CsvImportExport$class.parse(CsvImportExport.scala:43)
at filodb.cli.CliMain$.parse(CliMain.scala:57)
at filodb.cli.CsvImportExport$class.actorAsk(CsvImportExport.scala:49)
at filodb.cli.CliMain$.actorAsk(CliMain.scala:57)
at filodb.cli.CliMain$.createDatasetAndColumns(CliMain.scala:157)
at filodb.cli.CliMain$.main(CliMain.scala:88)
at filodb.cli.CliMain$.main(CliMain.scala:57)
at com.quantifind.sumac.ArgMain$class.mainHelper(ArgApp.scala:45)
at com.quantifind.sumac.ArgMain$class.main(ArgApp.scala:34)
at filodb.cli.CliMain$.main(CliMain.scala:57)
at filodb.cli.CliMain.main(CliMain.scala)

sbt spark/assembly fails

I'm trying to follow the directions in Using Spark to ingest and query data.

When I run sbt spark/assembly on OS X 10.10.5, with Java 1.8.0_60 and sbt 0.13.7, at commit e90b7, it reports:

...
[info] SaveAsFiloTest:
[info] Exception encountered when attempting to run a suite with class name: filodb.spark.SaveAsFiloTest *** ABORTED ***
[info]   com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.TransportException: [localhost/127.0.0.1:9042] Cannot connect))
[info]   at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
[info]   at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
[info]   at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230)
[info]   at com.datastax.driver.core.Cluster.init(Cluster.java:157)
[info]   at com.datastax.driver.core.Cluster.connect(Cluster.java:245)
[info]   at com.websudos.phantom.connectors.DefaultCassandraManager$$anonfun$initIfNotInited$1.apply(DefaultCassandraManager.scala:113)
[info]   at com.websudos.phantom.connectors.DefaultCassandraManager$$anonfun$initIfNotInited$1.apply(DefaultCassandraManager.scala:112)
[info]   at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[info]   at scala.concurrent.package$.blocking(package.scala:50)
[info]   at com.websudos.phantom.connectors.DefaultCassandraManager.initIfNotInited(DefaultCassandraManager.scala:112)
[info]   ...
[info] ScalaTest
[info] Run completed in 4 seconds, 343 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 1
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] *** 1 SUITE ABORTED ***
[error] Error: Total 1, Failed 0, Errors 1, Passed 0
[error] Error during tests:
[error]     filodb.spark.SaveAsFiloTest
[error] (spark/test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 7 s, completed Aug 31, 2015 6:35:36 PM

FiloDB Compaction

  1. On disk compaction of fragmented segments into more unified segments
  2. Ingestion-time compaction of fragmented segments into more complete ones

The second one is perhaps 2-3 days worth of work, and would be valuable especially for InMemoryColumnStore.

Example dataset has issues

[2015-12-19 06:42:31,649] ERROR f.c.r.Scheduler [filo-cli-akka.actor.default-dispatcher-5] - Reprojection task (gdelt,0) failed
com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured columnfamily gdelt_chunkmap
at com.datastax.driver.core.Responses$Error.asException(Responses.java:124) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:120) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler.access$2300(RequestHandler.java:45) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:754) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:576) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:994) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:916) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[filo-cli-0.1-SNAPSHOT:0.1-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
[2015-12-19 06:42:32,062] ERROR com.websudos.phantom [pool-9-thread-1] - unconfigured columnfamily gdelt_chunkmap
[2015-12-19 06:42:32,179] ERROR com.websudos.phantom [pool-9-thread-2] - unconfigured columnfamily gdelt_chunkmap
[2015-12-19 06:42:32,308] ERROR com.websudos.phantom [pool-9-thread-1] - unconfigured columnfamily gdelt_chunkmap

MemTable WAL implementation

This could be as simple as a one-liner to enable it in the current MapDBMemTable, plus recovery logic. However, we need to benchmark it, since the MapDBMemTable is already not fast.

Automate ingestion/etl from cassandra column family to FiloDB

My intention here is to conceptually use FiloDB as an index for a Cassandra table, so that it becomes more transparent to users/application writers and can be used with existing applications to give them an analytics boost.

From the Blender perspective, this is for now the easiest and most straightforward way to work on our storage optimizer, and it fits into the current Blender system without much change.

Is ByteIteratorColumn from phantom in use anywhere?

Hi @velvia,

I just wanted to ask if the ByteIterator functionality from phantom is of any use whatsoever to you anymore. There's a lot of code we ported in from Akka that we don't really want to maintain and as I recall you took a different approach in the end, so there may be 0 reason for us to keep it in the codebase.

Also, you can extend and create your own columns quite easily, and perhaps that's why it makes little sense to keep it all in one place, especially since we are doing nothing but duplicating existing concerns that are far better addressed in other projects.

Regards.

Control commits to table via memtable flush

For certain workloads it is important to commit all writes to FiloDB tables at the end of an ETL process. Currently there is no documented option that allows a programmer to issue a commit at the end of the ETL to trigger a flush of any pending memtable data to tables.

Another option is to flush the memtable automatically if there have been no further writes to it in the past X minutes/seconds.
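
A minimal sketch of the second option, assuming a pluggable flush policy; FlushPolicy and IdleTimeFlushPolicy are illustrative names, not existing FiloDB classes or options.

    // Decide whether the memtable should be flushed, given when it last saw a write.
    trait FlushPolicy {
      def shouldFlush(lastWriteMillis: Long, nowMillis: Long): Boolean
    }

    // Flush when no rows have arrived for longer than the configured idle window.
    class IdleTimeFlushPolicy(maxIdleMillis: Long) extends FlushPolicy {
      def shouldFlush(lastWriteMillis: Long, nowMillis: Long): Boolean =
        nowMillis - lastWriteMillis >= maxIdleMillis
    }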

Investigate Apache Calcite integration

Apache Calcite appears to have more robust ANSI SQL support and query optimization than Spark, and it has Spark integration. Investigate using it as a JDBC driver/server, and either use filodb.spark with the Spark adapter or write a direct FiloDB adapter.

Able to create table using CLI with wrong row key column which doesn't exist in table

It is also possible to overwrite the row key columns during data ingestion with the Overwrite option.


select * from datasets where name='billing_fact_native2';

 name                 | projectionid | options            | partitioncolumns                                   | keycolumns                                                                                                                                     | projectioncolumns | projectionreverse | segmentcolumns
----------------------+--------------+--------------------+----------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+----------------
 billing_fact_native2 |            0 | {"chunkSize":5000} | corp_code\x01fleet_number\x01ge_fiscal_period_name | consolidated_billing_source_cd\x01securitylevel\x01charge_type_desc\x01charge_cd\x01asset_id\x01billing_invoice_id\x01billing_invoice_line_num |                   |             False |     :string /0

Save command:
srcDF.write().format("filodb.spark").option("dataset",targetTable).
    option("row_keys", "consolidated_billing_source_cd,security_level,charge_type_desc,charge_cd,asset_id,billing_invoice_id,billing_invoice_line_num").
    option("segment_key", ":string /0").
    option("partition_keys", "corp_code,fleet_number,ge_fiscal_period_name").
    mode(SaveMode.Overwrite).save();

select keycolumns from datasets where name='billing_fact_native2';

 keycolumns
------------
 consolidated_billing_source_cd\x01security_level\x01charge_type_desc\x01charge_cd\x01asset_id\x01billing_invoice_id\x01billing_invoice_line_num

Logging of FiloDB ETL and transactions

What if someone truncates a table - or inserts data when you don't intend them to? Wouldn't it be nice to have a place where every table DDL/ETL was tracked so you could go back and find out when something happened?

DDL Transactions table:

  • txn_date datetime
  • dataset text
  • txn_type text - create table / reset table definition (segment key/etc.) / truncate / insert / column DDL
  • txn_details text

What queries would someone want on this table? Maybe just expose it to Spark and leave it at that.
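
To make the proposed shape concrete, the table described in the bullets above could be expressed roughly as the following case class; the field names mirror the bullets and nothing here exists in FiloDB today.

    import java.sql.Timestamp

    case class DdlTransaction(
      txnDate: Timestamp,   // txn_date: when the operation happened
      dataset: String,      // dataset: which FiloDB dataset was touched
      txnType: String,      // txn_type: create table / reset definition / truncate / insert / column DDL
      txnDetails: String    // txn_details: free-form description of the change
    )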

Enhanced segment key filtering

The initial segment key filtering feature just implemented is rather limited in scope: it requires a closed range, i.e. a >/>= bound AND a </<= bound. Let's add some more flexibility.

  • Add support for =. This is equivalent to segmentKey >= A AND segmentKey <= A.
  • Add support for open ended >/>= and </<=.

For Cassandra, since the range scan is really just client-side ALLOW FILTERING on the segment key, in theory we could also support arbitrary conditions, but let's limit the scope a bit for now.
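
A small sketch of folding the supported comparisons into a segment key range; SegmentFilter and KeyRange are illustrative names chosen for this example, not FiloDB types.

    // Supported comparisons on the segment key.
    sealed trait SegmentFilter
    case class Gte(v: String) extends SegmentFilter
    case class Lte(v: String) extends SegmentFilter
    case class Eq(v: String)  extends SegmentFilter

    // None on either side models the new open-ended >= / <= cases.
    case class KeyRange(lower: Option[String], upper: Option[String])

    def toKeyRange(filters: Seq[SegmentFilter]): KeyRange =
      filters.foldLeft(KeyRange(None, None)) {
        case (r, Gte(v)) => r.copy(lower = Some(v))
        case (r, Lte(v)) => r.copy(upper = Some(v))
        case (r, Eq(v))  => KeyRange(Some(v), Some(v))  // = A behaves like >= A AND <= A
      }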

[Spark] Optimize query for WHERE partitionkey = <value>

If the query specifies one partition key, instead of scanning the entire column store, we should use the ReadSegments API which reads from one partition only. This would be much faster and good for point queries.

We should also think about exposing this to non-Spark query API.

Explore direct, non-networked reads for faster I/O

We should look into storing the local FiloDB data directly on disk (and not in Cassandra). This way we would be able to bypass Cassandra (and all the network overhead and serde) while querying.

This would also remove the hard link between FiloDB and Cassandra, so FiloDB could sit behind any data distribution framework or distributed database engine without needing C*.

It would also pave the way for us to harness memory-mapped reads (one of the core selling points of FlatBuffers), off-heap processing, and possibly SIMD and GPGPU support.

And finally, it would be a good step towards native Tungsten bindings.

Support all Cassandra data types

We should be able to support all Cassandra data types. I am especially looking for:

  1. All the numeric types (BigInt, Float, Decimal ,etc.)
  2. Timestamp
  3. UUID (and special support for TimeUUID)

Upgrade to Spark 1.6

Here are the new features:

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#Apache%20Spark%201.6%20Release%20Notes.html

Of particular relevance:

Make Spark reads distributed

Currently reads from Spark are just single threaded. This is because the intention is to distribute reads throughout the cluster by splitting up token ranges of keys. scanSegments is passed some parameters, which are empty right now.

We would need a method to distribute ranges of tokens based on the number of nodes and node info, possibly piggybacking on Calliope.

Support Hive

I really like this concept, but it's critical for us to be able to create permanent tables via a Hive metastore. Would it be possible to support Hive at some point?

Default to not changing dataset schema and definition for SaveMode.Overwrite

Currently, on ingestion, the FiloDB data source in Spark will automatically add new columns found in the source DataFrame to the FiloDB table (though existing columns whose type doesn't match will trigger a SchemaError). This may not be desired in many environments. So we should either:

  • Keep current behavior as the default, but add an option to lock down addition of new columns on ingest, or
  • Disable addition of new columns by default, but add an option to enable it

The behavior to change would be in the spark module, in packages.scala.

Reduce sort key K type parameter boilerplate

There are multiple places in the code that call APIs with a type parameter, switching on column type, like this:

https://github.com/tuplejump/FiloDB/blob/master/core/src/main/scala/filodb.core/reprojector/MemTable.scala#L146

They are all over, like multiple places in MemTable.scala, Dataset.scala, Reprojector.scala. Reduce the boilerplate using this suggestion from @gclaramunt: https://gist.github.com/velvia/4e923bf6eac6b4e8e7a0

We could start by defining a type member K in ColumnType. Working with the various implicit helpers / typeclasses might be interesting.
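
One possible shape of that idea, shown only as an illustration (the real ColumnType and its instances live in FiloDB core and look different): give each ColumnType a type member K plus the instances it needs, so callers no longer have to switch on column type to pick a type parameter.

    sealed trait ColumnType {
      type K
      def ordering: Ordering[K]
    }

    case object IntColumn extends ColumnType {
      type K = Int
      val ordering = Ordering.Int
    }

    case object StringColumn extends ColumnType {
      type K = String
      val ordering = Ordering.String
    }

    // Written once against ct.K instead of being duplicated per concrete key type.
    def sortKeyOrdering(ct: ColumnType): Ordering[ct.K] = ct.ordering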

[Cleanup] Use Akka ByteString or ByteVector as opposed to ByteBuffer/Array[Byte]

Right now, ByteBuffers are used everywhere in the ColumnStore API. ByteBuffers are pretty horrible to work with as they are mutable, and all it takes is one accidental call to mess up the very important data pipeline flow of columnar chunks.

Ideally we want something which is 1) immutable, and 2) can hashCode correctly and be used as keys in HashMaps, yet still allows 3) random access.

Array[Byte] is also mutable - slightly better than ByteBuffer, but it does not hashCode by content.
Instead, let's look into these alternatives:

  1. https://github.com/scodec/scodec-bits - has a ByteVector type
  2. Akka's ByteString - https://github.com/akka/akka/blob/v2.3.5/akka-actor/src/main/scala/akka/util/ByteString.scala - just copy this file so that the core module doesn't need to link in Akka
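
For a feel of why ByteVector (option 1) fits the three requirements, here is a tiny illustration: it is immutable, equals/hashCode are structural, and it still allows random access.

    import scodec.bits.ByteVector

    val chunk = ByteVector(1, 2, 3, 4)         // immutable columnar chunk bytes
    val third = chunk(2)                       // random access by index
    val cache = Map(chunk -> "chunk A")        // safe to use as a HashMap key

    val copy = ByteVector(Array[Byte](1, 2, 3, 4))
    assert(chunk == copy && chunk.hashCode == copy.hashCode)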

Use Int as ColumnId type instead of string

Use Ints everywhere internally, and do a column-name-string to ColumnId (Int) translation. This gives us two advantages:

  1. More compact representation on disk and in memory, faster to handle
  2. Ability for users to rename columns easily
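
A toy sketch of the translation layer implied by point 2; ColumnIdRegistry is an illustrative name, not an existing FiloDB class.

    class ColumnIdRegistry {
      private var nextId = 0
      private val nameToId = scala.collection.mutable.Map.empty[String, Int]

      // Assign a stable Int id the first time a column name is seen.
      def idFor(name: String): Int =
        nameToId.getOrElseUpdate(name, { val id = nextId; nextId += 1; id })

      // Renaming only touches the mapping; data keyed by the Int id stays untouched.
      def rename(oldName: String, newName: String): Unit =
        nameToId.remove(oldName).foreach(id => nameToId.update(newName, id))
    }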

Document compaction process to fragmented chunks

The number of chunks in a partition can impact read performance. Depending on the partitioning strategy and how frequently data is inserted into a partition, we might end up with fragmented chunks over time.

Example: a table is partitioned by transaction month and transactions are loaded every day.

Please document tuning options to manage chunks post insertion.

batch size too large

Hi guys! Working with your project, I ran into a problem: Cassandra throws an exception when trying to handle a batch that is too large. How can I configure this (e.g. decrease the batch size or something else)? I haven't found any examples, and spending a lot of time in your source code hasn't helped. I worked around the problem by changing the Cassandra config (parameter batch_size_fail_threshold_in_kb), but that's not a good solution for me. Any ideas?

Caching ColumnStore

Based on the just-added JMH performance benchmarks, it seems there is plenty of read performance to be gained by having an in memory caching layer in front of the CassandraColumnStore. Some thoughts:

  • Would be good for this to be a caching layer independent of the underlying store. For example, a class CachingColumnStore(underlying: IndexedChunkColumnStore) - see the sketch after this list.
  • Cache should be write-through.
  • All the lower level methods could be easily cached, even getSplits(), except for stuff like ScanChunkRowMaps, because without knowing the underlying store, the scanning parameters could not be translated into chunk row maps. This is fine to leave uncached, though, since it is very small compared to the size of the chunks.
    • Alternatively, have a C*-specific caching column store, which keeps track of the token ranges and partitions and can therefore translate token range scanning. This seems potentially fragile though, and would have to handle invalidation if the C* topology changes.
  • Need to think about invalidation
  • Start with an on-heap version, then explore off-heap and Tachyon-based caches
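
A minimal write-through sketch of the CachingColumnStore mentioned in the first bullet; IndexedChunkColumnStore, the String chunk key, and the Array[Byte] chunk payload are stand-ins for whatever the real ColumnStore API exposes - only the caching shape matters here.

    import scala.collection.concurrent.TrieMap
    import scala.concurrent.{ExecutionContext, Future}

    trait IndexedChunkColumnStore {
      def readChunk(key: String): Future[Array[Byte]]
      def writeChunk(key: String, chunk: Array[Byte]): Future[Unit]
    }

    class CachingColumnStore(underlying: IndexedChunkColumnStore)
                            (implicit ec: ExecutionContext) extends IndexedChunkColumnStore {
      private val cache = TrieMap.empty[String, Array[Byte]]

      // Reads hit the cache first and fall through to the underlying store on a miss.
      def readChunk(key: String): Future[Array[Byte]] =
        cache.get(key) match {
          case Some(chunk) => Future.successful(chunk)
          case None        => underlying.readChunk(key).map { chunk => cache.put(key, chunk); chunk }
        }

      // Write-through: the underlying store is always updated, then the cache.
      def writeChunk(key: String, chunk: Array[Byte]): Future[Unit] =
        underlying.writeChunk(key, chunk).map { _ => cache.put(key, chunk); () }
    }

An on-heap TrieMap is just the starting point suggested in the last bullet; an off-heap or Tachyon-backed map could slot in behind the same interface.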

ambiguous licence position

You depend on https://github.com/outworkers/phantom, which is licenced under a dubious proprietary licence including such gems as:

"You may not publish any tutorials or educational material on phantom, any blog posts or any written published content on phantom without our explicit written consent."

and

"any new clause provided by future updates of this license will apply to all existing versions of phantom irrespective of any timeframe."

Indeed, several members of the Scala community have received takedown notices for their blog articles.

The phantom README itself explicitly notes at https://github.com/outworkers/phantom#license-and-copyright that "it is not provided under an OSS license." (emphasis mine)
