Spark подключение к oracle
I’ve been meaning to write about Apache Spark for quite some time now – I’ve been working with a few of my customers and I find this framework powerful, practical, and useful for a lot of big data usages. For those of you who don’t know about Apache Spark, here is a short introduction.
Apache Spark is a framework for distributed calculation and handling of big data. Like Hadoop, it uses a clustered environment in order to partition and distribute the data to multiple nodes, dividing the work between them. Unlike Hadoop, Spark is based on the concepts of in-memory calculation. Its main advantages are the ability to pipeline operations (thus breaking the initial concept of a single map-single reduce in the MapReduce framework), making the code much easier to write and run, and doing it in an in-memory architecture so things run much faster.
Hadoop and Spark can co-exist, and by using YARN – we get many benefits from that kind of environment setup.
Of course, Spark is not bulletproof and you do need to know how to work with it to achieve the best performance. As a distributed application framework, Spark is awesome – and I suggest getting to know with it as soon as possible.
I will probably make a longer post introducing it in the near future (once I’m over with all of my prior commitments) .
In the meantime, here is a short explanation about how to connect from Spark SQL to Oracle Database.
Update: here is the 200 long slides presentation I made for Oracle Week 2016: it should cover most of the information new comers need to know about spark.
Spark presentation from Oracle Week 2016.
Using Spark SQL and Spark Shell
Once we have everything in place, we can use the Spark Shell (Scala based interpreter) to connect to the database and query some tables:
First, we create a data frame:
Now, we can count the rows:
Show the schema:
Or even query the data
We can even start manipulating the data, and converting it to RDD:
Architecture
This architecture deploys an Apache Spark cluster on Oracle Cloud Infrastructure using the manager/worker model. It has a manager node and three worker nodes, running on compute instances.
The following diagram illustrates this reference architecture.
Description of the illustration spark-oci-png.jpg
The architecture has the following components:
An Oracle Cloud Infrastructure region is a localized geographic area that contains one or more data centers, called availability domains. Regions are independent of other regions, and vast distances can separate them (across countries or even continents).
Availability domains are standalone, independent data centers within a region. The physical resources in each availability domain are isolated from the resources in the other availability domains, which provides fault tolerance. Availability domains don’t share infrastructure such as power or cooling, or the internal availability domain network. So, a failure at one availability domain is unlikely to affect the other availability domains in the region.
A fault domain is a grouping of hardware and infrastructure within an availability domain. Each availability domain has three fault domains with independent power and hardware. When you distribute resources across multiple fault domains, your applications can tolerate physical server failure, system maintenance, and power failures inside a fault domain.
A VCN is a customizable, software-defined network that you set up in an Oracle Cloud Infrastructure region. Like traditional data center networks, VCNs give you complete control over your network environment. A VCN can have multiple non-overlapping CIDR blocks that you can change after you create the VCN. You can segment a VCN into subnets, which can be scoped to a region or to an availability domain. Each subnet consists of a contiguous range of addresses that don't overlap with the other subnets in the VCN. You can change the size of a subnet after creation. A subnet can be public or private.
The compute instance that hosts the Apache Spark manager is attached to a regional public subnet. The workers are attached to a regional private subnet.
With block storage volumes, you can create, attach, connect, and move storage volumes, and change volume performance to meet your storage, performance, and application requirements. After you attach and connect a volume to an instance, you can use the volume like a regular hard drive. You can also disconnect a volume and attach it to another instance without losing data.
The Terraform quick-start template provided for this architecture provisions a 700-GB block volume for each worker node. While deploying the architecture, you can choose the number and size of the block volumes.
The architecture uses iSCSI, a TCP/IP-based standard, for communication between the volumes and the attached instances.
The internet gateway allows traffic between the public subnets in a VCN and the public internet.
A NAT gateway enables private resources in a VCN to access hosts on the internet, without exposing those resources to incoming internet connections.
Summary
This is not all we can do with Spark SQL – we can also join RDDs, intersect them and more – but I think this is enough for one post. This was about the basics, and maybe in a later post I can show some one functionality – if you find this interesting.
Spark has great potential in the big data world – it’s been one of the driving forces behind big data in the last couple of years. If you’re a DBA and hadn’t had the chance to go into big data – Spark is a great place to start…
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Open with Desktop
- View raw
- Copy raw contents Copy raw contents
Copy raw contents
Copy raw contents
Notes on querying Oracle from Apache Spark.
Relevant to reading Oracle tables using Spark SQl (Dataframes API), to transfer data from Oracle into Parquet or other formats. Find here also some notes on measuring performance, use of partitioning and also some thoughts on Apache Sqoop vs. Spark for data transfer.
An example of how to create a Spark DataFrame that reads from and Oracle table/view/query using JDBC.
Examples on how to write to Oracle
Example with TPCS protocol
Tested with Oracle 18c
Note on partitioning/parallelization of the JDBC source with Spark:
- The instruction above will read from Oracle using a single Spark task, this can be slow.
- When using partitioning options Spark will use as many tasks as "numPartitions"
- Each task will issue a query to read the data with an additional "where condition" generated from the lower and upper bounds and the number of partitions.
- If the Oracle table is partitioned by the column "partitionColumn" this could improve performance and use partition pruning for example. In other cases the query can generate multiple table scans or suboptimal index range scans of large parts of the table. See also below the discussion on Sqoop, that has additional optimizations for mappers/partitioners to use with Oracle. This functionality has not yet been ported to Spark.
- When loading large tables you may want to check with a DBA that the load is acceptable on the source DB Example:
Note: instead of a table name you can specify a query as in
What to check on the Oracle side and what to expect
- Use an Oracle monitoring tool, such as Oracle EM, or use relevant "DBA scripts" as in this repo
- Check the number of sessions connected to Oracle from the Spark executors and the sql_id of the SQL they are executing.
- expect numPartitions sessions in Oracle (1 session if you did not specify the option)
- The relevant info is in V$SESSION, or use the script @sessinfo s.username='MYORAUSER'
- from the sql_id see the SQl text from V$SQLSTATS or use the script @sql
- you should see queries on the table you specified, optionally with where caluses to read chunks of it if you sepficied partitioning cluases
- this can be done from Oralce SQL using V$SYSMETRIC or the script @sysmetric
- See various OS and DB metrics, including the network throughput
- You should expect the network throughput by this additional load to be around 10MB/sec per session. It could be less if reading from tables with small rows.
- query from V$SESSION and V$EVENTMETRIC to see the workload, for example use the scripts @top and @eventmetric
- in many cases you may see a low load on Oracle
- for example if workload is reading from Oracle and writing into Parquet, you'll find that in many cases the bottleneck is the CPU needed by Spark tasks to write into Parquet
- when the bottleneck is on the Spark side, Oracle sessions will report "wait events" such as: "SQL*Net more data to client", measning that Oracle sessions are waiting to be able to push more data to Spark executors which are otherwise busy
What to check on the Spark side
- Check the SPARK UI to see the progress of the job and how many tasks are being used concurrently
- you should expect "numPartitions" tasks (1 tasks if you did not specify a value for this option)
For example the output from a run with parallelism 12 shows that most of the time was spent running CPU cycles on the Spark cluster (rather than on Oracle which was mostly idle):
Notes on Apache Sqoop
Apache Sqoop and in particular its Oracle connector orahoop have additional optimizations that can improve substantially the performance of data transfer from Oracle to Hadoop compared to the method described above using Spark. See this link to Scoop documentation
An example of Sqoop/orahoop usage:
- Sqoop will generate a Map reduce job to process the data transfer
- Compared to the JDBC method with Spark described above this has several optimizations for Oracle
- Notably the way data is split among mappers uses methods that are native for Oracle (ROWID ranges by default, Sqoop can also use Oracle partitions to chunk data with the option -Doraoop.chunk.method="PARTITION"). Also data reads for Sqoop workloads by default do not interfere with the Oracle buffer cache (i.e. Sqoop uses serial direct reads).
Issues and remarks:
-
In one system the I found the following blocking Oracle error while loading a DF from an Oracle table with timestamp columns
SPARK-21519: Add an option to the JDBC data source to initialize the environment of the remote database session
Comments: The proposal of this option comes from the need to use it for an Oracle database and enable serial direct read (this allows Oracle to perform read operations that bypass the buffer cache), however it can be used for other DBs too as it is quite generic. Note this mechanism allows to inject code into the database connection, this is not a security vulnerability as it requires password authentication, however beware of the possibilities for injecting SQL (and PL/SQL) that this opens.
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Open with Desktop
- View raw
- Copy raw contents Copy raw contents
Copy raw contents
Copy raw contents
Узнайте, как установить Jupyter Notebook локально на компьютере и подключить его к кластеру Apache Spark.
Установка Jupyter Notebook на компьютере и подключение к Apache Spark в HDInsight
Из этой статьи вы узнаете, как установить Jupyter Notebook с ядром PySpark (для Python) и Apache Spark (для Scala) с помощью программы Spark Magic. Затем вы подключаете записную книжку к кластеру HDInsight.
При установке Jupyter и подключении к Apache Spark в HDInsight необходимо выполнить четыре основных шага.
- Настройка кластера Spark.
- Установите Jupyter Notebook.
- Установите ядра PySpark и Spark с помощью волшебной команды Spark.
- Настройте волшебную команду Spark для доступа к кластеру Spark в HDInsight.
Дополнительные сведения о пользовательских ядрах и волшебе Spark см. в разделе ядра, доступные для записных книжек Jupyter с кластерами Apache Spark Linux в HDInsight.
Кластер Apache Spark в HDInsight. Инструкции см. в статье Начало работы. Создание кластера Apache Spark в HDInsight на платформе Linux и выполнение интерактивных запросов с помощью SQL Spark. Локальная Записная книжка подключается к кластеру HDInsight.
Опыт работы с записными книжками Jupyter с Spark в HDInsight.
Установка Jupyter Notebook на компьютере
Установите Python перед установкой записных книжек Jupyter. В дистрибутиве Anaconda будут установлены оба, Python и Jupyter Notebook.
Скачайте установщик Anaconda для своей платформы и запустите программу установки. В мастере установки укажите параметр для добавления Anaconda в переменную PATH. См. также Установка Jupyter с помощью Anaconda.
Установка программы Spark Magic
Введите команду pip install sparkmagic==0.13.1 для установки программы Spark Magic для кластеров HDInsight версии 3,6 и 4,0. См. также документацию по sparkmagic.
Убедитесь, что ipywidgets установлен правильный параметр, выполнив следующую команду:
Установка ядер PySpark и Spark
Найдите место sparkmagic установки, введя следующую команду:
Затем измените рабочий каталог на Расположение , указанное в приведенной выше команде.
В новом рабочем каталоге введите одну или несколько приведенных ниже команд, чтобы установить требуемые ядра.
Необязательный параметр. Введите следующую команду, чтобы включить расширение сервера:
Настройка волшебной команды Spark для подключения к кластеру HDInsight Spark
В этом разделе вы настроите магическое значение Spark, установленное ранее, для подключения к кластеру Apache Spark.
Запустите оболочку Python с помощью следующей команды:
Сведения о конфигурации Jupyter обычно хранятся в домашнем каталоге пользователей. Введите следующую команду, чтобы указать домашний каталог, и создайте папку с именем . sparkmagic. Будет выведен полный путь.
В папке .sparkmagic Создайте файл с именем config.js и добавьте в него следующий фрагмент JSON.
Внесите следующие изменения в файл:
Значение шаблона Новое значение ИМЕН Имя входа кластера, по умолчанию — admin . CLUSTERDNSNAME Имя кластера Пароль в кодировке Base64 для фактического пароля. Пароль для base64 можно создать по адресу https://www.url-encode-decode.com/base64-encode-decode/ . "livy_server_heartbeat_timeout_seconds": 60 При использовании sparkmagic 0.12.7 (кластеры версии 3.5 и 3.6) не заключайте. При использовании sparkmagic 0.2.3 (Clusters v 3.4) Замените на "should_heartbeat": true . Полный пример файла можно просмотреть в образце config.jsвразделе.
[!TIP]
Сигналы пульса отправляются, чтобы предотвратить утечку сеансов. При переходе в спящий режим или завершении работы компьютера пульс не отправляется, что приводит к очистке сеанса. Если вы хотите отключить такое поведение для кластеров версии 3.4, то можете настроить для параметра Livy livy.server.interactive.heartbeat.timeout значение 0 с помощью пользовательского интерфейса Ambari. Если для кластеров версии 3.5 не настроить соответствующую конфигурацию, приведенную выше, то сеанс не будет удален.Запустите Jupyter. Выполните следующую команду из командной строки.
Убедитесь, что вы можете использовать магическую платформу Spark, доступную в ядрах. Выполните следующие шаги.
а. Создайте новую записную книжку. В правом углу выберите создать. Вы должны увидеть ядро по умолчанию Python 2 или Python 3 и установленные ядра. Фактические значения могут различаться в зависимости от выбранных вариантов установки. Выберите PySpark.
. image type="content" source="./media/apache-spark-jupyter-notebook-install-locally/jupyter-kernels-notebook.jpg " alt-text="Доступные ядра в Jupyter Notebook" border="true".
b. Запустите следующий фрагмент кода.
Если вы успешно получили выходные данные, подключение к кластеру HDInsight работает.
Если вы хотите обновить конфигурацию записной книжки для подключения к другому кластеру, обновите config.jsс новым набором значений, как показано на шаге 3 выше.
Зачем устанавливать Jupyter на моем компьютере?
Причины для установки Jupyter на компьютере и последующего подключения к кластеру Apache Spark в HDInsight:
[!WARNING]
Если Jupyter установлен на локальном компьютере, несколько пользователей могут одновременно запустить одну и ту же записную книжку в одном кластере Spark. В такой ситуации создаются несколько сеансов Livy. Если вы столкнетесь с проблемами и начнете их отладку, вам будет сложно определить, какой сеанс Livy какому пользователю принадлежит.Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag. (Note that this is different than the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL).
To get started you will need to include the JDBC driver for your particular database on the spark classpath. For example, to connect to postgres from the Spark Shell you would run the following command:
Deploy
The code required to deploy this reference architecture is available in GitHub. You can pull the code into Oracle Cloud Infrastructure Resource Manager with a single click, create the stack, and deploy it. Alternatively, download the code from GitHub to your computer, customize the code, and deploy the architecture by using the Terraform CLI.
If you aren't already signed in, enter the tenancy and user credentials.
To make any changes, return to the Stack Details page, click Edit Stack , and make the required changes. Then, run the Plan action again.
- Go to GitHub.
- Clone or download the repository to your local computer.
- Follow the instructions in the README document.
Spark SQL
We can build RDDs from files, and that’s great, but files is not the only source of data out there. We know that sometime we keep our data in a more complex data stores: it can be relational databases (Oracle, MySQL, Postgres), it can be NoSQL (Redis, Oracle NoSQL etc.), JSON datasets, and it can even be on data structures that are native to Hadoop – Hbase and Hive to name a few. Most of those structures, allow retrieving of the data in a language we already know (SQL) – but Spark didn’t allow building RDDs from that and a new module was born, Spark SQL (Shark, in earlier versions).
The Spark SQL module allows us the ability to connect to databases and use SQL language to create new structure that can be converted to RDD. Spark SQL is built on two main components: DataFrame and SQLContext.
The SQLContext encapsulate all relational functionality in Spark. When we create the SQLContext from the existing SparkContext (basic component for Spark Core), we’re actually extending the Spark Context functionality to be able to “talk” to databases, based on the connecter we provide.
DataFrame is a distributed collection of data organized into named columns. This will be the result set of what we read from the database (table). The nice thing about DF is that we can convert it to RDD, and use Spark regular functionality; we can manipulate the data in it just like any other RDD we load into Spark.
Connecting Spark with Oracle Database
Before we actually begin connecting Spark to Oracle, we need a short explanation on Spark’s basic building block, which is called RDD – Resilient Distributed Dataset. RDD is a data structure that is being distributed across the cluster, but from the developer perspective, there is no need to know how and where the data is. Every operation we do to the data is being distributed and collected back whenever we need it.
Using Spark Core, most RDDs are being built from files – they can be on the local driver machine, Amazon S3, and even HDFS – but never the less, they are all files. We can also build RDDs from other RDDs by manipulating them (transform functions).
Explore More
Using Spark with Oracle RDBMS
The first thing we need to do in order to use Spark with Oracle is to actually install Spark framework. This is a very easy task, even if you don’t have any clusters. There is no need for Hadoop installation or any kind of framework, other than Spark binaries.
The first step is to go to Spark download page and download a package. In my example, I used Pre-Built for Hadoop 2.6. This means that we don’t need Hadoop cluster – the Spark installation will come with all of its Hadoop prerequisites.
Once we get the file, we can deploy the Spark on our server:
We can now go to the Spark directory and start the master server. The master in Spark is the component that is in charge of distributing the work between Spark workers (or slaves):
Once the master has started, we can start a worker node as well. This worker will need to know the master node:
We now have a (small) working Spark Standalone Cluster. The standalone in this part refer to the fact that there is no external resource manager such as YARN, not that the node is stand-alone.
Change Log
This log lists significant changes:
Added the option to download editable versions (.SVG and .DRAWIO) of the architecture diagram.
Data Source Option
Spark supports the following case-insensitive options for JDBC. The Data source options of JDBC can be set via:
- the .option / .options methods of
- DataFrameReader
- DataFrameWriter
For connection properties, users can specify the JDBC connection properties in the data source options. user and password are normally provided as connection properties for logging into the data sources.
- It is not allowed to specify dbtable and query options at the same time.
- It is not allowed to specify query and partitionColumn options at the same time. When specifying partitionColumn option is required, the subquery can be specified using dbtable option instead and partition columns can be qualified using the subquery alias provided as part of dbtable .
Example:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()
- refreshKrb5Config flag is set with security context 1
- A JDBC connection provider is used for the corresponding DBMS
- The krb5.conf is modified but the JVM not yet realized that it must be reloaded
- Spark authenticates successfully for security context 1
- The JVM loads security context 2 from the modified krb5.conf
- Spark restores the previously saved security context 1
- The modified krb5.conf content just gone
Note that kerberos authentication with keytab is not always supported by the JDBC driver.
Before using keytab and principal configuration options, please make sure the following requirements are met:- The included JDBC driver version supports kerberos authentication with keytab.
- There is a built-in connection provider which supports the used database.
There is a built-in connection providers for the following databases:
If the requirements are not met, please consider using the JdbcConnectionProvider developer API to handle custom authentication.
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
Apache Spark is an open-source, cluster-computing framework for data analytics. Oracle Cloud Infrastructure provides a reliable, high-performance platform for running and managing your Apache Spark -based Big Data applications.
Recommendations
Your requirements might differ from the architecture described here. Use the following recommendations as a starting point.
When you create a VCN, determine the number of CIDR blocks required and the size of each block based on the number of resources that you plan to attach to subnets in the VCN. Use CIDR blocks that are within the standard private IP address space.
Select CIDR blocks that don't overlap with any other network (in Oracle Cloud Infrastructure , your on-premises data center, or another cloud provider) to which you intend to set up private connections.
After you create a VCN, you can change, add, and remove its CIDR blocks.
When you design the subnets, consider your traffic flow and security requirements. Attach all the resources within a specific tier or role to the same subnet, which can serve as a security boundary.
Use regional subnets.
This architecture uses an Oracle Linux 7.7 OS image with a VM.Standard2.1 shape for both the manager and worker nodes. If your application needs more memory, cores, or network bandwidth, you can choose a different shape.
Even though Apache Spark can run alone, in this architecture, it runs on Hadoop.
In this architecture, a single manager node and three worker nodes are deployed as part of the Apache Spark cluster.
Use Oracle Cloud Guard to monitor and maintain the security of your resources in Oracle Cloud Infrastructure proactively. Cloud Guard uses detector recipes that you can define to examine your resources for security weaknesses and to monitor operators and users for risky activities. When any misconfiguration or insecure activity is detected, Cloud Guard recommends corrective actions and assists with taking those actions, based on responder recipes that you can define.
For resources that require maximum security, Oracle recommends that you use security zones. A security zone is a compartment associated with an Oracle-defined recipe of security policies that are based on best practices. For example, the resources in a security zone must not be accessible from the public internet and they must be encrypted using customer-managed keys. When you create and update resources in a security zone, Oracle Cloud Infrastructure validates the operations against the policies in the security-zone recipe, and denies operations that violate any of the policies.
Considerations
Consider using bare metal shapes for Compute instances for both the manager and worker nodes. You can achieve significant performance benefits by running Big Data applications on a bare metal Spark cluster.
Fault domains provide the best resilience within a single availability domain. You can deploy compute instances that perform the same tasks in multiple availability domains. This design removes a single point of failure by introducing redundancy.
You might also consider creating an extra Spark manager node as a backup for high availability.
You can scale your application by using the instance pool and autoscaling features.
- Using instance pools you can provision and create multiple compute instances based on the same configuration within the same region.
- Autoscaling enables you to automatically adjust the number of compute instances in an instance pool, based on performance metrics like CPU utilization.
You can use Oracle Cloud Infrastructure Object Storage to store the data instead of block volumes. If you use object storage, create a service gateway for connectivity from nodes in private subnets.
Oracle also offers the Hadoop Distributed File System (HDFS) Connector for Oracle Cloud Infrastructure Object Storage . Using the HDFS connector, your Apache Hadoop applications can read and write data to and from object storage.
This architecture uses Terraform to create the infrastructure and deploy the Spark cluster.
You can, instead, use the fully managed service, Oracle Cloud Infrastructure Data Flow , which provides a rich user interface to allow developers and data scientists to create, edit, and run Apache Spark applications at any scale without the need for clusters, an operations team, or highly specialized Spark knowledge. As a fully managed service, there’s no infrastructure to deploy or manage.
Use policies to restrict who can access your Oracle Cloud Infrastructure resources and what actions they can perform.
Читайте также: