I was able to set up the Scala Stream Collector, Beam Enrich, and the BigQuery Loader on GCP. The problem is that the BigQuery Loader's Dataflow job raises this error:
Operation ongoing in step StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
This makes me wonder: does the BigQuery Loader create the BigQuery table schema? If not, is there a schema I can use?
Note that I ended up running the Mutator's create command and then copying the schema into a new BigQuery table, since this utility does not set up partitioning. So if you want partitioning, or any other configuration on your BigQuery table, you'll need to create the table manually. To do that, you need the base schema.
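For reference, creating the table with the Mutator looked roughly like this in my setup (a sketch, not exact: the jar name is a placeholder, and the flag names and base64-encoded config/resolver files are how my Mutator version expected them, so check the docs for yours):

# Assumes config.json and resolver.json from the BigQuery Loader setup docs;
# my Mutator version wanted both passed base64-encoded.
java -jar snowplow-bigquery-mutator.jar create \
  --config $(base64 -w 0 config.json) \
  --resolver $(base64 -w 0 resolver.json)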
You can use the Google Cloud CLI to get that schema from a table you previously created with the Mutator tool, via the bq show command. Something like:
bq show --schema --format=prettyjson --project_id=your-project-id-000000 bq-instance-000000:db_name.table_name > snowplow-schema.json
Then you can use that schema when creating a new BigQuery table from the web console or the CLI.
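For example, on the CLI a day-partitioned table can be created from that schema file with bq mk (a sketch: the project/dataset/table names are placeholders, and partitioning on derived_tstamp is my assumption, so pick whichever timestamp column fits your queries):

# --time_partitioning_type/--time_partitioning_field are standard bq mk flags;
# names below are hypothetical.
bq mk \
  --table \
  --schema snowplow-schema.json \
  --time_partitioning_type DAY \
  --time_partitioning_field derived_tstamp \
  your-project-id-000000:db_name.table_name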
Then you'll want to keep the Mutator's other command, listen, running: it picks up messages from the Pub/Sub subscription (the one you created if you followed the docs; without it, the Mutator cannot update the schema) and handles your schema updates as needed.
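Running it looks much like the create command above (again a sketch, assuming the same base64-encoded config and resolver from my setup):

# Keep this process running so columns for new self-describing events
# and contexts get added to the table as they arrive.
java -jar snowplow-bigquery-mutator.jar listen \
  --config $(base64 -w 0 config.json) \
  --resolver $(base64 -w 0 resolver.json)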