[http://druid.io/ Druid] is an analytics data store, which WMF began to use in 2016, first for the [[Analytics/Data_Lake|Data Lake]]. It is composed of many services, each of which is fully redundant.
== Why Druid: Value Proposition ==


When looking for an analytics columnar datastore we wanted a product that could fit our use cases and scale, and that in the future we could use to support real-time ingestion of data. We had several alternatives: Druid, Cassandra, ElasticSearch, and, more recently, Clickhouse. All of these are open-source choices that served our use cases to different degrees.


Druid offered the best value proposition:
* It is designed for analytics, so it can handle creation of cubes with many different dimensions without having to have those precomputed (like Cassandra does)
* It has easy loading specs and supports real-time ingestion
* It provides a front-end cache that repeated queries benefit from (Clickhouse is designed as a fast datastore for analytics but it doesn't have a front-end cache)
* Druid also shipped with a convenient UI to do basic exploration of data that '''''was''''' also open source: [http://pivot.imply.io/ Pivot] (since replaced by Turnilo)


==Access to Druid data via Turnilo==
See [[Analytics/Systems/Turnilo-Pivot]]


== Access to Druid data via Superset ==
See [[Analytics/Systems/Superset]]


==Access via command line==
From the stat machines you can query Druid via curl. Write a valid Druid query, based on [http://druid.io/blog/2013/11/04/querying-your-data.html this example], into a file, say <code>query.druid.json</code>. Then POST it to Druid like this:


<code>curl -L -X POST "http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @query.druid.json</code>
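As a sketch, <code>query.druid.json</code> could contain a simple timeseries query like the <code>wmf_netflow</code> one used further down this page (the datasource, interval and metric here are examples only; adapt them to your data):
<syntaxhighlight lang="bash">
# Sketch of a query file; datasource, interval and metric are examples only
cat > query.druid.json <<'EOF'
{
  "queryType": "timeseries",
  "dataSource": "wmf_netflow",
  "intervals": "2019-09-05T19:11Z/2019-09-06T19:11Z",
  "granularity": "all",
  "aggregations": [
    { "type": "longSum", "name": "bytes_total", "fieldName": "bytes" }
  ]
}
EOF
</syntaxhighlight>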


== Ingesting EventLogging data ==
See [[Analytics/Systems/EventLogging/Schema Guidelines]]


== Druid Administration ==


=== Send commands to Druid ===


This is an example of how to query druid once you have ssh-ed into one of the hosts:
 curl -X POST  'http://localhost:8082/druid/v2/?pretty'  -H 'Content-Type:application/json' -H 'Accept:application/json' -d '{
      "queryType": "timeseries",
      "dataSource": "wmf_netflow",
      "intervals": "2019-09-05T19:11Z/2019-09-06T19:11Z",
      "granularity": "all",
      "aggregations": [
        {
          "name": "__VALUE__",
          "type": "longSum",
          "fieldName": "bytes"
        }
      ]
    }'


=== Naming convention ===
For homogeneity across systems, underscores <code>_</code> should be used in datasource names and field names instead of hyphens <code>-</code>.


=== Restart an Indexing job ===
 sudo -u hdfs oozie job \
      -Duser=$USER \
      -Dstart_time=2017-04-01T00:00Z \
      -Dstop_time=2017-06-01T00:00Z \
      -Dqueue_name=production \
      -Drefinery_directory=hdfs://analytics-hadoop$(hdfs dfs -ls -d /wmf/refinery/2017* | tail -n 1 | awk '{print $NF}') \
      -oozie $OOZIE_URL -run -config /srv/deployment/analytics/refinery/oozie/unique_devices/per_project_family/druid/daily/coordinator.properties


=== Delete segments ===
In order to delete segments in Druid, the dataset needs to be disabled first; disabling a dataset can be done through the coordinator UI. In the upper right corner there is a link to the "old coordinator UI"; click there and there is a link to delete segments given a time interval. Intervals have to be spec'd out in this format: "2019-09-05T19:11Z/2019-09-06T19:11Z".


=== Delete segments from deep storage ===


==== Preparation for deletion ====
For segments to be deleted from deep-storage, they need to NOT be in use by Druid historical nodes. There are two ways this can be done:
* A <code>rule</code> is defined for the datasource so that segments are automatically dropped from historical nodes after a certain duration (this is what we do for webrequest, for instance). The "rule" is visible in the Druid admin UI. Notice that the datasource is not disabled in that case.
* The datasource is <code>disabled</code> in the coordinator, meaning that after deep-storage segment deletion the entire datasource will be lost. To disable a datasource in the coordinator (reversible, data is still present in deep-storage and can be reloaded easily):
   curl -L -X DELETE http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME
If you don't have access to the host, use any of the druid hostnames instead of localhost.
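To undo the disable, a sketch of the matching enable call (the coordinator should mark all segments of the datasource as used again):
<syntaxhighlight lang="bash">
# Re-enable a previously disabled datasource (reversal of the DELETE above)
curl -L -X POST http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME
</syntaxhighlight>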


==== Actual deletion of deep storage segments ====
After either the rule is applied or the datasource is disabled, you can '''hard-delete''' segments that are not loaded in historical nodes from the deep storage. Please be careful, this step is '''irreversible.''' There are two documented ways of deleting segments at the HDFS level. The first one involves manually sending a kill task to the overlord for every interval:


  curl -L -X 'POST' -H 'Content-Type:application/json' -d "{ \"type\":\"kill\", \"id\":\"kill_task-tiles-poc-`date --iso-8601=seconds`\",\"dataSource\":\"DATASOURCE_NAME\", \"interval\":\"2016-01-01/2017-10-01\" }" localhost:8090/druid/indexer/v1/task
{{Warning|Don't delete data on HDFS manually - it would make it very complicated to clean up druid afterward.}}
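To follow the kill task after submitting it, you can ask the overlord for its status (a sketch; TASK_ID is whatever was set in the <code>id</code> field of the POST above):
<syntaxhighlight lang="bash">
# TASK_ID is the "id" used in the kill task POST (URL-encode it if it contains
# special characters such as "+")
curl -L "http://localhost:8090/druid/indexer/v1/task/TASK_ID/status"
</syntaxhighlight>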


=== Overlords Administration UI===
{{Warning|Don't use this UI anymore; it works, but since Druid 0.19 it is incomplete. Use the coordinator's UI instead (even to check indexation jobs).}}
 
Only one overlord is the active leader at any given moment. The fastest way to figure out which one is to establish an ssh tunnel like the following to one random Druid node of the target cluster, and then check the UI via browser (using localhost as described below). If you picked the wrong overlord, the UI will not show up and you'll get a redirect to the right one.
<syntaxhighlight lang="text">
ssh -N an-druid100X.eqiad.wmnet -L 8090:an-druid100X.eqiad.wmnet:8090
 
http://localhost:8090/console.html
</syntaxhighlight>
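If you prefer not to guess, the overlord API should also be able to tell you who the current leader is (a sketch, assuming the leader endpoint is available in the deployed Druid version):
<syntaxhighlight lang="bash">
# Ask any overlord which one is currently the leader
curl -L http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/leader
</syntaxhighlight>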
 
=== Deletion control ===
Check that the dataset no longer appears in the deep-storage directory on HDFS:
hdfs dfs -ls /user/druid/deep-storage/<dataset>
 
Keep in mind that data can be present in deep storage but not show up in Druid, as "rules" in the Druid admin UI might have disabled the data entirely.
 
=== Coordinators Administration UI ===
Process to connect (explanation below):
  ssh -N an-druid1001.eqiad.wmnet -L 8081:localhost:8081
 
Check http://localhost:8081/. If it works right away, great. Otherwise, it will redirect you to an-druidXXXX.eqiad.wmnet:8081. Use that to re-tunnel:
 ssh -N an-druidXXXX.eqiad.wmnet -L 8081:localhost:8081
 
This is because only one coordinator is the active leader at any given moment. Guessing like this is probably the easiest way to find the active leader, but you can also do it by ssh-ing into the cluster.
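As with the overlord, you can also ask the coordinator API directly (a sketch, assuming the leader endpoint is available in the deployed Druid version):
<syntaxhighlight lang="bash">
# Ask any coordinator which one is currently the leader
curl -L http://an-druid1001.eqiad.wmnet:8081/druid/coordinator/v1/leader
</syntaxhighlight>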
 
=== Indexing Logs ===
Located at "/srv/druid/indexing-logs"
 
=== Safe restart of MiddleManagers when running Real time Indexing jobs ===
To avoid confusing the Overlord, it is best to drain jobs from a MiddleManager before restarting it, via the following command:
<syntaxhighlight lang="bash">
# Generic case
curl -X POST http://hostname:8091/druid/worker/v1/disable
 
# Example
curl -X POST http://an-druid1001.eqiad.wmnet:8091/druid/worker/v1/disable
</syntaxhighlight>
 
The current indexing jobs assigned to a MiddleManager can be found by checking the Overlord's console via ssh tunnel + browser to localhost:8090:
 
<syntaxhighlight lang="bash">
# ssh tunnel to the current Overlord leader
ssh -L 8090:localhost:8090 hostname -N
 
# example
ssh -L 8090:localhost:8090 an-druid1001.eqiad.wmnet -N
</syntaxhighlight>


Please note that the Overlord runs on every Druid node, but only one is the leader at any given moment. After creating the ssh tunnel (you can start from a host picked at random in the cluster), you'll get a redirect in the browser to the current leader's hostname (when trying to access localhost:8090) if you didn't pick the right one. This is of course a quick and dirty procedure, but it works :)


You are free to restart the MiddleManager when you see that it is not running any indexing job. This will automatically put it back into enabled state.
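If you end up not restarting the MiddleManager, you can also check and re-enable it explicitly via the worker API (a sketch; the enable/tasks endpoints mirror the disable one used above):
<syntaxhighlight lang="bash">
# List the indexing tasks currently assigned to this MiddleManager
curl http://an-druid1001.eqiad.wmnet:8091/druid/worker/v1/tasks

# Put the MiddleManager back into the enabled state without restarting it
curl -X POST http://an-druid1001.eqiad.wmnet:8091/druid/worker/v1/enable
</syntaxhighlight>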


=== Full Restart of services ===
To restart all druid services, you must restart each service on each Druid node individually. It is best to do them one at a time, but the order does not particularly matter.


NOTE: druid-historical can take a while to start up, as it needs to re-read indexes.

NOTE2: if you are running real-time indexing jobs, please check the paragraph above before proceeding.


<syntaxhighlight lang="bash">
# for each Druid node (an-druid100[1-5]):
service druid-broker restart
service druid-coordinator restart
service druid-historical restart
service druid-middlemanager restart
service druid-overlord restart
</syntaxhighlight>


Bash snippet to automate the restart:
<syntaxhighlight lang="bash">
#!/bin/bash
set -x
set -e

sudo service druid-broker restart
sudo service druid-broker status
sleep 5
sudo service druid-coordinator restart
sudo service druid-coordinator status
sleep 5
sudo service druid-historical restart
sudo service druid-historical status
sleep 120 # check that historical startup finishes in /var/log/druid/historical.log
sudo service druid-middlemanager restart
sudo service druid-middlemanager status
sleep 5
sudo service druid-overlord restart
sudo service druid-overlord status
</syntaxhighlight>


We also co-locate a Zookeeper ensemble with Druid. At the moment the Zookeeper servers run on nodes an-druid100[1-3].


<syntaxhighlight lang="bash">
service zookeeper restart
</syntaxhighlight>
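Before and after restarting Zookeeper it can be useful to sanity-check the ensemble. A minimal sketch using Zookeeper's four-letter-word commands (assumes the default client port 2181, that <code>nc</code> is installed, and that these commands are allowed on the server):
<syntaxhighlight lang="bash">
# "imok" means the server is up and answering
echo ruok | nc localhost 2181

# Shows mode (leader/follower) and connection counts
echo stat | nc localhost 2181 | head -n 20
</syntaxhighlight>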
 
=== Removing hosts / taking hosts out of service from the cluster ===
[[File:Coordinator dynamic config.png|thumb|right|alt=Selecting coordinator dynamic configuration|Selecting coordinator dynamic configuration]]
[[File:Decommissioning druid historical nodes.png|thumb|right|alt=Decommissioning druid historical nodes|Decommissioning druid historical nodes]]
 
There are two ways to remove Druid services from an active cluster before turning off the services.
 
1: Drain the MiddleManager that you wish to stop, e.g. SSH to the host and run:
 
<syntaxhighlight lang="bash">
curl -X POST http://localhost:8091/druid/worker/v1/disable
</syntaxhighlight>
 
2: Use the Dynamic Configuration API of the coordinator to set nodes into <code>decommissioningNodes</code> mode.
 
You can do this by sending a POST request to http://localhost:8081/druid/coordinator/v1/config, as described [https://druid.apache.org/docs/latest/configuration/index.html#dynamic-configuration here].
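A sketch of what that could look like (the historical host:port is hypothetical; note that the POST replaces the whole dynamic config, so if in doubt GET the current one first and re-submit it with <code>decommissioningNodes</code> added):
<syntaxhighlight lang="bash">
# Look at the current dynamic config first
curl -s http://localhost:8081/druid/coordinator/v1/config

# Re-submit the config with the historical(s) to decommission listed
# (hypothetical host:port; merge this into the config returned above)
curl -X POST -H 'Content-Type: application/json' \
  -d '{"decommissioningNodes": ["an-druid1005.eqiad.wmnet:8083"]}' \
  http://localhost:8081/druid/coordinator/v1/config
</syntaxhighlight>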
 
However, the Druid documentation recommends using the coordinator web interface for setting dynamic configuration parameters.
 
From the top-right corner cog menu, select ''Coordinator Dynamic Config''
 
Once the historical disk cache is drained, the middlemanager is not running any jobs, and the overlord is not targeted by any scheduled jobs, it is safe to stop the services.
 
=== Handling alarms for unavailable segments ===
We have alarms for both clusters related to the number of segments that the Coordinators see as unavailable, namely segments that should be loaded by the Historical daemons but for some reason are not. If this happens, please check the logs in /var/log/druid/historical.log and see if anything is ongoing.
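Besides the logs, the coordinator can tell you which datasources still have segments to load (a sketch; run from a node that can reach the coordinator, or through an ssh tunnel):
<syntaxhighlight lang="bash">
# Percentage of segments loaded per datasource (anything below 100 is unavailable)
curl -s "http://localhost:8081/druid/coordinator/v1/loadstatus"

# Number of segments left to load per datasource
curl -s "http://localhost:8081/druid/coordinator/v1/loadstatus?simple"
</syntaxhighlight>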
 
=== Regular indexations through Oozie ===
 
* Regular indexations are made via oozie jobs ([https://github.com/wikimedia/analytics-refinery/tree/master/oozie/pageview/druid/daily example]).
* For testing, the datasource name to be indexed can be provided as an oozie parameter (-Ddruid_datasource=test_datasource); see the sketch after this list.
* To prevent reindexing production data, a non-hdfs user is prevented from indexing datasources whose names do not start with "test_".
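For example, a test run of the daily pageview job could look roughly like the following (a sketch only: the coordinator.properties path, dates, queue and datasource name are assumptions to adapt, modeled on the restart command earlier on this page):
<syntaxhighlight lang="bash">
# Hypothetical test indexation into a "test_" datasource
oozie job \
    -Duser=$USER \
    -Ddruid_datasource=test_pageviews_daily \
    -Dstart_time=2021-09-01T00:00Z \
    -Dstop_time=2021-09-02T00:00Z \
    -Dqueue_name=default \
    -Drefinery_directory=hdfs://analytics-hadoop$(hdfs dfs -ls -d /wmf/refinery/2021* | tail -n 1 | awk '{print $NF}') \
    -oozie $OOZIE_URL -run -config /srv/deployment/analytics/refinery/oozie/pageview/druid/daily/coordinator.properties
</syntaxhighlight>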
 
== One-off indexing data into Druid ==
 
=== Why ===
You may not want to write a whole oozie job if you're not planning to load data into the datasource periodically. An example of this was the archiving of [[Analytics/Systems/Geoeditors#Accessing the old Geowiki data|Geowiki data]]. Since it was all old data, there was no need to ingest more new data in the future. Assuming you want to load some data from HDFS, you need to create an ingestion spec json file and put it in your home folder on a machine with access to the druid hosts (like stat1007).
 
=== The ingestion spec ===
At the refinery repository we have a few ingestion spec templates that you can use to fill the gaps. [https://github.com/wikimedia/analytics-refinery/blob/master/oozie/pageview/druid/monthly/load_pageview_monthly.json.template This is the one] for the ingestion of the <code>pageview_hourly</code> datasource. The Druid documentation also has a [http://druid.io/docs/latest/tutorials/tutorial-batch.html step by step tutorial] to generate your spec when loading from hadoop.
 
=== Format of your data ===
From experience, Druid prefers columnar data in the form of a json file formatted like this:
 
<code>{"time": "2015-09-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}</code>
 
<code>{"time": "2015-09-01T01:00:00Z", "url": "/", "user": "bob", "latencyMs": 11}</code>
 
<code>{"time": "2015-09-01T01:30:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}</code>
 
...but tsv/csv files are also fine! Make sure to specify the format, along with names of your columns, in your spec:
"parseSpec" : {
  "format" : "tsv",
  "dimensionsSpec" : {
    "dimensions" : [
      "project",
      "country",
      "cohort"
    ]
  },
  "delimiter": "\t",
  "columns": [
    "project",
    "country",
    "cohort",
    "month",
    "count",
    "ts"
  ],
  "timestampSpec" : {
    "format" : "auto",
    "column" : YOUR_TIME_COLUMN
  }
}
 
=== Segments ===
Druid uses your dataset's time dimension to partition the data into segments. You should specify in the ingestion spec the timespan of your dataset, as well as the granularity of your data:
 
"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "month",
  "queryGranularity" : "none",
  "intervals" : ["START_DATE/END_DATE"] // The / slash to separate intervals is important
}
 
In the <code>parseSpec</code> mentioned above you can detail the format that your time column uses. Unless you're using non-standard timestamps you can leave the format as <code>auto</code>, but remember to specify the name of the column.
 
=== Sending the indexation task ===
==== POSTing your spec ====
To initialize the ingestion you need to POST the json spec you just created to a druid overlord. Use this curl command if you want:
curl -L -X 'POST' -H 'Content-Type:application/json' -d @YOUR_INGESTION_SPEC.json http://druid1003.eqiad.wmnet:8090/druid/indexer/v1/task --dump-header -
 
If you are running these commands from the stat machines, you might need to unset the HTTP proxy first:
 unset http_proxy && curl <blah>
 
The <code>--dump-header -</code> argument helps you make sure that your request was answered with a 200 OK code. If you get a different code, you may want to check that your JSON is valid, since usually the task only fails after it has been accepted by the overlord. If you got a 200, you can monitor your task with the overlord console.
 
Another example, this time a topN query:
curl -X POST -H "content-type: application/json" -d '
{
  "filter": {
    "field": {
      "value": "Unknown",
      "type": "selector",
      "dimension": "continent"
    },
    "type": "not"
  },
  "granularity": "all",
  "postAggregations": [],
  "metric": "count",
  "intervals": "2018-06-17T00:00:00+00:00/2018-07-17T19:27:53+00:00",
  "queryType": "topN",
  "dimension": "continent",
  "dataSource": "pageviews_daily",
  "threshold": 50000,
  "aggregations": [
    {
      "type": "count",
      "name": "count"
    }
  ]
}
' 'http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty'
 
==== Monitoring the task ====
To access the overlord console, from your local machine create an SSH tunnel pointing at the overlord
ssh -N an-druid1001.eqiad.wmnet -L 8081:an-druid1002.eqiad.wmnet:8081
 
Enter localhost:8081 and check out the '''indexing''' tab. Your task should be listed there. It's recommended to follow the log link the moment you see it in the console, '''since the task disappears the moment it's either successful or fails'''.
 
If the task fails and the log link from the overlord console says nothing (which happens when no data has started loading), don't despair: that doesn't mean no log has been produced. Log into the druid host you used to send the task (in the case above, <code>druid1003</code>).
 
ssh an-druid1003.eqiad.wmnet
ls /var/lib/druid/indexing-logs
 
Your task should be logged there with a timestamp in the title. '''If it's not''', there's still hope: a druid middle manager may have moved your task to a different druid host. You can find out which one in <code>/var/log/druid/middlemanager.log</code>. Or you can just SSH into any of the other four hosts (currently the hosts are: <code>an-druid100[1-5]</code>) and it'll probably be there.
 
=== Troubleshooting ===
The Druid documentation has a pretty nice ingestion troubleshooting section, so give that a shot. If everything fails, dig into the logs of the overlords (<code>/var/log/druid</code>), since there will definitely be information about your task there.
 
== Realtime indexation to Druid ==
Druid offers Kafka [https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion Supervisors] to allow real time indexation from Kafka to Druid. This is particularly useful for datasets like <code>wmf_netflow</code> (and in the past also banner impressions).
 
The json specs for our current supported jobs are stored in refinery under druid/kafka. To kick off the real time indexation it is sufficient to send a POST to the Druid overlord like the following:<syntaxhighlight lang="bash">
curl -L -X POST -H 'Content-Type: application/json' -d '{
  "type": "kafka",
  [..cut..]
  }
}' http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor
</syntaxhighlight>Or even simpler:<syntaxhighlight lang="bash">
curl -L -X POST -H 'Content-Type: application/json' -d @PATH_OF_THE_SPEC.json http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor
</syntaxhighlight>If you want to update an existing running supervisor, it is sufficient to POST the updated JSON spec to the overlord. It will take care of stopping the current one and re-launching the new one.
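To see which supervisors are running, or to stop one, you can talk to the same overlord API (a sketch; the exact endpoints depend on the Druid version, <code>terminate</code> being the current name for stopping a supervisor):
<syntaxhighlight lang="bash">
# List the ids of the currently running supervisors
curl -L http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor

# Stop a supervisor for good (replace SUPERVISOR_ID with one of the ids above)
curl -L -X POST http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor/SUPERVISOR_ID/terminate
</syntaxhighlight>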
 
== See also ==
* [[Analytics/Systems/Hive to Druid]]
* [https://speakerdeck.com/druidio/analytics-at-wikipedia-with-druid "Analytics at Wikipedia with Druid"] (slides from a [https://conferences.oreilly.com/strata/strata-ny-2017/public/schedule/detail/60986 presentation] by Andrew Otto at Strata NY 2017)
*[https://gist.github.com/bearloga/c311cdcd3a61f4435b4b006cf119c30e Ingestion spec for a gzipped CSV]
